1 /** 2 * Defines buffered input and output streams that wrap around any other stream, 3 * to allow it to buffer contents and flush only when full (or when a manual 4 * `flush()` is called). 5 */ 6 module streams.types.buffered; 7 8 import streams.primitives; 9 10 version (Have_slf4d) {import slf4d;} 11 12 /** 13 * The default size for buffered input and output streams. 14 */ 15 const uint DEFAULT_BUFFER_SIZE = 4096; 16 17 /** 18 * A buffered wrapper around another input stream, that buffers data that's 19 * been read into an internal buffer, so that calls to `readToStream` don't all 20 * necessitate reading from the underlying resource. 21 */ 22 struct BufferedInputStream(S, uint BufferSize = DEFAULT_BUFFER_SIZE) if (isSomeInputStream!S) { 23 private alias E = StreamType!S; 24 25 private S stream; 26 private E[BufferSize] internalBuffer; 27 private uint nextIndex = BufferSize; 28 private uint elementsInBuffer = 0; 29 private bool streamEnded = false; 30 31 /** 32 * Constructs a buffered input stream to buffer reads from the given stream. 33 * Params: 34 * stream = The stream to read from. 35 */ 36 this(S stream) { 37 this.stream = stream; 38 } 39 40 /** 41 * Reads elements into the given buffer, first pulling from this buffered 42 * stream's internal buffer, and then reading from the underlying stream. 43 * Params: 44 * buffer = The buffer to read items to. 45 * Returns: The number of elements read, or -1 in case of error. 46 */ 47 StreamResult readFromStream(E[] buffer) { 48 int elementsRead = 0; 49 while (elementsRead < buffer.length) { 50 // First copy as much as we can from our internal buffer to the outbuffer. 51 E[] writableSlice = buffer[elementsRead .. $]; 52 uint elementsFromInternal = this.readFromInternalBuffer(writableSlice); 53 elementsRead += elementsFromInternal; 54 version (Have_slf4d) { 55 traceF!"Read %d elements from internal buffer."(elementsFromInternal); 56 } 57 58 // Then, if necessary, refresh the internal buffer. 59 if (elementsRead < buffer.length && !this.streamEnded) { 60 version (Have_slf4d) { 61 traceF!"Refreshing internal buffer to read at most %d more elements."(buffer.length - elementsRead); 62 } 63 StreamResult readResult = this.refreshInternalBuffer(); 64 if (readResult.hasError) return readResult; 65 } else if (this.streamEnded) { 66 break; // Quit reading if the stream has ended. 67 } 68 } 69 return StreamResult(elementsRead); 70 } 71 72 /** 73 * Reads as many elements as possible from the internal buffer, writing to 74 * the given buffer parameter. 75 * Params: 76 * buffer = The buffer to write to. 77 * Returns: The number of elements that were read. 78 */ 79 private uint readFromInternalBuffer(E[] buffer) { 80 if (this.elementsInBuffer == 0) return 0; 81 const uint elementsAvailable = this.elementsInBuffer - this.nextIndex; 82 if (elementsAvailable == 0) return 0; 83 const uint elementsToCopy = elementsAvailable < buffer.length ? elementsAvailable : cast(uint) buffer.length; 84 buffer[0 .. elementsToCopy] = this.internalBuffer[this.nextIndex .. this.nextIndex + elementsToCopy]; 85 this.nextIndex += elementsToCopy; 86 return elementsToCopy; 87 } 88 89 /** 90 * Refreshes the internal buffer by reading from the underlying stream. 91 * Returns: The result of the stream read operation. 92 */ 93 private StreamResult refreshInternalBuffer() { 94 if (this.streamEnded) return StreamResult(0); 95 StreamResult result = this.stream.readFromStream(this.internalBuffer); 96 if (result.hasError) return result; // Exit right away in case of error. 97 version (Have_slf4d) { 98 traceF!"Refreshed internal buffer with %d more elements."(result.count); 99 } 100 this.nextIndex = 0; 101 if (result.count == 0) { 102 this.streamEnded = true; 103 version (Have_slf4d) { 104 trace("Internal stream has ended."); 105 } 106 } 107 this.elementsInBuffer = result.count; 108 return result; 109 } 110 111 static if (isClosableStream!S) { 112 OptionalStreamError closeStream() { 113 return this.stream.closeStream(); 114 } 115 } 116 } 117 118 /** 119 * Creates and returns a buffered input stream that's wrapped around the given 120 * input stream. 121 * Params: 122 * stream = The stream to wrap in a buffered input stream. 123 * Returns: The buffered input stream. 124 */ 125 BufferedInputStream!(S, BufferSize) bufferedInputStreamFor(uint BufferSize = DEFAULT_BUFFER_SIZE, S)( 126 S stream 127 ) if (isSomeInputStream!S) { 128 return BufferedInputStream!(S, BufferSize)(stream); 129 } 130 131 unittest { 132 import streams.types.array : arrayInputStreamFor; 133 134 // Test basic operations. 135 int[4] sInData = [1, 2, 3, 4]; 136 auto sIn1 = arrayInputStreamFor!int(sInData); 137 auto bufIn1 = bufferedInputStreamFor(&sIn1); 138 int[1] buf1; 139 StreamResult readResult1 = bufIn1.readFromStream(buf1); 140 assert(readResult1 == StreamResult(1)); 141 assert(buf1 == [1]); 142 int[4] buf2; 143 StreamResult readResult2 = bufIn1.readFromStream(buf2); 144 assert(readResult2 == StreamResult(3)); 145 146 // Check that a read error propagates. 147 import streams.primitives : ErrorInputStream; 148 auto sIn3 = ErrorInputStream!int(); 149 auto bufIn3 = BufferedInputStream!(typeof(sIn3))(sIn3); 150 int[64] buf3; 151 assert(bufIn3.readFromStream(buf3).hasError); 152 153 // Check that a closed input stream results in reads of 0. 154 import streams.primitives : NoOpInputStream; 155 auto sIn4 = NoOpInputStream!bool(); 156 auto bufIn4 = BufferedInputStream!(typeof(sIn4))(sIn4); 157 bool[3] buf4; 158 assert(bufIn4.readFromStream(buf4) == StreamResult(0)); 159 } 160 161 /** 162 * A buffered wrapper around another output stream, that buffers writes up to 163 * `BufferSize` elements before flushing the buffer to the underlying stream. 164 */ 165 struct BufferedOutputStream(S, uint BufferSize = DEFAULT_BUFFER_SIZE) if (isSomeOutputStream!S) { 166 private alias E = StreamType!S; 167 168 private S stream; 169 private E[BufferSize] internalBuffer; 170 private uint nextIndex = 0; 171 172 /** 173 * Constructs a buffered output stream to buffer writes to the given stream. 174 * Params: 175 * stream = The stream to write to. 176 */ 177 this(S stream) { 178 this.stream = stream; 179 } 180 181 /** 182 * Writes the given items this stream's internal buffer, and flushes if we 183 * reach the buffer's capacity. 184 * Params: 185 * buffer = The elements to write. 186 * Returns: The number of elements that were written, or -1 in case of error. 187 */ 188 StreamResult writeToStream(E[] buffer) { 189 int elementsWritten = 0; 190 uint bufferIndex = 0; 191 while (bufferIndex < buffer.length) { 192 // Determine how many elements we can copy to our buffer at once. 193 const uint remainingElements = cast(uint) buffer.length - bufferIndex; 194 const uint remainingCapacity = BufferSize - nextIndex; 195 const uint elementsToWrite = remainingElements > remainingCapacity ? remainingCapacity : remainingElements; 196 version (Have_slf4d) { 197 traceF!"Writing %d elements to the internal buffer."(elementsToWrite); 198 } 199 200 // Do the copy operation. 201 const newInternalBufferIndex = this.nextIndex + elementsToWrite; 202 const newBufferIndex = bufferIndex + elementsToWrite; 203 this.internalBuffer[this.nextIndex .. newInternalBufferIndex] = buffer[bufferIndex .. newBufferIndex]; 204 205 // Update our state, and flush if we've filled up our buffer. 206 this.nextIndex = newInternalBufferIndex; 207 bufferIndex = newBufferIndex; 208 elementsWritten += elementsToWrite; 209 210 if (this.nextIndex == BufferSize) { 211 OptionalStreamError optionalError = this.internalFlush(); 212 if (optionalError.present) return StreamResult(optionalError.value); // If we detect an error, quit immediately. 213 } 214 } 215 return StreamResult(elementsWritten); 216 } 217 218 private OptionalStreamError internalFlush() { 219 StreamResult result = this.stream.writeToStream(this.internalBuffer[0 .. this.nextIndex]); 220 if (result.hasError) return OptionalStreamError(result.error); 221 version (Have_slf4d) { 222 traceF!"Flushed %d elements from the internal buffer to the stream."(result.count); 223 } 224 this.nextIndex = 0; 225 return OptionalStreamError.init; 226 } 227 228 /** 229 * Manually invokes a flush to the underlying stream. 230 */ 231 OptionalStreamError flushStream() { 232 return this.internalFlush(); 233 } 234 235 static if (isClosableStream!S) { 236 OptionalStreamError closeStream() { 237 OptionalStreamError flushError = this.flushStream(); 238 OptionalStreamError closeError = this.stream.closeStream(); 239 if (closeError.present) return closeError; 240 if (flushError.present) return flushError; 241 return OptionalStreamError.init; 242 } 243 } 244 } 245 246 /** 247 * Creates and returns a buffered output stream that's wrapped around the given 248 * output stream. 249 * Params: 250 * stream = The stream to wrap in a buffered output stream. 251 * Returns: The buffered output stream. 252 */ 253 BufferedOutputStream!(S, BufferSize) bufferedOutputStreamFor(uint BufferSize = DEFAULT_BUFFER_SIZE, S)( 254 S stream 255 ) if (isSomeOutputStream!S) { 256 return BufferedOutputStream!(S, BufferSize)(stream); 257 } 258 259 unittest { 260 import streams.primitives : isFlushableStream; 261 import streams.types.array : byteArrayOutputStream; 262 263 auto sOut1 = byteArrayOutputStream(); 264 auto bufOut1 = bufferedOutputStreamFor!(4)(&sOut1); 265 266 assert(isOutputStream!(typeof(bufOut1), ubyte)); 267 assert(isFlushableStream!(typeof(bufOut1))); 268 269 ubyte[5] data = [1, 2, 3, 4, 5]; 270 assert(bufOut1.writeToStream(data[0 .. 1]) == StreamResult(1)); 271 assert(sOut1.toArrayRaw().length == 0); 272 assert(bufOut1.writeToStream(data[1 .. 2]) == StreamResult(1)); 273 assert(sOut1.toArrayRaw().length == 0); 274 assert(bufOut1.writeToStream(data[2 .. 4]) == StreamResult(2)); 275 assert(sOut1.toArrayRaw() == [1, 2, 3, 4]); 276 assert(bufOut1.writeToStream(data[4 .. 5]) == StreamResult(1)); 277 assert(sOut1.toArrayRaw().length == 4); 278 bufOut1.flushStream(); 279 assert(sOut1.toArrayRaw() == [1, 2, 3, 4, 5]); 280 }