/*
 * Copyright (C) 2016 Square, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package okio

import java.util.concurrent.locks.Condition
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock

/**
 * A source and a sink that are attached. The sink's output is the source's input. Typically each
 * is accessed by its own thread: a producer thread writes data to the sink and a consumer thread
 * reads data from the source.
 *
 * This class uses a buffer to decouple source and sink. This buffer has a user-specified maximum
 * size. When a producer thread outruns its consumer the buffer fills up and eventually writes to
 * the sink will block until the consumer has caught up. Symmetrically, if a consumer outruns its
 * producer reads block until there is data to be read. Limits on the amount of time spent waiting
 * for the other party can be configured with [timeouts][Timeout] on the source and the
 * sink.
 *
 * When the sink is closed, source reads will continue to complete normally until the buffer has
 * been exhausted. At that point reads will return -1, indicating the end of the stream. But if the
 * source is closed first, writes to the sink will immediately fail with an [IOException].
 *
 * A pipe may be canceled to immediately fail writes to the sink and reads from the source.
 *
 * @param maxBufferSize the maximum number of bytes held in the pipe's buffer; must be at least 1.
 */
class Pipe(internal val maxBufferSize: Long) {
  // All of the mutable state below is read and written while holding [lock].
  internal val buffer = Buffer()
  internal var canceled = false
  internal var sinkClosed = false
  internal var sourceClosed = false

  /** Non-null once [fold] has replaced this pipe's source; sink calls are then forwarded to it. */
  internal var foldedSink: Sink? = null

  /** Guards the pipe's state. */
  val lock: ReentrantLock = ReentrantLock()

  /** Signaled whenever the buffer contents or any of the closed/canceled flags change. */
  val condition: Condition = lock.newCondition()

  init {
    require(maxBufferSize >= 1L) { "maxBufferSize < 1: $maxBufferSize" }
  }

  @get:JvmName("sink")
  val sink = object : Sink {
    private val timeout = Timeout()

    /**
     * Moves `byteCount` bytes from `source` into the pipe's buffer, waiting whenever the buffer is
     * full. If the pipe has been folded, the bytes not yet buffered are written to the folded sink
     * instead.
     *
     * @throws IllegalStateException if this sink is closed.
     * @throws IOException if the pipe is canceled or the source is closed.
     */
    override fun write(source: Buffer, byteCount: Long) {
      // Shadowed on purpose: tracks how many bytes still need to be written.
      var byteCount = byteCount
      var delegate: Sink? = null
      lock.withLock {
        check(!sinkClosed) { "closed" }
        if (canceled) throw IOException("canceled")

        while (byteCount > 0) {
          // Re-checked on every pass because a fold may happen while we wait for buffer space.
          foldedSink?.let {
            delegate = it
            return@withLock
          }

          if (sourceClosed) throw IOException("source is closed")

          val bufferSpaceAvailable = maxBufferSize - buffer.size
          if (bufferSpaceAvailable == 0L) {
            timeout.awaitSignal(condition) // Wait until the source drains the buffer.
            if (canceled) throw IOException("canceled")
            continue
          }

          val bytesToWrite = minOf(bufferSpaceAvailable, byteCount)
          buffer.write(source, bytesToWrite)
          byteCount -= bytesToWrite
          condition.signalAll() // Notify the source that it can resume reading.
        }
      }

      // Forward outside the lock so a slow folded sink doesn't block other pipe operations.
      delegate?.forward { write(source, byteCount) }
    }

    /**
     * Flushes the folded sink if there is one. Otherwise this only validates state.
     *
     * @throws IllegalStateException if this sink is closed.
     * @throws IOException if the pipe is canceled, or if the source is closed while bytes remain
     *     buffered.
     */
    override fun flush() {
      var delegate: Sink? = null
      lock.withLock {
        check(!sinkClosed) { "closed" }
        if (canceled) throw IOException("canceled")

        foldedSink?.let {
          delegate = it
          return@withLock
        }

        if (sourceClosed && buffer.size > 0L) {
          throw IOException("source is closed")
        }
      }

      delegate?.forward { flush() }
    }

    /**
     * Closes this sink, or the folded sink if there is one. Does nothing if this sink was already
     * closed.
     *
     * @throws IOException if the source is closed while bytes remain buffered.
     */
    override fun close() {
      var delegate: Sink? = null
      lock.withLock {
        if (sinkClosed) return

        foldedSink?.let {
          delegate = it
          return@withLock
        }

        if (sourceClosed && buffer.size > 0L) throw IOException("source is closed")
        sinkClosed = true
        condition.signalAll() // Notify the source that no more bytes are coming.
      }

      delegate?.forward { close() }
    }

    override fun timeout(): Timeout = timeout
  }

