module streams.types.chunked;

import streams.primitives;

version (Have_slf4d) {import slf4d;}

import std.stdio;

/**
 * An input stream for reading from a chunked-encoded stream of bytes.
 *
 * The wrapped stream is expected to contain a sequence of chunks, each made
 * of a hexadecimal size line terminated by `\r\n`, followed by that many
 * bytes of data and another `\r\n`. A chunk with a size of zero marks the end
 * of the encoded content.
 */
struct ChunkedEncodingInputStream(S) if (isByteInputStream!S) {
    /// The underlying stream that chunked-encoded bytes are read from.
    private S stream;
    /// The size, in bytes, of the chunk that is currently being read.
    private uint currentChunkSize = 0;
    /// The number of bytes of the current chunk that were already read.
    private uint currentChunkIndex = 0;
    /// Set once the terminating zero-size chunk header has been read.
    private bool endOfStream = false;

    /**
     * Constructs a chunked-encoding input stream that reads from the given
     * stream.
     * Params:
     *   stream = The stream to read chunked-encoded data from.
     */
    this(S stream) {
        this.stream = stream;
    }

    /**
     * Reads from a chunked-encoded input stream in a way that respects chunk
     * boundaries. Chunk headers and trailers are consumed and validated, and
     * only the chunk payload bytes are placed into the buffer. Reading keeps
     * going across chunks until the buffer is full or the terminating
     * zero-size chunk is found; after that, every call returns a count of 0.
     * Params:
     *   buffer = The buffer to read bytes into.
     * Returns: A result with the number of payload bytes that were read, or
     * an error if the underlying stream reports one, if the stream ends
     * unexpectedly, or if the chunked encoding is malformed.
     */
    StreamResult readFromStream(ubyte[] buffer) {
        if (this.endOfStream) return StreamResult(0);

        uint bytesRead = 0;
        uint bufferIndex = 0;

        while (bytesRead < buffer.length) {
            if (this.currentChunkSize == 0 || this.currentChunkIndex == this.currentChunkSize) {
                import streams.utils : Optional, readHexString;
                // Try to read the next chunk header.
                version (Have_slf4d) {
                    traceF!"Reading chunked-encoding header from stream: %s"(typeof(this.stream).stringof);
                }
                char[32] hexChars;
                uint charIdx = 0;
                // Keep reading until we reach the first \r\n.
                while (!(charIdx >= 2 && hexChars[charIdx - 2] == '\r' && hexChars[charIdx - 1] == '\n')) {
                    // The header comes from untrusted input, so refuse to
                    // write past the end of the fixed-size header buffer.
                    if (charIdx >= hexChars.length) {
                        return StreamResult(StreamError("Chunk header is too long.", -1));
                    }
                    ubyte[1] charBuffer;
                    StreamResult result = this.stream.readFromStream(charBuffer);
                    if (result.hasError) return result;
                    // If no byte was produced, the stream ended before the
                    // header was complete; don't store a garbage byte.
                    if (result.count != 1) {
                        return StreamResult(StreamError(
                            "Unexpected end of stream while reading chunk header.", -1
                        ));
                    }
                    hexChars[charIdx++] = cast(char) charBuffer[0];
                }
                Optional!uint chunkSize = readHexString(hexChars[0 .. charIdx - 2]);
                if (!chunkSize.present) return StreamResult(StreamError("Invalid or missing chunk header size.", -1));
                if (chunkSize.value == 0) {
                    // A zero-size chunk terminates the encoded content.
                    this.endOfStream = true;
                    return StreamResult(bytesRead);
                }
                this.currentChunkSize = chunkSize.value;
                this.currentChunkIndex = 0;
                version (Have_slf4d) {
                    traceF!"Read chunked-encoding header \"%s\" size of %d bytes."(
                        hexChars[0 .. charIdx - 2],
                        chunkSize.value
                    );
                }
            }
            // Read as much of the current chunk as fits in the rest of the buffer.
            const uint bytesAvailable = this.currentChunkSize - this.currentChunkIndex;
            const uint spaceAvailable = cast(uint) buffer.length - bufferIndex;
            uint bytesToRead = bytesAvailable < spaceAvailable ? bytesAvailable : spaceAvailable;
            ubyte[] writableSlice = buffer[bufferIndex .. bufferIndex + bytesToRead];
            StreamResult result = this.stream.readFromStream(writableSlice);
            if (result.hasError) return result;
            if (result.count != bytesToRead) return StreamResult(StreamError(
                "Could not read all bytes.", result.count
            ));
            bytesRead += bytesToRead;
            bufferIndex += bytesToRead;
            this.currentChunkIndex += bytesToRead;

            if (this.currentChunkIndex == this.currentChunkSize) {
                // Read the trailing \r\n after the chunk is done.
                version (Have_slf4d) {
                    trace("Reading chunked-encoding trailing carriage return and line feed.");
                }
                ubyte[2] trail;
                StreamResult trailingResult = this.stream.readFromStream(trail);
                if (trailingResult.hasError) return trailingResult;
                if (trailingResult.count != 2 || trail[0] != '\r' || trail[1] != '\n') {
                    return StreamResult(StreamError("Invalid chunk trailing.", trailingResult.count));
                }
            }
        }

        return StreamResult(bytesRead);
    }

