1 /** 
2  * A collection of helper functions for working with streams.
3  */
4 module streams.functions;
5 
6 import streams.primitives;
7 import streams.utils;
8 
/** 
 * Transfers elements from an input stream to an output stream, doing so
 * continuously until the input stream reads 0 elements, the optional element
 * limit is reached, or an error occurs. The streams are not closed after the
 * transfer completes.
 *
 * Elements are moved through a stack-allocated buffer of `BufferSize`
 * elements. If the output stream accepts fewer elements than were read in a
 * single step, the transfer stops and an error is returned.
 * Params:
 *   input = The input stream to read from.
 *   output = The output stream to write to.
 *   maxElements = The maximum number of elements to transfer. Defaults to an
 *                 empty optional, meaning unlimited elements.
 * Returns: The result of the transfer operation: the first read or write
 * error encountered, or the number of elements transferred. The count is
 * reported as a `uint`; if more than `uint.max` elements were transferred,
 * the reported count is capped at `uint.max` instead of wrapping around.
 */
StreamResult transferTo(I, O, E = StreamType!I, uint BufferSize = 4096)(
    ref I input,
    ref O output,
    Optional!ulong maxElements = Optional!ulong.init
) if (isInputStream!(I, E) && isOutputStream!(O, E)) {
    E[BufferSize] buffer;
    // The running total is kept as a ulong, the same width as maxElements.
    // A 32-bit counter would wrap after 2^32 elements, which would make any
    // limit above uint.max impossible to reach and corrupt the final count.
    ulong totalItemsTransferred = 0;
    while (maxElements.notPresent || totalItemsTransferred < maxElements.value) {
        // Read a full buffer unless fewer elements remain before the limit.
        uint elementsToRead = BufferSize;
        if (maxElements.present) {
            // Cannot underflow: the loop condition guarantees total < limit.
            immutable ulong remaining = maxElements.value - totalItemsTransferred;
            if (remaining < BufferSize) elementsToRead = cast(uint) remaining;
        }
        StreamResult readResult = input.readFromStream(buffer[0 .. elementsToRead]);
        if (readResult.hasError) return readResult; // Quit if reading fails.
        if (readResult.count == 0) break; // No more elements to read.

        StreamResult writeResult = output.writeToStream(buffer[0 .. readResult.count]);
        if (writeResult.hasError) return writeResult;
        if (writeResult.count != readResult.count) {
            // A partial write is treated as a failure; the number of elements
            // that were written is passed along with the error.
            return StreamResult(StreamError("Could not transfer all bytes to output stream.", writeResult.count));
        }

        totalItemsTransferred += writeResult.count;
    }
    // The result is built from a uint, so cap the total rather than truncate it.
    return StreamResult(totalItemsTransferred > uint.max ? uint.max : cast(uint) totalItemsTransferred);
}
45 
unittest {
    import streams.types.array : arrayInputStreamFor, arrayOutputStreamFor;
    import streams.primitives;

    // A plain in-memory transfer should move every element to the output.
    char[12] message = "Hello world!";
    auto textIn = arrayInputStreamFor!char(message[]);
    auto textOut = arrayOutputStreamFor!char();
    auto textResult = transferTo!(typeof(textIn), typeof(textOut), char, 4096)(textIn, textOut);
    assert(textResult == StreamResult(12));
    assert(textOut.toArrayRaw() == message);

    // Transferring into an ErrorOutputStream must yield an error result.
    ubyte[3] bytes = [1, 2, 3];
    auto bytesIn = arrayInputStreamFor!ubyte(bytes);
    auto failingOut = ErrorOutputStream!ubyte();
    auto failingResult = transferTo(bytesIn, failingOut);
    assert(failingResult.hasError);

    // Rewind the same input and transfer into a NoOpOutputStream; this is
    // expected to report an error as well.
    bytesIn.reset();
    auto noOpOut = NoOpOutputStream!ubyte();
    auto noOpResult = transferTo(bytesIn, noOpOut);
    assert(noOpResult.hasError);

    // With an element limit in place, only that many elements are moved.
    int[10] numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
    auto numbersIn = arrayInputStreamFor(numbers);
    auto numbersOut = arrayOutputStreamFor!int();
    StreamResult limitedResult = transferTo(numbersIn, numbersOut, Optional!ulong(4));
    assert(limitedResult == StreamResult(4));
    assert(numbersOut.toArrayRaw() == [1, 2, 3, 4]);
}
75 
/** 
 * Drains an input stream into a freshly allocated array. Elements are read in
 * chunks of up to `BufferSize` and accumulated until a read returns no
 * elements, a read returns fewer elements than the chunk size, or a read
 * fails.
 *
 * If `hasData()` on the returned value is `true`, the caller owns the
 * returned array and must release it, since it was allocated via `malloc`.
 * Params:
 *   stream = The stream to read from.
 * Returns: Either the collected elements, as a malloc'd buffer that should be
 * freed with `free(result.data.ptr)`, or the `StreamError` reported by the
 * failing read.
 */
