xref: /aosp_15_r20/external/okio/okio/src/jvmMain/kotlin/okio/Pipe.kt (revision f9742813c14b702d71392179818a9e591da8620c)
1 /*
2  * Copyright (C) 2016 Square, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package okio
17 
18 import java.util.concurrent.locks.Condition
19 import java.util.concurrent.locks.ReentrantLock
20 import kotlin.concurrent.withLock
21 
22 /**
23  * A source and a sink that are attached. The sink's output is the source's input. Typically each
24  * is accessed by its own thread: a producer thread writes data to the sink and a consumer thread
25  * reads data from the source.
26  *
27  * This class uses a buffer to decouple source and sink. This buffer has a user-specified maximum
28  * size. When a producer thread outruns its consumer the buffer fills up and eventually writes to
29  * the sink will block until the consumer has caught up. Symmetrically, if a consumer outruns its
30  * producer reads block until there is data to be read. Limits on the amount of time spent waiting
31  * for the other party can be configured with [timeouts][Timeout] on the source and the
32  * sink.
33  *
34  * When the sink is closed, source reads will continue to complete normally until the buffer has
35  * been exhausted. At that point reads will return -1, indicating the end of the stream. But if the
36  * source is closed first, writes to the sink will immediately fail with an [IOException].
37  *
38  * A pipe may be canceled to immediately fail writes to the sink and reads from the source.
39  */
40 class Pipe(internal val maxBufferSize: Long) {
41   internal val buffer = Buffer()
42   internal var canceled = false
43   internal var sinkClosed = false
44   internal var sourceClosed = false
45   internal var foldedSink: Sink? = null
46 
47   val lock: ReentrantLock = ReentrantLock()
48   val condition: Condition = lock.newCondition()
49 
50   init {
<lambda>null51     require(maxBufferSize >= 1L) { "maxBufferSize < 1: $maxBufferSize" }
52   }
53 
54   @get:JvmName("sink")
55   val sink = object : Sink {
56     private val timeout = Timeout()
57 
writenull58     override fun write(source: Buffer, byteCount: Long) {
59       var byteCount = byteCount
60       var delegate: Sink? = null
61       lock.withLock {
62         check(!sinkClosed) { "closed" }
63         if (canceled) throw IOException("canceled")
64 
65         while (byteCount > 0) {
66           foldedSink?.let {
67             delegate = it
68             return@withLock
69           }
70 
71           if (sourceClosed) throw IOException("source is closed")
72 
73           val bufferSpaceAvailable = maxBufferSize - buffer.size
74           if (bufferSpaceAvailable == 0L) {
75             timeout.awaitSignal(condition) // Wait until the source drains the buffer.
76             if (canceled) throw IOException("canceled")
77             continue
78           }
79 
80           val bytesToWrite = minOf(bufferSpaceAvailable, byteCount)
81           buffer.write(source, bytesToWrite)
82           byteCount -= bytesToWrite
83           condition.signalAll() // Notify the source that it can resume reading.
84         }
85       }
86 
87       delegate?.forward { write(source, byteCount) }
88     }
89 
flushnull90     override fun flush() {
91       var delegate: Sink? = null
92       lock.withLock {
93         check(!sinkClosed) { "closed" }
94         if (canceled) throw IOException("canceled")
95 
96         foldedSink?.let {
97           delegate = it
98           return@withLock
99         }
100 
101         if (sourceClosed && buffer.size > 0L) {
102           throw IOException("source is closed")
103         }
104       }
105 
106       delegate?.forward { flush() }
107     }
108 
closenull109     override fun close() {
110       var delegate: Sink? = null
111       lock.withLock {
112         if (sinkClosed) return
113 
114         foldedSink?.let {
115           delegate = it
116           return@withLock
117         }
118 
119         if (sourceClosed && buffer.size > 0L) throw IOException("source is closed")
120         sinkClosed = true
121         condition.signalAll() // Notify the source that no more bytes are coming.
122       }
123 
124       delegate?.forward { close() }
125     }
126 
timeoutnull127     override fun timeout(): Timeout = timeout
128   }
129 
130   @get:JvmName("source")
131   val source = object : Source {
132     private val timeout = Timeout()
133 
134     override fun read(sink: Buffer, byteCount: Long): Long {
135       lock.withLock {
136         check(!sourceClosed) { "closed" }
137         if (canceled) throw IOException("canceled")
138 
139         while (buffer.size == 0L) {
140           if (sinkClosed) return -1L
141           timeout.awaitSignal(condition) // Wait until the sink fills the buffer.
142           if (canceled) throw IOException("canceled")
143         }
144 
145         val result = buffer.read(sink, byteCount)
146         condition.signalAll() // Notify the sink that it can resume writing.
147         return result
148       }
149     }
150 
151     override fun close() {
152       lock.withLock {
153         sourceClosed = true
154         condition.signalAll() // Notify the sink that no more bytes are desired.
155       }
156     }
157 
158     override fun timeout(): Timeout = timeout
159   }
160 
161   /**
162    * Writes any buffered contents of this pipe to `sink`, then replace this pipe's source with
163    * `sink`. This pipe's source is closed and attempts to read it will throw an
164    * [IllegalStateException].
165    *
166    * This method must not be called while concurrently accessing this pipe's source. It is safe,
167    * however, to call this while concurrently writing this pipe's sink.
168    */
169   @Throws(IOException::class)
foldnull170   fun fold(sink: Sink) {
171     while (true) {
172       // Either the buffer is empty and we can swap and return. Or the buffer is non-empty and we
173       // must copy it to sink without holding any locks, then try it all again.
174       var closed = false
175       lateinit var sinkBuffer: Buffer
176       lock.withLock {
177         check(foldedSink == null) { "sink already folded" }
178 
179         if (canceled) {
180           foldedSink = sink
181           throw IOException("canceled")
182         }
183 
184         if (buffer.exhausted()) {
185           sourceClosed = true
186           foldedSink = sink
187           return@fold
188         }
189 
190         closed = sinkClosed
191         sinkBuffer = Buffer()
192         sinkBuffer.write(buffer, buffer.size)
193         condition.signalAll() // Notify the sink that it can resume writing.
194       }
195 
196       var success = false
197       try {
198         sink.write(sinkBuffer, sinkBuffer.size)
199         if (closed) {
200           sink.close()
201         } else {
202           sink.flush()
203         }
204         success = true
205       } finally {
206         if (!success) {
207           lock.withLock {
208             sourceClosed = true
209             condition.signalAll() // Notify the sink that it can resume writing.
210           }
211         }
212       }
213     }
214   }
215 
forwardnull216   private inline fun Sink.forward(block: Sink.() -> Unit) {
217     this.timeout().intersectWith(this@Pipe.sink.timeout()) { this.block() }
218   }
219 
220   @JvmName("-deprecated_sink")
221   @Deprecated(
222     message = "moved to val",
223     replaceWith = ReplaceWith(expression = "sink"),
224     level = DeprecationLevel.ERROR,
225   )
sinknull226   fun sink() = sink
227 
228   @JvmName("-deprecated_source")
229   @Deprecated(
230     message = "moved to val",
231     replaceWith = ReplaceWith(expression = "source"),
232     level = DeprecationLevel.ERROR,
233   )
234   fun source() = source
235 
236   /**
237    * Fail any in-flight and future operations. After canceling:
238    *
239    *  * Any attempt to write or flush [sink] will fail immediately with an [IOException].
240    *  * Any attempt to read [source] will fail immediately with an [IOException].
241    *  * Any attempt to [fold] will fail immediately with an [IOException].
242    *
243    * Closing the source and the sink will complete normally even after a pipe has been canceled. If
244    * this sink has been folded, closing it will close the folded sink. This operation may block.
245    *
246    * This operation may be called by any thread at any time. It is safe to call concurrently while
247    * operating on the source or the sink.
248    */
249   fun cancel() {
250     lock.withLock {
251       canceled = true
252       buffer.clear()
253       condition.signalAll() // Notify the source and sink that they're canceled.
254     }
255   }
256 }
257