    static if (isClosableStream!S) {
        /**
         * Closes the underlying stream. Only available when the underlying
         * stream is closable.
         * Returns: The result of closing the underlying stream.
         */
        OptionalStreamError closeStream() {
            return this.stream.closeStream();
        }
    }
}

unittest {
    import streams.types.array;

    ubyte[] sample1 = cast(ubyte[]) "4\r\nWiki\r\n7\r\npedia i\r\nB\r\nn \r\nchunks.\r\n0\r\n";
    auto sIn1 = arrayInputStreamFor(sample1);
    auto cIn1 = ChunkedEncodingInputStream!(typeof(sIn1))(sIn1);
    ubyte[1024] buffer1;
    StreamResult result1 = cIn1.readFromStream(buffer1);
    assert(result1.hasCount && result1.count > 0);
    assert(buffer1[0 .. result1.count] == "Wikipedia in \r\nchunks.");

    ubyte[] sample2 = cast(ubyte[]) "3\r\nabc"; // Invalid: missing trailing \r\n
    auto cIn2 = chunkedEncodingInputStreamFor(arrayInputStreamFor(sample2));
    ubyte[1024] buffer2;
    assert(cIn2.readFromStream(buffer2).hasError);

    // Invalid: the stream ends without a terminating zero-size chunk.
    ubyte[] sample3 = cast(ubyte[]) "4\r\nWiki\r\n";
    auto cIn3 = chunkedEncodingInputStreamFor(arrayInputStreamFor(sample3));
    ubyte[1024] buffer3;
    assert(cIn3.readFromStream(buffer3).hasError);

    // Invalid: the chunk header line is longer than the header buffer.
    ubyte[] sample4 = cast(ubyte[]) "0000000000000000000000000000000000000001\r\nA\r\n0\r\n";
    auto cIn4 = chunkedEncodingInputStreamFor(arrayInputStreamFor(sample4));
    ubyte[1024] buffer4;
    assert(cIn4.readFromStream(buffer4).hasError);
}

/**
 * Convenience function that wraps the given byte input stream in a
 * chunked-encoding input stream.
 * Params:
 *   stream = The stream to read chunked-encoded data from.
 * Returns: A chunked-encoding input stream that reads from the given stream.
 */
ChunkedEncodingInputStream!S chunkedEncodingInputStreamFor(S)(S stream) if (isByteInputStream!S) {
    return ChunkedEncodingInputStream!(S)(stream);
}

/**
 * An output stream for writing to a chunked-encoded stream of bytes.
 *
 * Every non-empty write is emitted as one chunk: a hexadecimal size line
 * terminated by `\r\n`, the data itself, and a trailing `\r\n`.
 */
struct ChunkedEncodingOutputStream(S) if (isByteOutputStream!S) {
    /// The underlying stream that chunked-encoded bytes are written to.
    private S stream;

    /**
     * Constructs a chunked-encoding output stream that writes to the given
     * stream.
     * Params:
     *   stream = The stream to write chunked-encoded data to.
     */
    this(S stream) {
        this.stream = stream;
    }

    /**
     * Writes a single chunk to the output stream. Writing an empty buffer
     * does nothing, because a zero-size chunk is the marker that terminates
     * the encoded content; use `flushStream` or `closeStream` to write it.
     * Params:
     *   buffer = The data to write.
     * Returns: The number of bytes that were written, not including the chunk
     * header and trailer elements, or an error if the header, data or trailer
     * could not be written completely.
     */
    StreamResult writeToStream(ubyte[] buffer) {
        // An empty chunk would be read back as the end of the stream.
        if (buffer.length == 0) return StreamResult(0);
        // The chunk size is written as a uint, so larger buffers can't be
        // described by a single chunk header.
        if (buffer.length > uint.max) return StreamResult(StreamError(
            "Buffer is too large to write as a single chunk.", -1
        ));
        StreamResult headerResult = this.writeChunkHeader(cast(uint) buffer.length);
        if (headerResult.hasError) return headerResult;
        StreamResult chunkResult = this.stream.writeToStream(buffer);
        if (chunkResult.hasError) return chunkResult;
        if (chunkResult.count != buffer.length) return StreamResult(StreamError(
            "Could not write full chunk.",
            chunkResult.count
        ));
        StreamResult trailerResult = this.writeChunkTrailer();
        if (trailerResult.hasError) return trailerResult;
        return chunkResult;
    }

