1 /**
2  * Defines buffered input and output streams that wrap around any other stream,
3  * to allow it to buffer contents and flush only when full (or when a manual
4  * `flush()` is called).
5  */
6 module streams.types.buffered;
7 
8 import streams.primitives;
9 
10 version (Have_slf4d) {import slf4d;}
11 
12 /** 
13  * The default size for buffered input and output streams.
14  */
15 const uint DEFAULT_BUFFER_SIZE = 4096;
16 
17 /** 
18  * A buffered wrapper around another input stream, that buffers data that's
19  * been read into an internal buffer, so that calls to `readToStream` don't all
20  * necessitate reading from the underlying resource.
21  */
22 struct BufferedInputStream(S, uint BufferSize = DEFAULT_BUFFER_SIZE) if (isSomeInputStream!S) {
23     private alias E = StreamType!S;
24 
25     private S stream;
26     private E[BufferSize] internalBuffer;
27     private uint nextIndex = BufferSize;
28     private uint elementsInBuffer = 0;
29     private bool streamEnded = false;
30 
31     /** 
32      * Constructs a buffered input stream to buffer reads from the given stream.
33      * Params:
34      *   stream = The stream to read from.
35      */
36     this(S stream) {
37         this.stream = stream;
38     }
39 
40     /** 
41      * Reads elements into the given buffer, first pulling from this buffered
42      * stream's internal buffer, and then reading from the underlying stream.
43      * Params:
44      *   buffer = The buffer to read items to.
45      * Returns: The number of elements read, or -1 in case of error.
46      */
47     StreamResult readFromStream(E[] buffer) {
48         int elementsRead = 0;
49         while (elementsRead < buffer.length) {
50             // First copy as much as we can from our internal buffer to the outbuffer.
51             E[] writableSlice = buffer[elementsRead .. $];
52             uint elementsFromInternal = this.readFromInternalBuffer(writableSlice);
53             elementsRead += elementsFromInternal;
54             version (Have_slf4d) {
55                 traceF!"Read %d elements from internal buffer."(elementsFromInternal);
56             }
57 
58             // Then, if necessary, refresh the internal buffer.
59             if (elementsRead < buffer.length && !this.streamEnded) {
60                 version (Have_slf4d) {
61                     traceF!"Refreshing internal buffer to read at most %d more elements."(buffer.length - elementsRead);
62                 }
63                 StreamResult readResult = this.refreshInternalBuffer();
64                 if (readResult.hasError) return readResult;
65             } else if (this.streamEnded) {
66                 break; // Quit reading if the stream has ended.
67             }
68         }
69         return StreamResult(elementsRead);
70     }
71 
72     /** 
73      * Reads as many elements as possible from the internal buffer, writing to
74      * the given buffer parameter.
75      * Params:
76      *   buffer = The buffer to write to.
77      * Returns: The number of elements that were read.
78      */
79     private uint readFromInternalBuffer(E[] buffer) {
80         if (this.elementsInBuffer == 0) return 0;
81         const uint elementsAvailable = this.elementsInBuffer - this.nextIndex;
82         if (elementsAvailable == 0) return 0;
83         const uint elementsToCopy = elementsAvailable < buffer.length ? elementsAvailable : cast(uint) buffer.length;
84         buffer[0 .. elementsToCopy] = this.internalBuffer[this.nextIndex .. this.nextIndex + elementsToCopy];
85         this.nextIndex += elementsToCopy;
86         return elementsToCopy;
87     }
88 
89     /** 
90      * Refreshes the internal buffer by reading from the underlying stream.
91      * Returns: The result of the stream read operation.
92      */
93     private StreamResult refreshInternalBuffer() {
94         if (this.streamEnded) return StreamResult(0);
95         StreamResult result = this.stream.readFromStream(this.internalBuffer);
96         if (result.hasError) return result; // Exit right away in case of error.
97         version (Have_slf4d) {
98             traceF!"Refreshed internal buffer with %d more elements."(result.count);
99         }
100         this.nextIndex = 0;
101         if (result.count == 0) {
102             this.streamEnded = true;
103             version (Have_slf4d) {
104                 trace("Internal stream has ended.");
105             }
106         }
107         this.elementsInBuffer = result.count;
108         return result;
109     }
110 
111     static if (isClosableStream!S) {
112         OptionalStreamError closeStream() {
113             return this.stream.closeStream();
114         }
115     }
116 }
117 
118 /** 
119  * Creates and returns a buffered input stream that's wrapped around the given
120  * input stream.
121  * Params:
122  *   stream = The stream to wrap in a buffered input stream.
123  * Returns: The buffered input stream.
124  */
125 BufferedInputStream!(S, BufferSize) bufferedInputStreamFor(uint BufferSize = DEFAULT_BUFFER_SIZE, S)(
126     S stream
127 ) if (isSomeInputStream!S) {
128     return BufferedInputStream!(S, BufferSize)(stream);
129 }
130 
131 unittest {
132     import streams.types.array : arrayInputStreamFor;
133 
134     // Test basic operations.
135     int[4] sInData = [1, 2, 3, 4];
136     auto sIn1 = arrayInputStreamFor!int(sInData);
137     auto bufIn1 = bufferedInputStreamFor(&sIn1);
138     int[1] buf1;
139     StreamResult readResult1 = bufIn1.readFromStream(buf1);
140     assert(readResult1 == StreamResult(1));
141     assert(buf1 == [1]);
142     int[4] buf2;
143     StreamResult readResult2 = bufIn1.readFromStream(buf2);
144     assert(readResult2 == StreamResult(3));
145 
146     // Check that a read error propagates.
147     import streams.primitives : ErrorInputStream;
148     auto sIn3 = ErrorInputStream!int();
149     auto bufIn3 = BufferedInputStream!(typeof(sIn3))(sIn3);
150     int[64] buf3;
151     assert(bufIn3.readFromStream(buf3).hasError);
152 
153     // Check that a closed input stream results in reads of 0.
154     import streams.primitives : NoOpInputStream;
155     auto sIn4 = NoOpInputStream!bool();
156     auto bufIn4 = BufferedInputStream!(typeof(sIn4))(sIn4);
157     bool[3] buf4;
158     assert(bufIn4.readFromStream(buf4) == StreamResult(0));
159 }
160 
161 /** 
162  * A buffered wrapper around another output stream, that buffers writes up to
163  * `BufferSize` elements before flushing the buffer to the underlying stream.
164  */
165 struct BufferedOutputStream(S, uint BufferSize = DEFAULT_BUFFER_SIZE) if (isSomeOutputStream!S) {
166     private alias E = StreamType!S;
167 
168     private S stream;
169     private E[BufferSize] internalBuffer;
170     private uint nextIndex = 0;
171 
172     /** 
173      * Constructs a buffered output stream to buffer writes to the given stream.
174      * Params:
175      *   stream = The stream to write to.
176      */
177     this(S stream) {
178         this.stream = stream;
179     }
180 
181     /** 
182      * Writes the given items this stream's internal buffer, and flushes if we
183      * reach the buffer's capacity.
184      * Params:
185      *   buffer = The elements to write.
186      * Returns: The number of elements that were written, or -1 in case of error.
187      */
188     StreamResult writeToStream(E[] buffer) {
189         int elementsWritten = 0;
190         uint bufferIndex = 0;
191         while (bufferIndex < buffer.length) {
192             // Determine how many elements we can copy to our buffer at once.
193             const uint remainingElements = cast(uint) buffer.length - bufferIndex;
194             const uint remainingCapacity = BufferSize - nextIndex;
195             const uint elementsToWrite = remainingElements > remainingCapacity ? remainingCapacity : remainingElements;
196             version (Have_slf4d) {
197                 traceF!"Writing %d elements to the internal buffer."(elementsToWrite);
198             }
199 
200             // Do the copy operation.
201             const newInternalBufferIndex = this.nextIndex + elementsToWrite;
202             const newBufferIndex = bufferIndex + elementsToWrite;
203             this.internalBuffer[this.nextIndex .. newInternalBufferIndex] = buffer[bufferIndex .. newBufferIndex];
204 
205             // Update our state, and flush if we've filled up our buffer.
206             this.nextIndex = newInternalBufferIndex;
207             bufferIndex = newBufferIndex;
208             elementsWritten += elementsToWrite;
209 
210             if (this.nextIndex == BufferSize) {
211                 OptionalStreamError optionalError = this.internalFlush();
212                 if (optionalError.present) return StreamResult(optionalError.value); // If we detect an error, quit immediately.
213             }
214         }
215         return StreamResult(elementsWritten);
216     }
217 
218     private OptionalStreamError internalFlush() {
219         StreamResult result = this.stream.writeToStream(this.internalBuffer[0 .. this.nextIndex]);
220         if (result.hasError) return OptionalStreamError(result.error);
221         version (Have_slf4d) {
222             traceF!"Flushed %d elements from the internal buffer to the stream."(result.count);
223         }
224         this.nextIndex = 0;
225         return OptionalStreamError.init;
226     }
227 
228     /** 
229      * Manually invokes a flush to the underlying stream.
230      */
231     OptionalStreamError flushStream() {
232         return this.internalFlush();
233     }
234 
235     static if (isClosableStream!S) {
236         OptionalStreamError closeStream() {
237             OptionalStreamError flushError = this.flushStream();
238             OptionalStreamError closeError = this.stream.closeStream();
239             if (closeError.present) return closeError;
240             if (flushError.present) return flushError;
241             return OptionalStreamError.init;
242         }
243     }
244 }
245 
246 /**
247  * Creates and returns a buffered output stream that's wrapped around the given
248  * output stream.
249  * Params:
250  *   stream = The stream to wrap in a buffered output stream.
251  * Returns: The buffered output stream.
252  */
253 BufferedOutputStream!(S, BufferSize) bufferedOutputStreamFor(uint BufferSize = DEFAULT_BUFFER_SIZE, S)(
254     S stream
255 ) if (isSomeOutputStream!S) {
256     return BufferedOutputStream!(S, BufferSize)(stream);
257 }
258 
259 unittest {
260     import streams.primitives : isFlushableStream;
261     import streams.types.array : byteArrayOutputStream;
262 
263     auto sOut1 = byteArrayOutputStream();
264     auto bufOut1 = bufferedOutputStreamFor!(4)(&sOut1);
265 
266     assert(isOutputStream!(typeof(bufOut1), ubyte));
267     assert(isFlushableStream!(typeof(bufOut1)));
268 
269     ubyte[5] data = [1, 2, 3, 4, 5];
270     assert(bufOut1.writeToStream(data[0 .. 1]) == StreamResult(1));
271     assert(sOut1.toArrayRaw().length == 0);
272     assert(bufOut1.writeToStream(data[1 .. 2]) == StreamResult(1));
273     assert(sOut1.toArrayRaw().length == 0);
274     assert(bufOut1.writeToStream(data[2 .. 4]) == StreamResult(2));
275     assert(sOut1.toArrayRaw() == [1, 2, 3, 4]);
276     assert(bufOut1.writeToStream(data[4 .. 5]) == StreamResult(1));
277     assert(sOut1.toArrayRaw().length == 4);
278     bufOut1.flushStream();
279     assert(sOut1.toArrayRaw() == [1, 2, 3, 4, 5]);
280 }