module streams.types.chunked;

import streams.primitives;

version (Have_slf4d) {import slf4d;}

/** 
 * An input stream for reading from a chunked-encoded stream of bytes.
 */
struct ChunkedEncodingInputStream(S) if (isByteInputStream!S) {
    private S stream;
    private uint currentChunkSize = 0;
    private uint currentChunkIndex = 0;
    private bool endOfStream = false;

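    /** 
     * Constructs an input stream that decodes chunked-encoded data read from
     * the given stream.
     * Params:
     *   stream = The underlying stream to read from.
     */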
    this(S stream) {
        this.stream = stream;
    }

    /** 
     * Reads from a chunked-encoded input stream in a way that respects chunk
     * boundaries.
     * Params:
     *   buffer = The buffer to read bytes into.
     * Returns: The number of bytes that were read, or an error if reading
     * fails.
     */
    StreamResult readFromStream(ubyte[] buffer) {
        if (this.endOfStream) return StreamResult(0);

        uint bytesRead = 0;
        uint bufferIndex = 0;

        while (bytesRead < buffer.length) {
            if (this.currentChunkSize == 0 || this.currentChunkIndex == this.currentChunkSize) {
                import streams.utils : Optional, readHexString;
                // Try to read the next chunk header.
                version (Have_slf4d) {
                    traceF!"Reading chunked-encoding header from stream: %s"(typeof(this.stream).stringof);
                }
                char[32] hexChars;
                uint charIdx = 0;
                // Keep reading until we reach the first \r\n.
                while (!(charIdx >= 2 && hexChars[charIdx - 2] == '\r' && hexChars[charIdx - 1] == '\n')) {
                    if (charIdx >= hexChars.length) return StreamResult(StreamError(
                        "Chunk header is too long.", cast(int) charIdx
                    ));
                    ubyte[1] charBuffer;
                    StreamResult result = this.stream.readFromStream(charBuffer);
                    if (result.hasError) return result;
                    if (result.count != 1) return StreamResult(StreamError(
                        "Unexpected end of stream while reading chunk header.", result.count
                    ));
                    hexChars[charIdx++] = cast(char) charBuffer[0];
                }
                Optional!uint chunkSize = readHexString(hexChars[0 .. charIdx - 2]);
                if (!chunkSize.present) return StreamResult(StreamError("Invalid or missing chunk header size.", -1));
                if (chunkSize.value == 0) {
                    this.endOfStream = true;
                    return StreamResult(bytesRead);
                }
                this.currentChunkSize = chunkSize.value;
                this.currentChunkIndex = 0;
                version (Have_slf4d) {
                    traceF!"Read chunked-encoding header \"%s\" size of %d bytes."(
                        hexChars[0 .. charIdx - 2],
                        chunkSize.value
                    );
                }
            }
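            // Copy as many bytes as both the remaining chunk and the caller's
            // buffer can still hold.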
            const uint bytesAvailable = this.currentChunkSize - this.currentChunkIndex;
            const uint spaceAvailable = cast(uint) buffer.length - bufferIndex;
            uint bytesToRead = bytesAvailable < spaceAvailable ? bytesAvailable : spaceAvailable;
            ubyte[] writableSlice = buffer[bufferIndex .. bufferIndex + bytesToRead];
            StreamResult result = this.stream.readFromStream(writableSlice);
            if (result.hasError) return result;
            if (result.count != bytesToRead) return StreamResult(StreamError(
                "Could not read all bytes.", result.count
            ));
            bytesRead += bytesToRead;
            bufferIndex += bytesToRead;
            this.currentChunkIndex += bytesToRead;

            if (this.currentChunkIndex == this.currentChunkSize) {
                // Read the trailing \r\n after the chunk is done.
                version (Have_slf4d) {
                    trace("Reading chunked-encoding trailing carriage return and line feed.");
                }
                ubyte[2] trail;
                StreamResult trailingResult = this.stream.readFromStream(trail);
                if (trailingResult.hasError) return trailingResult;
                if (trailingResult.count != 2 || trail[0] != '\r' || trail[1] != '\n') {
                    return StreamResult(StreamError("Invalid chunk trailing.", trailingResult.count));
                }
            }
        }

        return StreamResult(bytesRead);
    }

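    // Expose closeStream only if the underlying stream supports closing.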
    static if (isClosableStream!S) {
        OptionalStreamError closeStream() {
            return this.stream.closeStream();
        }
    }
}

unittest {
    import streams.types.array;

    ubyte[] sample1 = cast(ubyte[]) "4\r\nWiki\r\n7\r\npedia i\r\nB\r\nn \r\nchunks.\r\n0\r\n";
    auto sIn1 = arrayInputStreamFor(sample1);
    auto cIn1 = ChunkedEncodingInputStream!(typeof(sIn1))(sIn1);
    ubyte[1024] buffer1;
    StreamResult result1 = cIn1.readFromStream(buffer1);
    assert(result1.hasCount && result1.count > 0);
    assert(buffer1[0 .. result1.count] == "Wikipedia in \r\nchunks.");

    ubyte[] sample2 = cast(ubyte[]) "3\r\nabc"; // Invalid: missing trailing \r\n
    auto cIn2 = chunkedEncodingInputStreamFor(arrayInputStreamFor(sample2));
    ubyte[1024] buffer2;
    assert(cIn2.readFromStream(buffer2).hasError);
}

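/** 
 * Creates and returns a chunked-encoding input stream that's wrapped around
 * the given byte input stream.
 * Params:
 *   stream = The stream to read chunked-encoded data from.
 * Returns: The chunked-encoding input stream.
 */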
ChunkedEncodingInputStream!S chunkedEncodingInputStreamFor(S)(S stream) if (isByteInputStream!S) {
    return ChunkedEncodingInputStream!(S)(stream);
}

/** 
 * An output stream for writing to a chunked-encoded stream of bytes.
 */
struct ChunkedEncodingOutputStream(S) if (isByteOutputStream!S) {
    private S stream;

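    /** 
     * Constructs an output stream that writes chunked-encoded data to the
     * given stream.
     * Params:
     *   stream = The underlying stream to write to.
     */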
    this(S stream) {
        this.stream = stream;
    }

    /** 
     * Writes a single chunk to the output stream.
     * Params:
     *   buffer = The data to write.
     * Returns: The number of bytes that were written, not including the chunk
     * header and trailer elements.
     */
    StreamResult writeToStream(ubyte[] buffer) {
        StreamResult headerResult = this.writeChunkHeader(cast(uint) buffer.length);
        if (headerResult.hasError) return headerResult;
        StreamResult chunkResult = this.stream.writeToStream(buffer);
        if (chunkResult.hasError) return chunkResult;
        if (chunkResult.count != buffer.length) return StreamResult(StreamError(
            "Could not write full chunk.",
            chunkResult.count
        ));
        StreamResult trailerResult = this.writeChunkTrailer();
        if (trailerResult.hasError) return trailerResult;
        return chunkResult;
    }

    /** 
     * Flushes the chunked-encoded stream by writing a final zero-size chunk
     * header and trailer.
     */
    OptionalStreamError flushStream() {
        StreamResult headerResult = this.writeChunkHeader(0);
        if (headerResult.hasError) return OptionalStreamError(headerResult.error);
        StreamResult trailerResult = this.writeChunkTrailer();
        if (trailerResult.hasError) return OptionalStreamError(trailerResult.error);
        static if (isFlushableStream!S) {
            return this.stream.flushStream();
        } else {
            return OptionalStreamError.init;
        }
    }

    /** 
     * Closes the chunked-encoded stream, which also flushes the stream,
     * effectively writing a final zero-size chunk header and trailer. Also
     * closes the underlying stream, if possible.
     */
    OptionalStreamError closeStream() {
        OptionalStreamError flushError = this.flushStream();
        if (flushError.present) return flushError;
        static if (isClosableStream!S) {
            return this.stream.closeStream();
        } else {
            return OptionalStreamError.init;
        }
    }

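    /** 
     * Writes a chunk header of the form "<hex size>\r\n" to the underlying
     * stream.
     * Params:
     *   size = The size of the chunk that will follow, in bytes.
     * Returns: The result of writing the header.
     */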
    private StreamResult writeChunkHeader(uint size) {
        import streams.utils : writeHexString;

        version (Have_slf4d) {
            traceF!"Writing chunked-encoding header for chunk of %d bytes."(size);
        }
        char[32] chars;
        uint sizeStrLength = writeHexString(size, chars);
        chars[sizeStrLength] = '\r';
        chars[sizeStrLength + 1] = '\n';
        StreamResult writeResult = this.stream.writeToStream(cast(ubyte[]) chars[0 .. sizeStrLength + 2]);
        if (writeResult.hasError) return writeResult;
        if (writeResult.count != sizeStrLength + 2) return StreamResult(StreamError(
            "Could not write full chunk header.", writeResult.count
        ));
        return writeResult;
    }

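    /** 
     * Writes the "\r\n" sequence that terminates a chunk's data.
     * Returns: The result of writing the trailer.
     */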
    private StreamResult writeChunkTrailer() {
        version (Have_slf4d) {
            trace("Writing chunked-encoding trailing carriage return and line feed.");
        }
        ubyte[2] trailerBytes = ['\r', '\n'];
        StreamResult writeResult = this.stream.writeToStream(trailerBytes);
        if (writeResult.hasError) return writeResult;
        if (writeResult.count != 2) return StreamResult(StreamError(
            "Could not write full chunk trailer.",
            writeResult.count
        ));
        return writeResult;
    }
}

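// A short, hedged round-trip sketch. It only assumes the array-stream helpers
// already used in this module's unittests, and mirrors their BetterC guard:
// data written through ChunkedEncodingOutputStream should decode back to the
// original bytes when read through ChunkedEncodingInputStream.
unittest {
    version (D_BetterC) {} else {
        import streams.types.array;

        auto bufferOut = byteArrayOutputStream();
        auto chunkedOut = ChunkedEncodingOutputStream!(typeof(&bufferOut))(&bufferOut);
        ubyte[5] payload = ['H', 'e', 'l', 'l', 'o'];
        // The reported count covers only the payload, not the chunk header or trailer.
        assert(chunkedOut.writeToStream(payload).count == 5);
        assert(!chunkedOut.flushStream().present);

        // Decoding the encoded bytes should yield the original payload.
        ubyte[] encoded = bufferOut.toArray();
        assert(encoded.length > payload.length);
        auto chunkedIn = chunkedEncodingInputStreamFor(arrayInputStreamFor(encoded));
        ubyte[16] decoded;
        StreamResult readResult = chunkedIn.readFromStream(decoded);
        assert(readResult.hasCount && readResult.count == 5);
        assert(decoded[0 .. readResult.count] == "Hello");
    }
}
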
unittest {
    import streams;

    auto sOut = byteArrayOutputStream();
    auto chunkedOut = ChunkedEncodingOutputStream!(typeof(sOut))(sOut);

    assert(isByteOutputStream!(typeof(chunkedOut)));
    assert(isFlushableStream!(typeof(chunkedOut)));
    assert(isClosableStream!(typeof(chunkedOut)));

    // The round-trip test below relies on std.file, std.path, and string
    // utilities, so it only runs outside of BetterC restrictions.

    version (D_BetterC) {} else {
        import std.stdio;
        import std.path;
        import std.file;
        import std.string;

        const filename = buildPath("source", "streams", "primitives.d");
        const filesize = getSize(filename);

        auto chunkedBuffer = byteArrayOutputStream();
        auto fIn = FileInputStream(toStringz(filename));
        auto sOut2 = ChunkedEncodingOutputStream!(typeof(&chunkedBuffer))(&chunkedBuffer);
        StreamResult result = transferTo(fIn, sOut2);
        assert(!result.hasError);
        sOut2.closeStream();
        ubyte[] chunkedFileContents = chunkedBuffer.toArray();
        assert(chunkedFileContents.length > filesize);

        auto chunkedIn = chunkedEncodingInputStreamFor(arrayInputStreamFor(chunkedFileContents));
        auto result2 = readAll(chunkedIn);
        assert(!result2.hasError, "Reading from chunked input stream failed: " ~ result2.error.message);
        assert(result2.data.length == filesize);
    }
}