    /**
     * Flushes the chunked-encoded stream by writing a final zero-size chunk
     * header and footer, and then flushes the underlying stream if it is
     * flushable.
     *
     * Note: `closeStream` calls this method, so flushing and then closing
     * writes the zero-size chunk twice.
     * Returns: An error if one occurred, or an empty optional otherwise.
     */
    OptionalStreamError flushStream() {
        StreamResult headerResult = this.writeChunkHeader(0);
        if (headerResult.hasError) return OptionalStreamError(headerResult.error);
        StreamResult trailerResult = this.writeChunkTrailer();
        if (trailerResult.hasError) return OptionalStreamError(trailerResult.error);
        static if (isFlushableStream!S) {
            return this.stream.flushStream();
        } else {
            return OptionalStreamError.init;
        }
    }

    /**
     * Closes the chunked-encoded stream, which also flushes the stream,
     * effectively writing a final zero-size chunk header and footer. Also
     * closes the underlying stream, if possible. If flushing fails, the
     * underlying stream is not closed and the flush error is returned.
     * Returns: An error if one occurred, or an empty optional otherwise.
     */
    OptionalStreamError closeStream() {
        OptionalStreamError flushError = this.flushStream();
        if (flushError.present) return flushError;
        static if (isClosableStream!S) {
            return this.stream.closeStream();
        } else {
            return OptionalStreamError.init;
        }
    }

    /**
     * Writes a chunk header, which is the chunk size as a hexadecimal string
     * followed by `\r\n`.
     * Params:
     *   size = The size of the chunk, in bytes.
     * Returns: The result of writing the header to the underlying stream, or
     * an error if the header was only partially written.
     */
    private StreamResult writeChunkHeader(uint size) {
        import streams.utils : writeHexString;

        version (Have_slf4d) {
            traceF!"Writing chunked-encoding header for chunk of %d bytes."(size);
        }
        char[32] chars;
        uint sizeStrLength = writeHexString(size, chars);
        chars[sizeStrLength] = '\r';
        chars[sizeStrLength + 1] = '\n';
        StreamResult writeResult = this.stream.writeToStream(cast(ubyte[]) chars[0 .. sizeStrLength + 2]);
        if (writeResult.hasError) return writeResult;
        if (writeResult.count != sizeStrLength + 2) return StreamResult(StreamError(
            "Could not write full chunk header.", writeResult.count
        ));
        return writeResult;
    }

    /**
     * Writes the `\r\n` that follows the data of every chunk.
     * Returns: The result of writing the trailer to the underlying stream, or
     * an error if the trailer was only partially written.
     */
    private StreamResult writeChunkTrailer() {
        version (Have_slf4d) {
            trace("Writing chunked-encoding trailing carriage return and line feed.");
        }
        StreamResult writeResult = this.stream.writeToStream(cast(ubyte[2]) "\r\n");
        if (writeResult.hasError) return writeResult;
        if (writeResult.count != 2) return StreamResult(StreamError(
            "Could not write full chunk trailer.",
            writeResult.count
        ));
        return writeResult;
    }
}

unittest {
    import streams;

    auto sOut = byteArrayOutputStream();
    auto chunkedOut = ChunkedEncodingOutputStream!(typeof(sOut))(sOut);

    assert(isByteOutputStream!(typeof(chunkedOut)));
    assert(isFlushableStream!(typeof(chunkedOut)));
    assert(isClosableStream!(typeof(chunkedOut)));

    // To make things easier for ourselves, we'll test chunked encoding outside
    // of BetterC restrictions.

    version (D_BetterC) {} else {
        import std.stdio;
        import std.path;
        import std.file;
        import std.string;

        const filename = buildPath("source", "streams", "primitives.d");
        const filesize = getSize(filename);

        auto chunkedBuffer = byteArrayOutputStream();
        auto fIn = FileInputStream(toStringz(filename));
        auto sOut2 = ChunkedEncodingOutputStream!(typeof(&chunkedBuffer))(&chunkedBuffer);
        StreamResult result = transferTo(fIn, sOut2);
        assert(!result.hasError);
        sOut2.closeStream();
        ubyte[] chunkedFileContents = chunkedBuffer.toArray();
        assert(chunkedFileContents.length > filesize);

        auto chunkedIn = chunkedEncodingInputStreamFor(arrayInputStreamFor(chunkedFileContents));
        auto result2 = readAll(chunkedIn);
        assert(!result2.hasError, "Reading from chunked input stream failed: " ~ result2.error.message);
        assert(result2.data.length == filesize);

        // Writing an empty buffer must not emit a terminating zero-size chunk.
        auto emptyBuffer = byteArrayOutputStream();
        auto emptyOut = ChunkedEncodingOutputStream!(typeof(&emptyBuffer))(&emptyBuffer);
        ubyte[] noData;
        StreamResult emptyResult = emptyOut.writeToStream(noData);
        assert(emptyResult.hasCount && emptyResult.count == 0);
        assert(emptyBuffer.toArray().length == 0);
    }
}