1// Copyright 2009 The Go Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style 3// license that can be found in the LICENSE file. 4 5// Pipe adapter to connect code expecting an io.Reader 6// with code expecting an io.Writer. 7 8package io 9 10import ( 11 "errors" 12 "sync" 13) 14 15// onceError is an object that will only store an error once. 16type onceError struct { 17 sync.Mutex // guards following 18 err error 19} 20 21func (a *onceError) Store(err error) { 22 a.Lock() 23 defer a.Unlock() 24 if a.err != nil { 25 return 26 } 27 a.err = err 28} 29func (a *onceError) Load() error { 30 a.Lock() 31 defer a.Unlock() 32 return a.err 33} 34 35// ErrClosedPipe is the error used for read or write operations on a closed pipe. 36var ErrClosedPipe = errors.New("io: read/write on closed pipe") 37 38// A pipe is the shared pipe structure underlying PipeReader and PipeWriter. 39type pipe struct { 40 wrMu sync.Mutex // Serializes Write operations 41 wrCh chan []byte 42 rdCh chan int 43 44 once sync.Once // Protects closing done 45 done chan struct{} 46 rerr onceError 47 werr onceError 48} 49 50func (p *pipe) read(b []byte) (n int, err error) { 51 select { 52 case <-p.done: 53 return 0, p.readCloseError() 54 default: 55 } 56 57 select { 58 case bw := <-p.wrCh: 59 nr := copy(b, bw) 60 p.rdCh <- nr 61 return nr, nil 62 case <-p.done: 63 return 0, p.readCloseError() 64 } 65} 66 67func (p *pipe) closeRead(err error) error { 68 if err == nil { 69 err = ErrClosedPipe 70 } 71 p.rerr.Store(err) 72 p.once.Do(func() { close(p.done) }) 73 return nil 74} 75 76func (p *pipe) write(b []byte) (n int, err error) { 77 select { 78 case <-p.done: 79 return 0, p.writeCloseError() 80 default: 81 p.wrMu.Lock() 82 defer p.wrMu.Unlock() 83 } 84 85 for once := true; once || len(b) > 0; once = false { 86 select { 87 case p.wrCh <- b: 88 nw := <-p.rdCh 89 b = b[nw:] 90 n += nw 91 case <-p.done: 92 return n, p.writeCloseError() 93 } 94 } 95 return n, nil 96} 97 98func (p *pipe) closeWrite(err error) error { 99 if err == nil { 100 err = EOF 101 } 102 p.werr.Store(err) 103 p.once.Do(func() { close(p.done) }) 104 return nil 105} 106 107// readCloseError is considered internal to the pipe type. 108func (p *pipe) readCloseError() error { 109 rerr := p.rerr.Load() 110 if werr := p.werr.Load(); rerr == nil && werr != nil { 111 return werr 112 } 113 return ErrClosedPipe 114} 115 116// writeCloseError is considered internal to the pipe type. 117func (p *pipe) writeCloseError() error { 118 werr := p.werr.Load() 119 if rerr := p.rerr.Load(); werr == nil && rerr != nil { 120 return rerr 121 } 122 return ErrClosedPipe 123} 124 125// A PipeReader is the read half of a pipe. 126type PipeReader struct{ pipe } 127 128// Read implements the standard Read interface: 129// it reads data from the pipe, blocking until a writer 130// arrives or the write end is closed. 131// If the write end is closed with an error, that error is 132// returned as err; otherwise err is EOF. 133func (r *PipeReader) Read(data []byte) (n int, err error) { 134 return r.pipe.read(data) 135} 136 137// Close closes the reader; subsequent writes to the 138// write half of the pipe will return the error [ErrClosedPipe]. 139func (r *PipeReader) Close() error { 140 return r.CloseWithError(nil) 141} 142 143// CloseWithError closes the reader; subsequent writes 144// to the write half of the pipe will return the error err. 145// 146// CloseWithError never overwrites the previous error if it exists 147// and always returns nil. 148func (r *PipeReader) CloseWithError(err error) error { 149 return r.pipe.closeRead(err) 150} 151 152// A PipeWriter is the write half of a pipe. 153type PipeWriter struct{ r PipeReader } 154 155// Write implements the standard Write interface: 156// it writes data to the pipe, blocking until one or more readers 157// have consumed all the data or the read end is closed. 158// If the read end is closed with an error, that err is 159// returned as err; otherwise err is [ErrClosedPipe]. 160func (w *PipeWriter) Write(data []byte) (n int, err error) { 161 return w.r.pipe.write(data) 162} 163 164// Close closes the writer; subsequent reads from the 165// read half of the pipe will return no bytes and EOF. 166func (w *PipeWriter) Close() error { 167 return w.CloseWithError(nil) 168} 169 170// CloseWithError closes the writer; subsequent reads from the 171// read half of the pipe will return no bytes and the error err, 172// or EOF if err is nil. 173// 174// CloseWithError never overwrites the previous error if it exists 175// and always returns nil. 176func (w *PipeWriter) CloseWithError(err error) error { 177 return w.r.pipe.closeWrite(err) 178} 179 180// Pipe creates a synchronous in-memory pipe. 181// It can be used to connect code expecting an [io.Reader] 182// with code expecting an [io.Writer]. 183// 184// Reads and Writes on the pipe are matched one to one 185// except when multiple Reads are needed to consume a single Write. 186// That is, each Write to the [PipeWriter] blocks until it has satisfied 187// one or more Reads from the [PipeReader] that fully consume 188// the written data. 189// The data is copied directly from the Write to the corresponding 190// Read (or Reads); there is no internal buffering. 191// 192// It is safe to call Read and Write in parallel with each other or with Close. 193// Parallel calls to Read and parallel calls to Write are also safe: 194// the individual calls will be gated sequentially. 195func Pipe() (*PipeReader, *PipeWriter) { 196 pw := &PipeWriter{r: PipeReader{pipe: pipe{ 197 wrCh: make(chan []byte), 198 rdCh: make(chan int), 199 done: make(chan struct{}), 200 }}} 201 return &pw.r, pw 202} 203