1 /** 2 * Defines streams for reading and writing primitive data and arrays of 3 * primitive values. So-called "data" input and output streams are defined as 4 * decorators around an existing "base" stream, so when you read or write on 5 * a data stream, it's just performing an analogous operation on its base 6 * stream. 7 */ 8 module streams.types.data; 9 10 import streams.primitives; 11 import streams.utils : Optional, Either; 12 import std.traits : isScalarType, isStaticArray; 13 14 /** 15 * The result of a data input stream read operation, which is either a value 16 * or an error. 17 */ 18 alias DataReadResult(T) = Either!(T, "value", StreamError, "error"); 19 20 /** 21 * An enum defining the endianness of a stream, which is how it assumes data on 22 * the underlying resource to be written and read, irrespective of the system 23 * endianness. 24 */ 25 enum Endianness { BigEndian, LittleEndian } 26 27 version (BigEndian) { 28 immutable Endianness SYSTEM_ENDIANNESS = Endianness.BigEndian; 29 } else { 30 immutable Endianness SYSTEM_ENDIANNESS = Endianness.LittleEndian; 31 } 32 33 /** 34 * A helper function that flips a static array's elements if needed so that its 35 * byte order matches a target byte order. 36 * Params: 37 * bytes = The bytes to possibly flip. 38 * sourceOrder = The byte order of the source that produced the bytes. 39 * targetOrder = The byte order of the consumer of the bytes. 40 */ 41 private void ensureByteOrder(T)( 42 ref ubyte[T.sizeof] bytes, 43 Endianness sourceOrder, 44 Endianness targetOrder 45 ) if (isScalarType!T) { 46 if (sourceOrder != targetOrder) { 47 ubyte tmp; 48 static foreach (i; 0 .. T.sizeof / 2) { 49 tmp = bytes[i]; 50 bytes[i] = bytes[T.sizeof - 1 - i]; 51 bytes[T.sizeof - 1 - i] = tmp; 52 } 53 } 54 } 55 56 /** 57 * An input stream that is wrapped around a byte input stream, so that values 58 * may be read from the stream as bytes. 59 */ 60 struct DataInputStream(S) if (isByteInputStream!S) { 61 private S stream; 62 private immutable Endianness endianness = SYSTEM_ENDIANNESS; 63 64 /** 65 * Constructs a data input stream that reads from a given underlying byte 66 * input stream. 67 * Params: 68 * stream = The stream to read from. 69 * endianness = The byte order of the resource that this stream is 70 * reading from. Defaults to the system byte order. 71 */ 72 this(S stream, Endianness endianness = SYSTEM_ENDIANNESS) { 73 this.stream = stream; 74 this.endianness = endianness; 75 } 76 77 /** 78 * Delegates reading to the underlying stream. 79 * Params: 80 * buffer = The buffer to read to. 81 * Returns: The number of bytes read. 82 */ 83 StreamResult readFromStream(ubyte[] buffer) { 84 return this.stream.readFromStream(buffer); 85 } 86 87 /** 88 * Reads a value from the stream and returns either the value that was 89 * read successfully, or an error message. If you want to ignore errors, 90 * you can use `readFromStreamOrDefault` instead. 91 * Returns: A result containing either that value that was read, or an error. 92 */ 93 DataReadResult!T readFromStream(T)() { 94 static if (isStaticArray!T) { 95 return readStaticArray!T(); 96 } else static if (isScalarType!T) { 97 return readScalar!T(); 98 } else { 99 static assert( 100 false, 101 "Cannot read values of type " ~ T.stringof ~ ". " ~ 102 "Only scalar types and static arrays are supported." 103 ); 104 } 105 } 106 107 /** 108 * Reads a value from the stream, or return a default value if reading 109 * fails for any reason. 110 * Params: 111 * defaultValue = The default value to return if reading fails. 112 * Returns: The value that was read, or a default value. 113 */ 114 T readFromStreamOrDefault(T)(T defaultValue = T.init) { 115 DataReadResult!T result = this.readFromStream!T(); 116 if (result.hasValue) return result.value; 117 return defaultValue; 118 } 119 120 private DataReadResult!T readScalar(T)() { 121 union U { T value; ubyte[T.sizeof] bytes; } 122 U u; 123 StreamResult readResult = this.stream.readFromStream(u.bytes[]); 124 if (readResult.hasError) return DataReadResult!T(readResult.error); 125 if (readResult.count != T.sizeof) { 126 return DataReadResult!T(StreamError( 127 "Failed to read scalar value of type \"" ~ T.stringof ~ "\"" ~ 128 " from stream of type \"" ~ S.stringof ~ "\".", 129 readResult.count 130 )); 131 } 132 ensureByteOrder!T(u.bytes, SYSTEM_ENDIANNESS, this.endianness); 133 return DataReadResult!T(u.value); 134 } 135 136 private DataReadResult!T readStaticArray(T)() { 137 static if (T.length == 0) return DataReadResult!T(T[0]); 138 alias ElementType = typeof(*T.init.ptr); 139 T data; 140 for (uint i = 0; i < T.length; i++) { 141 DataReadResult!ElementType elementResult = readFromStream!ElementType(); 142 if (elementResult.hasError) { 143 return DataReadResult!T(elementResult.error); 144 } 145 data[i] = elementResult.value; 146 } 147 return DataReadResult!T(data); 148 } 149 } 150 151 /** 152 * Creates and returns a data input stream that's wrapped around the given 153 * byte input stream. 154 * Params: 155 * stream = The stream to wrap in a data input stream. 156 * endianness = The byte order of the resource that this stream is 157 * reading from. Defaults to the system byte order. 158 * Returns: The data input stream. 159 */ 160 DataInputStream!S dataInputStreamFor(S)( 161 S stream, 162 Endianness endianness = SYSTEM_ENDIANNESS 163 ) if (isByteInputStream!S) { 164 return DataInputStream!S(stream, endianness); 165 } 166 167 /** 168 * An output stream that is wrapped around a byte output stream, so that values 169 * may be written to the stream as bytes. 170 */ 171 struct DataOutputStream(S) if (isByteOutputStream!S) { 172 private S stream; 173 private immutable Endianness endianness; 174 175 /** 176 * Constructs the data output stream so it will write to the given stream. 177 * Params: 178 * stream = The stream to write to. 179 * endianness = The byte order of the resource we're writing to. Defaults 180 * to the system byte order. 181 */ 182 this(S stream, Endianness endianness = SYSTEM_ENDIANNESS) { 183 this.stream = stream; 184 this.endianness = endianness; 185 } 186 187 /** 188 * Delegates writing to the underlying stream. 189 * Params: 190 * buffer = The buffer whose contents to write. 191 * Returns: The number of bytes written. 192 */ 193 StreamResult writeToStream(ubyte[] buffer) { 194 return stream.writeToStream(buffer); 195 } 196 197 /** 198 * Writes a value of type `T` to the stream. If writing fails for 199 * whatever reason, a StreamException is thrown. 200 * Params: 201 * value = The value to write. 202 * Returns: A nullable error, which is set if an error occurs. 203 */ 204 OptionalStreamError writeToStream(T)(T value) { 205 static if (isStaticArray!T) { 206 return writeStaticArray!T(value); 207 } else static if (isScalarType!T) { 208 return writeScalar!T(value); 209 } else { 210 static assert( 211 false, 212 "Cannot read values of type " ~ T.stringof ~ ". " ~ 213 "Only scalar types and static arrays are supported." 214 ); 215 } 216 } 217 218 private OptionalStreamError writeScalar(T)(T value) { 219 union U { T value; ubyte[T.sizeof] bytes; } 220 U u; 221 u.value = value; 222 ensureByteOrder!T(u.bytes, this.endianness, SYSTEM_ENDIANNESS); 223 StreamResult writeResult = this.stream.writeToStream(u.bytes[]); 224 if (writeResult.hasError) return OptionalStreamError(writeResult.error); 225 if (writeResult.count != T.sizeof) { 226 return OptionalStreamError(StreamError( 227 "Failed to write scalar value of type \"" ~ T.stringof ~ "\"" ~ 228 " to stream of type \"" ~ S.stringof ~ "\".", 229 writeResult.count 230 )); 231 } 232 return OptionalStreamError.init; 233 } 234 235 private OptionalStreamError writeStaticArray(T)(T value) if (isStaticArray!T) { 236 static if (T.length == 0) return; 237 alias ElementType = typeof(*T.init.ptr); 238 for (uint i = 0; i < T.length; i++) { 239 OptionalStreamError error = writeToStream!ElementType(value[i]); 240 if (error.present) { 241 return error; 242 } 243 } 244 return OptionalStreamError.init; 245 } 246 } 247 248 /** 249 * Creates and returns a data output stream that's wrapped around the given 250 * byte output stream. 251 * Params: 252 * stream = The stream to wrap in a data output stream. 253 * endianness = The byte order of the resource we're writing to. Defaults 254 * to the system byte order. 255 * Returns: The data output stream. 256 */ 257 DataOutputStream!S dataOutputStreamFor(S)( 258 S stream, 259 Endianness endianness = SYSTEM_ENDIANNESS 260 ) if (isByteOutputStream!S) { 261 return DataOutputStream!S(stream, endianness); 262 } 263 264 unittest { 265 import streams.primitives : ErrorOutputStream; 266 import streams.types.array : arrayOutputStreamFor, arrayInputStreamFor, byteArrayOutputStream; 267 268 auto sOut = arrayOutputStreamFor!ubyte; 269 auto dataOut = dataOutputStreamFor(&sOut); 270 dataOut.writeToStream!int(42); 271 dataOut.writeToStream(true); 272 char[5] word = "Hello"; 273 dataOut.writeToStream(word); 274 ubyte[] data = sOut.toArrayRaw(); 275 auto sIn = arrayInputStreamFor(data); 276 auto dataIn = dataInputStreamFor(&sIn); 277 assert(dataIn.readFromStreamOrDefault!int() == 42); 278 assert(dataIn.readFromStreamOrDefault!bool() == true); 279 assert(dataIn.readFromStreamOrDefault!(char[5]) == word); 280 DataReadResult!ubyte result = dataIn.readFromStream!ubyte(); 281 assert(result.hasError); 282 assert(result.error.code == 0); 283 284 // Test that reading normally still works. 285 ubyte[4] sIn1Data = [1, 2, 3, 4]; 286 auto sIn1 = arrayInputStreamFor!ubyte(sIn1Data); 287 auto dataIn1 = dataInputStreamFor(&sIn1); 288 ubyte[3] buffer1; 289 assert(dataIn1.readFromStream(buffer1) == StreamResult(3)); 290 ubyte[3] buffer1Expected = [1, 2, 3]; 291 assert(buffer1 == buffer1Expected); 292 293 // Test that writing normally still works. 294 auto sOut1 = arrayOutputStreamFor!ubyte; 295 auto dataOut1 = dataOutputStreamFor(&sOut1); 296 ubyte[3] buffer2 = [1, 2, 3]; 297 dataOut1.writeToStream(buffer2); 298 assert(sOut1.toArrayRaw() == buffer2); 299 300 // Test that reading a static array with invalid data returns an error. 301 ubyte[3] buffer3 = [1, 2, 3]; 302 auto sIn3 = arrayInputStreamFor!ubyte(buffer3); 303 auto dataIn3 = dataInputStreamFor(&sIn3); 304 DataReadResult!(ubyte[4]) result3 = dataIn3.readFromStream!(ubyte[4])(); 305 assert(result3.hasError); 306 assert(result3.error.code == 0); 307 308 // Test that writing a value to an output stream that doesn't write all bytes, causes an error. 309 auto sOut3 = NoOpOutputStream!ubyte(); 310 auto dataOut3 = dataOutputStreamFor(&sOut3); 311 assert(dataOut3.writeToStream(42u).present); 312 313 // Test that writing a value to an output stream that errors, also causes an error. 314 auto sOut4 = ErrorOutputStream!ubyte(); 315 auto dataOut4 = dataOutputStreamFor(&sOut4); 316 OptionalStreamError error4 = dataOut4.writeToStream(1); 317 assert(error4.present); 318 319 // Test that writing a static array fails if writing one element fails. 320 auto sOut5 = ErrorOutputStream!ubyte(); 321 auto dataOut5 = dataOutputStreamFor(&sOut5); 322 OptionalStreamError error5 = dataOut5.writeToStream!(int[3])([3, 2, 1]); 323 assert(error5.present); 324 325 // Test that writing to an array with opposite byte order works as expected. 326 auto sOut6 = byteArrayOutputStream(); 327 auto dataOut6 = dataOutputStreamFor(&sOut6, Endianness.BigEndian); 328 dataOut6.writeToStream!short(1); 329 assert(sOut6.toArrayRaw() == [0, 1]); 330 sOut6.reset(); 331 auto dataOut7 = dataOutputStreamFor(&sOut6, Endianness.LittleEndian); 332 dataOut7.writeToStream!short(1); 333 assert(sOut6.toArrayRaw() == [1, 0]); 334 335 // Test that readOrDefault works. 336 ubyte[4] data7 = [1, 2, 3, 4]; 337 auto sIn7 = arrayInputStreamFor(data7); 338 auto dataIn7 = dataInputStreamFor(&sIn7); 339 for (uint i = 0; i < 4; i++) { 340 assert(dataIn7.readFromStreamOrDefault!ubyte() == data7[i]); 341 } 342 assert(dataIn7.readFromStreamOrDefault!ubyte(42) == 42); 343 }