1// Copyright 2009 The Go Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style
3// license that can be found in the LICENSE file.
4
5// Pipe adapter to connect code expecting an io.Reader
6// with code expecting an io.Writer.
7
8package io
9
10import (
11	"errors"
12	"sync"
13)
14
15// onceError is an object that will only store an error once.
16type onceError struct {
17	sync.Mutex // guards following
18	err        error
19}
20
21func (a *onceError) Store(err error) {
22	a.Lock()
23	defer a.Unlock()
24	if a.err != nil {
25		return
26	}
27	a.err = err
28}
29func (a *onceError) Load() error {
30	a.Lock()
31	defer a.Unlock()
32	return a.err
33}
34
35// ErrClosedPipe is the error used for read or write operations on a closed pipe.
36var ErrClosedPipe = errors.New("io: read/write on closed pipe")
37
38// A pipe is the shared pipe structure underlying PipeReader and PipeWriter.
39type pipe struct {
40	wrMu sync.Mutex // Serializes Write operations
41	wrCh chan []byte
42	rdCh chan int
43
44	once sync.Once // Protects closing done
45	done chan struct{}
46	rerr onceError
47	werr onceError
48}
49
50func (p *pipe) read(b []byte) (n int, err error) {
51	select {
52	case <-p.done:
53		return 0, p.readCloseError()
54	default:
55	}
56
57	select {
58	case bw := <-p.wrCh:
59		nr := copy(b, bw)
60		p.rdCh <- nr
61		return nr, nil
62	case <-p.done:
63		return 0, p.readCloseError()
64	}
65}
66
67func (p *pipe) closeRead(err error) error {
68	if err == nil {
69		err = ErrClosedPipe
70	}
71	p.rerr.Store(err)
72	p.once.Do(func() { close(p.done) })
73	return nil
74}
75
76func (p *pipe) write(b []byte) (n int, err error) {
77	select {
78	case <-p.done:
79		return 0, p.writeCloseError()
80	default:
81		p.wrMu.Lock()
82		defer p.wrMu.Unlock()
83	}
84
85	for once := true; once || len(b) > 0; once = false {
86		select {
87		case p.wrCh <- b:
88			nw := <-p.rdCh
89			b = b[nw:]
90			n += nw
91		case <-p.done:
92			return n, p.writeCloseError()
93		}
94	}
95	return n, nil
96}
97
98func (p *pipe) closeWrite(err error) error {
99	if err == nil {
100		err = EOF
101	}
102	p.werr.Store(err)
103	p.once.Do(func() { close(p.done) })
104	return nil
105}
106
107// readCloseError is considered internal to the pipe type.
108func (p *pipe) readCloseError() error {
109	rerr := p.rerr.Load()
110	if werr := p.werr.Load(); rerr == nil && werr != nil {
111		return werr
112	}
113	return ErrClosedPipe
114}
115
116// writeCloseError is considered internal to the pipe type.
117func (p *pipe) writeCloseError() error {
118	werr := p.werr.Load()
119	if rerr := p.rerr.Load(); werr == nil && rerr != nil {
120		return rerr
121	}
122	return ErrClosedPipe
123}
124
125// A PipeReader is the read half of a pipe.
126type PipeReader struct{ pipe }
127
128// Read implements the standard Read interface:
129// it reads data from the pipe, blocking until a writer
130// arrives or the write end is closed.
131// If the write end is closed with an error, that error is
132// returned as err; otherwise err is EOF.
133func (r *PipeReader) Read(data []byte) (n int, err error) {
134	return r.pipe.read(data)
135}
136
137// Close closes the reader; subsequent writes to the
138// write half of the pipe will return the error [ErrClosedPipe].
139func (r *PipeReader) Close() error {
140	return r.CloseWithError(nil)
141}
142
143// CloseWithError closes the reader; subsequent writes
144// to the write half of the pipe will return the error err.
145//
146// CloseWithError never overwrites the previous error if it exists
147// and always returns nil.
148func (r *PipeReader) CloseWithError(err error) error {
149	return r.pipe.closeRead(err)
150}
151
152// A PipeWriter is the write half of a pipe.
153type PipeWriter struct{ r PipeReader }
154
155// Write implements the standard Write interface:
156// it writes data to the pipe, blocking until one or more readers
157// have consumed all the data or the read end is closed.
158// If the read end is closed with an error, that err is
159// returned as err; otherwise err is [ErrClosedPipe].
160func (w *PipeWriter) Write(data []byte) (n int, err error) {
161	return w.r.pipe.write(data)
162}
163
164// Close closes the writer; subsequent reads from the
165// read half of the pipe will return no bytes and EOF.
166func (w *PipeWriter) Close() error {
167	return w.CloseWithError(nil)
168}
169
170// CloseWithError closes the writer; subsequent reads from the
171// read half of the pipe will return no bytes and the error err,
172// or EOF if err is nil.
173//
174// CloseWithError never overwrites the previous error if it exists
175// and always returns nil.
176func (w *PipeWriter) CloseWithError(err error) error {
177	return w.r.pipe.closeWrite(err)
178}
179
180// Pipe creates a synchronous in-memory pipe.
181// It can be used to connect code expecting an [io.Reader]
182// with code expecting an [io.Writer].
183//
184// Reads and Writes on the pipe are matched one to one
185// except when multiple Reads are needed to consume a single Write.
186// That is, each Write to the [PipeWriter] blocks until it has satisfied
187// one or more Reads from the [PipeReader] that fully consume
188// the written data.
189// The data is copied directly from the Write to the corresponding
190// Read (or Reads); there is no internal buffering.
191//
192// It is safe to call Read and Write in parallel with each other or with Close.
193// Parallel calls to Read and parallel calls to Write are also safe:
194// the individual calls will be gated sequentially.
195func Pipe() (*PipeReader, *PipeWriter) {
196	pw := &PipeWriter{r: PipeReader{pipe: pipe{
197		wrCh: make(chan []byte),
198		rdCh: make(chan int),
199		done: make(chan struct{}),
200	}}}
201	return &pw.r, pw
202}
203