  @get:JvmName("source")
  val source = object : Source {
    private val timeout = Timeout()

    /**
     * Reads up to `byteCount` bytes from the pipe's buffer into `sink`, waiting while the buffer
     * is empty. Returns -1 once the buffer is empty and the pipe's sink has been closed.
     *
     * @throws IllegalStateException if this source is closed.
     * @throws IOException if the pipe is canceled.
     */
    override fun read(sink: Buffer, byteCount: Long): Long {
      lock.withLock {
        check(!sourceClosed) { "closed" }
        if (canceled) throw IOException("canceled")

        while (buffer.size == 0L) {
          if (sinkClosed) return -1L
          timeout.awaitSignal(condition) // Wait until the sink fills the buffer.
          if (canceled) throw IOException("canceled")
        }

        val result = buffer.read(sink, byteCount)
        condition.signalAll() // Notify the sink that it can resume writing.
        return result
      }
    }

    /** Marks this source as closed and wakes any waiting writer. */
    override fun close() {
      lock.withLock {
        sourceClosed = true
        condition.signalAll() // Notify the sink that no more bytes are desired.
      }
    }

    override fun timeout(): Timeout = timeout
  }

  /**
   * Writes any buffered contents of this pipe to `sink`, then replaces this pipe's source with
   * `sink`. This pipe's source is closed and attempts to read it will throw an
   * [IllegalStateException].
   *
   * If this pipe's sink has already been closed, `sink` is closed too once the buffered contents
   * have been written, including when nothing was buffered.
   *
   * This method must not be called while concurrently accessing this pipe's source. It is safe,
   * however, to call this while concurrently writing this pipe's sink.
   *
   * @throws IllegalStateException if this pipe has already been folded.
   * @throws IOException if this pipe is canceled, or if writing to `sink` fails.
   */
  @Throws(IOException::class)
  fun fold(sink: Sink) {
    // Remembers whether an earlier pass already closed `sink`, so it is closed at most once here.
    var targetClosed = false

    while (true) {
      // Either the buffer is empty and we can swap and return. Or the buffer is non-empty and we
      // must copy it to sink without holding any locks, then try it all again.
      var closed = false
      var done = false
      lateinit var sinkBuffer: Buffer
      lock.withLock {
        check(foldedSink == null) { "sink already folded" }

        if (canceled) {
          foldedSink = sink
          throw IOException("canceled")
        }

        // Capture this before the empty-buffer check: a pipe sink that was closed with nothing
        // left in the buffer must still propagate its close to the folded sink.
        closed = sinkClosed

        if (buffer.exhausted()) {
          sourceClosed = true
          foldedSink = sink
          done = true
          return@withLock
        }

        sinkBuffer = Buffer()
        sinkBuffer.write(buffer, buffer.size)
        condition.signalAll() // Notify the sink that it can resume writing.
      }

      if (done) {
        // The pipe sink's close() returns early once sinkClosed is set, so nothing else would
        // close the folded sink. Do it here, outside the lock.
        if (closed && !targetClosed) sink.close()
        return
      }

      var success = false
      try {
        sink.write(sinkBuffer, sinkBuffer.size)
        if (closed) {
          sink.close()
          targetClosed = true
        } else {
          sink.flush()
        }
        success = true
      } finally {
        if (!success) {
          lock.withLock {
            sourceClosed = true
            condition.signalAll() // Notify the sink that it can resume writing.
          }
        }
      }
    }
  }

  /**
   * Runs `block` on the receiver while its timeout is intersected with the pipe sink's timeout.
   */
  private inline fun Sink.forward(block: Sink.() -> Unit) {
    this.timeout().intersectWith(this@Pipe.sink.timeout()) { this.block() }
  }

  @JvmName("-deprecated_sink")
  @Deprecated(
    message = "moved to val",
    replaceWith = ReplaceWith(expression = "sink"),
    level = DeprecationLevel.ERROR,
  )
  fun sink() = sink

  @JvmName("-deprecated_source")
  @Deprecated(
    message = "moved to val",
    replaceWith = ReplaceWith(expression = "source"),
    level = DeprecationLevel.ERROR,
  )
  fun source() = source

  /**
   * Fail any in-flight and future operations. After canceling:
   *
   *  * Any attempt to write or flush [sink] will fail immediately with an [IOException].
   *  * Any attempt to read [source] will fail immediately with an [IOException].
   *  * Any attempt to [fold] will fail immediately with an [IOException].
   *
   * Closing the source and the sink will complete normally even after a pipe has been canceled. If
   * this sink has been folded, closing it will close the folded sink. This operation may block.
   *
   * This operation may be called by any thread at any time. It is safe to call concurrently while
   * operating on the source or the sink.
   */
  fun cancel() {
    lock.withLock {
      canceled = true
      buffer.clear()
      condition.signalAll() // Notify the source and sink that they're canceled.
    }
  }
}