1 /** 2 * A collection of array-backed streams for in-memory reading and writing. 3 */ 4 module streams.types.array; 5 6 import streams.primitives : StreamResult, StreamError, OptionalStreamError; 7 8 /** 9 * An input stream that reads from an array of items. 10 */ 11 struct ArrayInputStream(E) { 12 private E[] array; 13 private uint currentIndex = 0; 14 15 /** 16 * Reads items from this input stream's array into the given buffer, 17 * where successive `read` calls will read sequentially from the array, 18 * so that, for example, with an array `[1, 2, 3, 4]`, reading with a 19 * buffer size of 2 will first read `[1, 2]`, and then `[3, 4]`. 20 * Params: 21 * buffer = The buffer read array elements into. 22 * Returns: The number of elements that were read. 23 */ 24 StreamResult readFromStream(E[] buffer) { 25 if (this.currentIndex >= this.array.length) return StreamResult(0); 26 uint lengthToRead = cast(uint) this.array.length - this.currentIndex; 27 if (lengthToRead > buffer.length) { 28 lengthToRead = cast(uint) buffer.length; 29 } 30 version (D_BetterC) { 31 foreach (uint idx; 0 .. lengthToRead) { 32 buffer[idx] = this.array[this.currentIndex + idx]; 33 } 34 } else { 35 import std.algorithm.mutation : copy; 36 copy(this.array[this.currentIndex .. this.currentIndex + lengthToRead], buffer[0 .. lengthToRead]); 37 } 38 this.currentIndex += lengthToRead; 39 return StreamResult(lengthToRead); 40 } 41 42 /** 43 * Resets this input stream's record of the next index to read from, so 44 * that the next call to `read` will read from the start of the array. 45 */ 46 OptionalStreamError reset() { 47 this.currentIndex = 0; 48 return OptionalStreamError.init; 49 } 50 } 51 52 unittest { 53 import streams.primitives : isSomeInputStream, isInputStream; 54 55 assert(isSomeInputStream!(ArrayInputStream!int)); 56 assert(isInputStream!(ArrayInputStream!ubyte, ubyte)); 57 58 int[5] data = [1, 2, 3, 4, 5]; 59 auto s1 = ArrayInputStream!int(data); 60 int[2] buffer; 61 assert(s1.readFromStream(buffer) == StreamResult(2)); 62 assert(buffer == [1, 2]); 63 assert(s1.readFromStream(buffer) == StreamResult(2)); 64 assert(buffer == [3, 4]); 65 assert(s1.readFromStream(buffer) == StreamResult(1)); 66 assert(buffer == [5, 4]); 67 assert(s1.readFromStream(buffer) == StreamResult(0)); 68 69 s1.reset(); 70 assert(s1.readFromStream(buffer) == StreamResult(2)); 71 assert(buffer == [1, 2]); 72 // Test that the stream works for a single buffer. 73 int[1] singleBuffer; 74 assert(s1.readFromStream(singleBuffer) == StreamResult(1)); 75 assert(singleBuffer[0] == 3); 76 assert(s1.readFromStream(singleBuffer) == StreamResult(1)); 77 assert(singleBuffer[0] == 4); 78 s1.readFromStream(singleBuffer); // Skip the last element. 79 assert(s1.readFromStream(singleBuffer) == StreamResult(0)); 80 81 // Test reading to a buffer that's larger than the stream's internal buffer. 82 ubyte[4] data3 = [5, 4, 3, 2]; 83 ubyte[10] buffer3; 84 auto s3 = ArrayInputStream!ubyte(data3); 85 assert(s3.readFromStream(buffer3) == StreamResult(4)); 86 assert(buffer3[0 .. 4] == data3); 87 assert(s3.readFromStream(buffer3) == StreamResult(0)); 88 89 } 90 91 /** 92 * Creates and returns an array input stream wrapped around the given array 93 * of elements. 94 * Params: 95 * array = The array to stream. 96 * Returns: The array input stream. 97 */ 98 ArrayInputStream!E arrayInputStreamFor(E)(E[] array) { 99 return ArrayInputStream!E(array); 100 } 101 102 const uint DEFAULT_INITIAL_CAPACITY = 64; 103 104 /** 105 * An output stream that writes to an internal array. The resulting array can 106 * be obtained with `toArray()`. It is BetterC compatible, using manual memory 107 * management. 108 * 109 * When constructed, an initial block of memory is allocated, and then any new 110 * memory will be allocated according to the output stream's allocation 111 * strategy. If the `None` strategy is used, attempts to write more elements 112 * than the array contains will throw an unrecoverable Error. 113 */ 114 struct ArrayOutputStream(E) { 115 import streams.utils : AppendableBuffer, BufferAllocationStrategy; 116 117 private AppendableBuffer!E buffer = AppendableBuffer!E(DEFAULT_INITIAL_CAPACITY, BufferAllocationStrategy.Doubling); 118 119 /** 120 * Writes data to this output stream's internal array. 121 * Params: 122 * buffer = The elements to write to the stream. 123 * Returns: The number of elements that were written. Barring memory 124 * issues, this will always equal the buffer's length. 125 */ 126 StreamResult writeToStream(E[] buffer) { 127 this.buffer.appendItems(buffer); 128 return StreamResult(cast(uint) buffer.length); 129 } 130 131 version (D_BetterC) {} else { 132 /** 133 * Gets the internal array to which elements have been appended. This 134 * method is not compatible with BetterC mode; use `toArrayRaw` for that. 135 * Returns: The internal array. 136 */ 137 E[] toArray() { 138 E[] array = this.toArrayRaw(); 139 E[] dynArray = new E[array.length]; 140 dynArray[0 .. $] = array[0 .. $]; 141 return dynArray; 142 } 143 } 144 145 /** 146 * Gets a slice representing the underlying array to which elements have 147 * been appended. Take caution, as this memory segment may be freed if this 148 * stream is deconstructed or written to again. 149 * Returns: A slice representing the array of elements that have been added 150 * to this stream. 151 */ 152 E[] toArrayRaw() { 153 return this.buffer.toArray(); 154 } 155 156 /** 157 * Resets the internal array, such that `toArray()` returns an empty 158 * array. 159 */ 160 void reset() { 161 this.buffer.reset(); 162 } 163 } 164 165 /** 166 * Creates and returns an array output stream for elements of the given type, 167 * using a default initial capacity of `ArrayOutputStream.DEFAULT_INITIAL_CAPACITY` 168 * and default allocation strategy of `AllocationStrategy.Doubling`. 169 * Returns: The array output stream. 170 */ 171 ArrayOutputStream!E arrayOutputStreamFor(E)() { 172 return ArrayOutputStream!E(); 173 } 174 175 /** 176 * Creates and returns a byte array output stream. 177 * Returns: The byte array output stream. 178 */ 179 ArrayOutputStream!ubyte byteArrayOutputStream() { 180 return arrayOutputStreamFor!ubyte; 181 } 182 183 unittest { 184 import streams.primitives : isSomeOutputStream; 185 186 assert(isSomeOutputStream!(ArrayOutputStream!bool)); 187 188 auto s1 = arrayOutputStreamFor!float; 189 float[3] buffer = [0.5, 1, 1.5]; 190 assert(s1.writeToStream(buffer) == StreamResult(3)); 191 buffer = [2, 2.5, 3]; 192 assert(s1.writeToStream(buffer) == StreamResult(3)); 193 assert(s1.toArrayRaw() == [0.5, 1, 1.5, 2, 2.5, 3]); 194 assert(s1.toArrayRaw() == [0.5, 1, 1.5, 2, 2.5, 3]); 195 196 auto s2 = byteArrayOutputStream(); 197 ubyte[3] buffer1 = [1, 2, 3]; 198 s2.writeToStream(buffer1); 199 ubyte[] data = s2.toArrayRaw(); 200 assert(data == [1, 2, 3]); 201 202 s2.reset(); 203 assert(s2.toArrayRaw().length == 0); 204 s2.writeToStream(buffer1); 205 assert(s2.toArrayRaw() == [1, 2, 3]); 206 207 version (D_BetterC) {} else { 208 // Test the dynamic toArray method. 209 assert(s2.toArray() == [1, 2, 3]); 210 } 211 }