1 /** 2 * A collection of helper functions for working with streams. 3 */ 4 module streams.functions; 5 6 import streams.primitives; 7 import streams.utils; 8 9 /** 10 * Transfers elements from an input stream to an output stream, doing so 11 * continuously until the input stream reads 0 elements or an error occurs. 12 * The streams are not closed after transfer completes. 13 * Params: 14 * input = The input stream to read from. 15 * output = The output stream to write to. 16 * maxElements = The maximum number of elements to transfer. Defaults to an 17 * empty optional, meaning unlimited elements. 18 * Returns: The result of the transfer operation. 19 */ 20 StreamResult transferTo(I, O, E = StreamType!I, uint BufferSize = 4096)( 21 ref I input, 22 ref O output, 23 Optional!ulong maxElements = Optional!ulong.init 24 ) if (isInputStream!(I, E) && isOutputStream!(O, E)) { 25 E[BufferSize] buffer; 26 uint totalItemsTransferred = 0; 27 while (maxElements.notPresent || totalItemsTransferred < maxElements.value) { 28 immutable uint elementsToRead = maxElements.present && (maxElements.value - totalItemsTransferred < BufferSize) 29 ? cast(uint) (maxElements.value - totalItemsTransferred) 30 : BufferSize; 31 StreamResult readResult = input.readFromStream(buffer[0 .. elementsToRead]); 32 if (readResult.hasError) return readResult; // Quit if reading fails. 33 if (readResult.count == 0) break; // No more elements to read. 34 35 StreamResult writeResult = output.writeToStream(buffer[0 .. readResult.count]); 36 if (writeResult.hasError) return writeResult; 37 if (writeResult.count != readResult.count) { 38 return StreamResult(StreamError("Could not transfer all bytes to output stream.", writeResult.count)); 39 } 40 41 totalItemsTransferred += writeResult.count; 42 } 43 return StreamResult(totalItemsTransferred); 44 } 45 46 unittest { 47 import streams.types.array : arrayInputStreamFor, arrayOutputStreamFor; 48 import streams.primitives; 49 50 // Check that transferring does indeed work by transferring the LICENSE file to memory. 51 char[12] expected = "Hello world!"; 52 auto sIn = arrayInputStreamFor!char(expected[]); 53 auto sOut = arrayOutputStreamFor!char(); 54 assert(transferTo!(typeof(sIn), typeof(sOut), char, 4096)(sIn, sOut) == StreamResult(12)); 55 assert(sOut.toArrayRaw() == expected); 56 57 // Check that a stream exception is thrown if transfer fails. 58 ubyte[3] data = [1, 2, 3]; 59 auto sIn2 = arrayInputStreamFor!ubyte(data); 60 auto sOut2 = ErrorOutputStream!ubyte(); 61 assert(transferTo(sIn2, sOut2).hasError); 62 63 sIn2.reset(); 64 auto sOut3 = NoOpOutputStream!ubyte(); 65 assert(transferTo(sIn2, sOut3).hasError); 66 67 // Check that if we set a maximum number of elements, that we only read that many. 68 int[10] buffer = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]; 69 auto sIn4 = arrayInputStreamFor(buffer); 70 auto sOut4 = arrayOutputStreamFor!int(); 71 StreamResult result4 = transferTo(sIn4, sOut4, Optional!ulong(4)); 72 assert(result4 == StreamResult(4)); 73 assert(sOut4.toArrayRaw() == [1, 2, 3, 4]); 74 } 75 76 /** 77 * Reads all available elements from an input stream, and collects them in an 78 * allocated buffer. If calling `hasData()` on the return value of this 79 * function returns `true`, you need to free that data yourself, as it has been 80 * allocated via `malloc`. 81 * Params: 82 * stream = The stream to read from. 83 * Returns: Either the data that was read, as a malloc'd buffer that should be 84 * freed with `free(result.data.ptr)`, or a `StreamError` if something went 85 * wrong. 86 */ 87 Either!(E[], "data", StreamError, "error") readAll(S, E = StreamType!S, uint BufferSize = 4096)( 88 ref S stream 89 ) if (isSomeInputStream!S) { 90 E[BufferSize] buffer; 91 AppendableBuffer!E app = AppendableBuffer!E(BufferSize, BufferAllocationStrategy.Doubling); 92 while (true) { 93 StreamResult readResult = stream.readFromStream(buffer); 94 if (readResult.hasError) return Either!(E[], "data", StreamError, "error")(readResult.error); 95 if (readResult.count == 0) break; 96 app.appendItems(buffer[0 .. readResult.count]); 97 if (readResult.count < BufferSize) break; 98 } 99 E[] copy = app.toArrayCopy(); 100 return Either!(E[], "data", StreamError, "error")(copy); 101 } 102 103 unittest { 104 import streams.types.array; 105 import core.stdc.stdlib : free; 106 107 ubyte[12] data1 = cast(ubyte[12]) "Hello world!"; 108 auto sIn1 = arrayInputStreamFor(data1); 109 auto result = readAll(sIn1); 110 assert(result.hasData); 111 free(result.data.ptr); 112 113 const size = 10_000; 114 int[size] data2; 115 for (uint i = 0; i < size; i++) { 116 data2[i] = i > 0 ? i - data2[i - 1] : i; 117 } 118 auto sIn2 = arrayInputStreamFor(data2); 119 auto result2 = readAll(sIn2); 120 assert(result2.hasData); 121 assert(result2.data.length == size); 122 free(result2.data.ptr); 123 124 // Check that errors result in an error. 125 auto sIn3 = ErrorInputStream!bool(); 126 auto result3 = readAll(sIn3); 127 assert(result3.hasError); 128 } 129 130 /** 131 * Reads exactly one element from an input stream. 132 * Params: 133 * stream = The stream to read one element from. 134 * Returns: Either the element, or an error. 135 */ 136 Either!(T, "element", StreamError, "error") readOne(S, T = StreamType!S)(ref S stream) if (isSomeInputStream!S) { 137 T[1] buffer; 138 StreamResult result = stream.readFromStream(buffer); 139 if (result.hasError) return Either!(T, "element", StreamError, "error")(result.error); 140 if (result.count != 1) return Either!(T, "element", StreamError, "error")(StreamError( 141 "Did not read exactly 1 element.", 142 result.count 143 )); 144 return Either!(T, "element", StreamError, "error")(buffer[0]); 145 } 146 147 unittest { 148 import streams.types.array; 149 150 ubyte[12] data1 = cast(ubyte[12]) "Hello world!"; 151 auto sIn1 = arrayInputStreamFor(data1); 152 153 auto result = readOne(sIn1); 154 assert(result.hasElement); 155 assert(result.element == 'H'); 156 157 result = readOne(sIn1); 158 assert(result.hasElement); 159 assert(result.element == 'e'); 160 for (uint i = 0; i < 10; i++) { 161 result = readOne(sIn1); 162 assert(!result.hasError); 163 assert(result.element == data1[i + 2]); 164 } 165 // Check that reading after we've exhausted the stream returns an error. 166 result = readOne(sIn1); 167 assert(result.hasError); 168 } 169 170 /** 171 * Writes exactly one element to an output stream. 172 * Params: 173 * stream = The stream to write one element to. 174 * value = The value to write. 175 * Returns: An optional stream error, which is present if writing failed. 176 */ 177 OptionalStreamError writeOne(S, T = StreamType!S)(ref S stream, T value) if (isSomeOutputStream!S) { 178 T[1] buffer = [value]; 179 StreamResult result = stream.writeToStream(buffer); 180 if (result.hasError) return OptionalStreamError(result.error); 181 if (result.count != 1) return OptionalStreamError(StreamError( 182 "Did not write exactly 1 element.", 183 result.count 184 )); 185 return OptionalStreamError.init; 186 } 187 188 unittest { 189 import streams.types.array; 190 191 auto sOut1 = arrayOutputStreamFor!int(); 192 assert(writeOne(sOut1, 42).notPresent); 193 assert(sOut1.toArrayRaw() == [42]); 194 195 auto sOut2 = ErrorOutputStream!bool(); 196 assert(writeOne(sOut2, false).present); 197 }