Either!(E[], "data", StreamError, "error") readAll(S, E = StreamType!S, uint BufferSize = 4096)(
    ref S stream
) if (isSomeInputStream!S) {
    alias ReadAllResult = Either!(E[], "data", StreamError, "error");

    E[BufferSize] chunk;
    auto collected = AppendableBuffer!E(BufferSize, BufferAllocationStrategy.Doubling);
    for (;;) {
        StreamResult chunkResult = stream.readFromStream(chunk);
        if (chunkResult.hasError) {
            return ReadAllResult(chunkResult.error);
        }
        // Nothing was read, so there is nothing left to collect.
        if (chunkResult.count == 0) {
            break;
        }
        collected.appendItems(chunk[0 .. chunkResult.count]);
        // A chunk that was not filled completely ends the loop.
        if (chunkResult.count < BufferSize) {
            break;
        }
    }
    // Hand the caller its own copy of the accumulated elements.
    E[] ownedCopy = collected.toArrayCopy();
    return ReadAllResult(ownedCopy);
}
102 
unittest {
    import streams.types.array;
    import core.stdc.stdlib : free;

    // A short input is read successfully; the returned buffer must be freed.
    ubyte[12] greeting = cast(ubyte[12]) "Hello world!";
    auto greetingIn = arrayInputStreamFor(greeting);
    auto greetingResult = readAll(greetingIn);
    assert(greetingResult.hasData);
    free(greetingResult.data.ptr);

    // An input larger than the default chunk size is collected in full.
    const elementCount = 10_000;
    int[elementCount] numbers;
    for (uint idx = 0; idx < elementCount; ++idx) {
        numbers[idx] = idx > 0 ? idx - numbers[idx - 1] : idx;
    }
    auto numbersIn = arrayInputStreamFor(numbers);
    auto numbersResult = readAll(numbersIn);
    assert(numbersResult.hasData);
    assert(numbersResult.data.length == elementCount);
    free(numbersResult.data.ptr);

    // Reading from an ErrorInputStream must produce an error result.
    auto brokenIn = ErrorInputStream!bool();
    auto brokenResult = readAll(brokenIn);
    assert(brokenResult.hasError);
}
129 
/**
 * Pulls a single element out of an input stream.
 * Params:
 *   stream = The stream to read one element from.
 * Returns: Either the element that was read, or an error if the read failed
 * or did not yield exactly one element.
 */
Either!(T, "element", StreamError, "error") readOne(S, T = StreamType!S)(ref S stream) if (isSomeInputStream!S) {
    alias Outcome = Either!(T, "element", StreamError, "error");

    T[1] slot;
    StreamResult readResult = stream.readFromStream(slot);
    if (readResult.hasError) {
        return Outcome(readResult.error);
    }
    if (readResult.count == 1) {
        return Outcome(slot[0]);
    }
    // The read succeeded but did not deliver one element; report the count.
    return Outcome(StreamError("Did not read exactly 1 element.", readResult.count));
}
146 
unittest {
    import streams.types.array;

    ubyte[12] greeting = cast(ubyte[12]) "Hello world!";
    auto greetingIn = arrayInputStreamFor(greeting);

    // The first two reads yield the first two characters.
    auto outcome = readOne(greetingIn);
    assert(outcome.hasElement);
    assert(outcome.element == 'H');

    outcome = readOne(greetingIn);
    assert(outcome.hasElement);
    assert(outcome.element == 'e');

    // The remaining ten elements come out one by one, in order.
    foreach (ubyte expectedByte; greeting[2 .. $]) {
        outcome = readOne(greetingIn);
        assert(!outcome.hasError);
        assert(outcome.element == expectedByte);
    }

    // Once the stream is exhausted, another read reports an error.
    outcome = readOne(greetingIn);
    assert(outcome.hasError);
}
169 
/** 
 * Pushes a single element into an output stream.
 * Params:
 *   stream = The stream to write one element to.
 *   value = The value to write.
 * Returns: An optional stream error, which is present if the write failed or
 * did not write exactly one element, and empty otherwise.
 */
OptionalStreamError writeOne(S, T = StreamType!S)(ref S stream, T value) if (isSomeOutputStream!S) {
    T[1] single = [value];
    StreamResult outcome = stream.writeToStream(single);
    if (outcome.hasError) {
        return OptionalStreamError(outcome.error);
    }
    if (outcome.count == 1) {
        return OptionalStreamError.init;
    }
    // The write succeeded but did not take one element; report the count.
    return OptionalStreamError(StreamError("Did not write exactly 1 element.", outcome.count));
}
187 
unittest {
    import streams.types.array;

    // Writing to an array-backed stream reports no error and stores the value.
    auto intOut = arrayOutputStreamFor!int();
    auto intOutcome = writeOne(intOut, 42);
    assert(intOutcome.notPresent);
    assert(intOut.toArrayRaw() == [42]);

    // Writing to an ErrorOutputStream reports an error.
    auto failingOut = ErrorOutputStream!bool();
    auto failingOutcome = writeOne(failingOut, false);
    assert(failingOutcome.present);
}