1 /**
2  * Defines streams for reading and writing primitive data and arrays of
3  * primitive values. So-called "data" input and output streams are defined as
4  * decorators around an existing "base" stream, so when you read or write on
5  * a data stream, it's just performing an analogous operation on its base
6  * stream.
7  */
8 module streams.types.data;
9 
10 import streams.primitives;
11 import streams.utils : Optional, Either;
12 import std.traits : isScalarType, isStaticArray;
13 
14 /**
15  * The result of a data input stream read operation, which is either a value
16  * or an error.
17  */
18 alias DataReadResult(T) = Either!(T, "value", StreamError, "error");
19 
20 /** 
21  * An enum defining the endianness of a stream, which is how it assumes data on
22  * the underlying resource to be written and read, irrespective of the system
23  * endianness.
24  */
25 enum Endianness { BigEndian, LittleEndian }
26 
27 version (BigEndian) {
28     immutable Endianness SYSTEM_ENDIANNESS = Endianness.BigEndian;
29 } else {
30     immutable Endianness SYSTEM_ENDIANNESS = Endianness.LittleEndian;
31 }
32 
33 /** 
34  * A helper function that flips a static array's elements if needed so that its
35  * byte order matches a target byte order.
36  * Params:
37  *   bytes = The bytes to possibly flip.
38  *   sourceOrder = The byte order of the source that produced the bytes.
39  *   targetOrder = The byte order of the consumer of the bytes.
40  */
41 private void ensureByteOrder(T)(
42     ref ubyte[T.sizeof] bytes,
43     Endianness sourceOrder,
44     Endianness targetOrder
45 ) if (isScalarType!T) {
46     if (sourceOrder != targetOrder) {
47         ubyte tmp;
48         static foreach (i; 0 .. T.sizeof / 2) {
49             tmp = bytes[i];
50             bytes[i] = bytes[T.sizeof - 1 - i];
51             bytes[T.sizeof - 1 - i] = tmp;
52         }
53     }
54 }
55 
56 /** 
57  * An input stream that is wrapped around a byte input stream, so that values
58  * may be read from the stream as bytes.
59  */
60 struct DataInputStream(S) if (isByteInputStream!S) {
61     private S stream;
62     private immutable Endianness endianness = SYSTEM_ENDIANNESS;
63 
64     /** 
65      * Constructs a data input stream that reads from a given underlying byte
66      * input stream.
67      * Params:
68      *   stream = The stream to read from.
69      *   endianness = The byte order of the resource that this stream is
70      *                reading from. Defaults to the system byte order.
71      */
72     this(S stream, Endianness endianness = SYSTEM_ENDIANNESS) {
73         this.stream = stream;
74         this.endianness = endianness;
75     }
76 
77     /** 
78      * Delegates reading to the underlying stream.
79      * Params:
80      *   buffer = The buffer to read to.
81      * Returns: The number of bytes read.
82      */
83     StreamResult readFromStream(ubyte[] buffer) {
84         return this.stream.readFromStream(buffer);
85     }
86 
87     /** 
88      * Reads a value from the stream and returns either the value that was
89      * read successfully, or an error message. If you want to ignore errors,
90      * you can use `readFromStreamOrDefault` instead.
91      * Returns: A result containing either that value that was read, or an error.
92      */
93     DataReadResult!T readFromStream(T)() {
94         static if (isStaticArray!T) {
95             return readStaticArray!T();
96         } else static if (isScalarType!T) {
97             return readScalar!T();
98         } else {
99             static assert(
100                 false,
101                 "Cannot read values of type " ~ T.stringof ~ ". " ~
102                 "Only scalar types and static arrays are supported."
103             );
104         }
105     }
106 
107     /** 
108      * Reads a value from the stream, or return a default value if reading
109      * fails for any reason.
110      * Params:
111      *   defaultValue = The default value to return if reading fails.
112      * Returns: The value that was read, or a default value.
113      */
114     T readFromStreamOrDefault(T)(T defaultValue = T.init) {
115         DataReadResult!T result = this.readFromStream!T();
116         if (result.hasValue) return result.value;
117         return defaultValue;
118     }
119 
120     private DataReadResult!T readScalar(T)() {
121         union U { T value; ubyte[T.sizeof] bytes; }
122         U u;
123         StreamResult readResult = this.stream.readFromStream(u.bytes[]);
124         if (readResult.hasError) return DataReadResult!T(readResult.error);
125         if (readResult.count != T.sizeof) {
126             return DataReadResult!T(StreamError(
127                 "Failed to read scalar value of type \"" ~ T.stringof ~ "\"" ~
128                 " from stream of type \"" ~ S.stringof ~ "\".",
129                 readResult.count
130             ));
131         }
132         ensureByteOrder!T(u.bytes, SYSTEM_ENDIANNESS, this.endianness);
133         return DataReadResult!T(u.value);
134     }
135 
136     private DataReadResult!T readStaticArray(T)() {
137         static if (T.length == 0) return DataReadResult!T(T[0]);
138         alias ElementType = typeof(*T.init.ptr);
139         T data;
140         for (uint i = 0; i < T.length; i++) {
141             DataReadResult!ElementType elementResult = readFromStream!ElementType();
142             if (elementResult.hasError) {
143                 return DataReadResult!T(elementResult.error);
144             }
145             data[i] = elementResult.value;
146         }
147         return DataReadResult!T(data);
148     }
149 }
150 
151 /** 
152  * Creates and returns a data input stream that's wrapped around the given
153  * byte input stream.
154  * Params:
155  *   stream = The stream to wrap in a data input stream.
156  *   endianness = The byte order of the resource that this stream is
157  *                reading from. Defaults to the system byte order.
158  * Returns: The data input stream.
159  */
160 DataInputStream!S dataInputStreamFor(S)(
161     S stream,
162     Endianness endianness = SYSTEM_ENDIANNESS
163 ) if (isByteInputStream!S) {
164     return DataInputStream!S(stream, endianness);
165 }
166 
167 /** 
168  * An output stream that is wrapped around a byte output stream, so that values
169  * may be written to the stream as bytes.
170  */
171 struct DataOutputStream(S) if (isByteOutputStream!S) {
172     private S stream;
173     private immutable Endianness endianness;
174 
175     /** 
176      * Constructs the data output stream so it will write to the given stream.
177      * Params:
178      *   stream = The stream to write to.
179      *   endianness = The byte order of the resource we're writing to. Defaults
180      *                to the system byte order.
181      */
182     this(S stream, Endianness endianness = SYSTEM_ENDIANNESS) {
183         this.stream = stream;
184         this.endianness = endianness;
185     }
186 
187     /** 
188      * Delegates writing to the underlying stream.
189      * Params:
190      *   buffer = The buffer whose contents to write.
191      * Returns: The number of bytes written.
192      */
193     StreamResult writeToStream(ubyte[] buffer) {
194         return stream.writeToStream(buffer);
195     }
196 
197     /** 
198      * Writes a value of type `T` to the stream. If writing fails for
199      * whatever reason, a StreamException is thrown.
200      * Params:
201      *   value = The value to write.
202      * Returns: A nullable error, which is set if an error occurs.
203      */
204     OptionalStreamError writeToStream(T)(T value) {
205         static if (isStaticArray!T) {
206             return writeStaticArray!T(value);
207         } else static if (isScalarType!T) {
208             return writeScalar!T(value);
209         } else {
210             static assert(
211                 false,
212                 "Cannot read values of type " ~ T.stringof ~ ". " ~
213                 "Only scalar types and static arrays are supported."
214             );
215         }
216     }
217 
218     private OptionalStreamError writeScalar(T)(T value) {
219         union U { T value; ubyte[T.sizeof] bytes; }
220         U u;
221         u.value = value;
222         ensureByteOrder!T(u.bytes, this.endianness, SYSTEM_ENDIANNESS);
223         StreamResult writeResult = this.stream.writeToStream(u.bytes[]);
224         if (writeResult.hasError) return OptionalStreamError(writeResult.error);
225         if (writeResult.count != T.sizeof) {
226             return OptionalStreamError(StreamError(
227                 "Failed to write scalar value of type \"" ~ T.stringof ~ "\"" ~
228                 " to stream of type \"" ~ S.stringof ~ "\".",
229                 writeResult.count
230             ));
231         }
232         return OptionalStreamError.init;
233     }
234 
235     private OptionalStreamError writeStaticArray(T)(T value) if (isStaticArray!T) {
236         static if (T.length == 0) return;
237         alias ElementType = typeof(*T.init.ptr);
238         for (uint i = 0; i < T.length; i++) {
239             OptionalStreamError error = writeToStream!ElementType(value[i]);
240             if (error.present) {
241                 return error;
242             }
243         }
244         return OptionalStreamError.init;
245     }
246 }
247 
248 /** 
249  * Creates and returns a data output stream that's wrapped around the given
250  * byte output stream.
251  * Params:
252  *   stream = The stream to wrap in a data output stream.
253  *   endianness = The byte order of the resource we're writing to. Defaults
254  *                to the system byte order.
255  * Returns: The data output stream.
256  */
257 DataOutputStream!S dataOutputStreamFor(S)(
258     S stream,
259     Endianness endianness = SYSTEM_ENDIANNESS
260 ) if (isByteOutputStream!S) {
261     return DataOutputStream!S(stream, endianness);
262 }
263 
264 unittest {
265     import streams.primitives : ErrorOutputStream;
266     import streams.types.array : arrayOutputStreamFor, arrayInputStreamFor, byteArrayOutputStream;
267 
268     auto sOut = arrayOutputStreamFor!ubyte;
269     auto dataOut = dataOutputStreamFor(&sOut);
270     dataOut.writeToStream!int(42);
271     dataOut.writeToStream(true);
272     char[5] word = "Hello";
273     dataOut.writeToStream(word);
274     ubyte[] data = sOut.toArrayRaw();
275     auto sIn = arrayInputStreamFor(data);
276     auto dataIn = dataInputStreamFor(&sIn);
277     assert(dataIn.readFromStreamOrDefault!int() == 42);
278     assert(dataIn.readFromStreamOrDefault!bool() == true);
279     assert(dataIn.readFromStreamOrDefault!(char[5]) == word);
280     DataReadResult!ubyte result = dataIn.readFromStream!ubyte();
281     assert(result.hasError);
282     assert(result.error.code == 0);
283 
284     // Test that reading normally still works.
285     ubyte[4] sIn1Data = [1, 2, 3, 4];
286     auto sIn1 = arrayInputStreamFor!ubyte(sIn1Data);
287     auto dataIn1 = dataInputStreamFor(&sIn1);
288     ubyte[3] buffer1;
289     assert(dataIn1.readFromStream(buffer1) == StreamResult(3));
290     ubyte[3] buffer1Expected = [1, 2, 3];
291     assert(buffer1 == buffer1Expected);
292 
293     // Test that writing normally still works.
294     auto sOut1 = arrayOutputStreamFor!ubyte;
295     auto dataOut1 = dataOutputStreamFor(&sOut1);
296     ubyte[3] buffer2 = [1, 2, 3];
297     dataOut1.writeToStream(buffer2);
298     assert(sOut1.toArrayRaw() == buffer2);
299 
300     // Test that reading a static array with invalid data returns an error.
301     ubyte[3] buffer3 = [1, 2, 3];
302     auto sIn3 = arrayInputStreamFor!ubyte(buffer3);
303     auto dataIn3 = dataInputStreamFor(&sIn3);
304     DataReadResult!(ubyte[4]) result3 = dataIn3.readFromStream!(ubyte[4])();
305     assert(result3.hasError);
306     assert(result3.error.code == 0);
307 
308     // Test that writing a value to an output stream that doesn't write all bytes, causes an error.
309     auto sOut3 = NoOpOutputStream!ubyte();
310     auto dataOut3 = dataOutputStreamFor(&sOut3);
311     assert(dataOut3.writeToStream(42u).present);
312 
313     // Test that writing a value to an output stream that errors, also causes an error.
314     auto sOut4 = ErrorOutputStream!ubyte();
315     auto dataOut4 = dataOutputStreamFor(&sOut4);
316     OptionalStreamError error4 = dataOut4.writeToStream(1);
317     assert(error4.present);
318 
319     // Test that writing a static array fails if writing one element fails.
320     auto sOut5 = ErrorOutputStream!ubyte();
321     auto dataOut5 = dataOutputStreamFor(&sOut5);
322     OptionalStreamError error5 = dataOut5.writeToStream!(int[3])([3, 2, 1]);
323     assert(error5.present);
324 
325     // Test that writing to an array with opposite byte order works as expected.
326     auto sOut6 = byteArrayOutputStream();
327     auto dataOut6 = dataOutputStreamFor(&sOut6, Endianness.BigEndian);
328     dataOut6.writeToStream!short(1);
329     assert(sOut6.toArrayRaw() == [0, 1]);
330     sOut6.reset();
331     auto dataOut7 = dataOutputStreamFor(&sOut6, Endianness.LittleEndian);
332     dataOut7.writeToStream!short(1);
333     assert(sOut6.toArrayRaw() == [1, 0]);
334 
335     // Test that readOrDefault works.
336     ubyte[4] data7 = [1, 2, 3, 4];
337     auto sIn7 = arrayInputStreamFor(data7);
338     auto dataIn7 = dataInputStreamFor(&sIn7);
339     for (uint i = 0; i < 4; i++) {
340         assert(dataIn7.readFromStreamOrDefault!ubyte() == data7[i]);
341     }
342     assert(dataIn7.readFromStreamOrDefault!ubyte(42) == 42);
343 }