1 /** 2 * Concatenating streams that linearly combine reading and writing from 3 * multiple resources. 4 */ 5 module streams.types.concat; 6 7 import streams.primitives; 8 9 version (Have_slf4d) {import slf4d;} 10 11 /** 12 * A concatenating input stream that reads from one stream until it returns 13 * zero elements, then reads from the second stream. 14 */ 15 struct ConcatInputStream(E, S1, S2) if (isInputStream!(S1, E) && isInputStream!(S2, E)) { 16 private S1 stream1; 17 private bool stream1Empty = false; 18 private S2 stream2; 19 private bool stream2Empty = false; 20 21 /** 22 * Constructs a new concatenating stream from two input streams. 23 * Params: 24 * stream1 = The first stream to read from. 25 * stream2 = The second stream to read from. 26 */ 27 this(S1 stream1, S2 stream2) { 28 this.stream1 = stream1; 29 this.stream2 = stream2; 30 } 31 32 /** 33 * Reads from the streams that this one is concatenating, reading from the 34 * first stream until it's empty, and then reading from the second stream. 35 * Params: 36 * buffer = The buffer to read into. 37 * Returns: The number of elements read, or an error. 38 */ 39 StreamResult readFromStream(E[] buffer) { 40 uint bufferIndex = 0; 41 if (!this.stream1Empty) { 42 version (Have_slf4d) { 43 traceF!"Reading up to %d elements from the first stream: %s"(buffer.length, typeof(stream1).stringof); 44 } 45 StreamResult result1 = this.stream1.readFromStream(buffer); 46 if (result1.hasError) return result1; 47 if (result1.count == buffer.length) return result1; 48 // Less than buffer.length elements were read. 49 this.stream1Empty = true; 50 bufferIndex = result1.count; 51 version (Have_slf4d) { 52 traceF!"Finished reading from the first stream after %d elements."(bufferIndex); 53 } 54 } 55 if (!this.stream2Empty) { 56 const uint elementsToRead = cast(uint) buffer.length - bufferIndex; 57 version (Have_slf4d) { 58 traceF!"Reading up to %d elements from the second stream: %s"(elementsToRead, typeof(stream2).stringof); 59 } 60 StreamResult result2 = this.stream2.readFromStream(buffer[bufferIndex .. $]); 61 if (result2.hasError) return result2; 62 if (result2.count == elementsToRead) return StreamResult(bufferIndex + result2.count); 63 // Less than the required elements to fill the buffer were read. 64 this.stream2Empty = true; 65 return StreamResult(bufferIndex + result2.count); 66 } 67 // Both streams are empty. 68 return StreamResult(0); 69 } 70 } 71 72 /** 73 * Function to obtain a concatenating input stream that reads from `stream1`, 74 * and then `stream2`. 75 * Params: 76 * stream1 = The first stream to read from. 77 * stream2 = The second stream to read from. 78 * Returns: The concatenating input stream. 79 */ 80 ConcatInputStream!(StreamType!S1, S1, S2) concatInputStreamFor(S1, S2)(S1 stream1, S2 stream2) { 81 return ConcatInputStream!(StreamType!S1, S1, S2)(stream1, stream2); 82 } 83 84 unittest { 85 import streams.types.array; 86 int[3] bufA = [1, 2, 3]; 87 int[3] bufB = [4, 5, 6]; 88 auto concatAandB = concatInputStreamFor( 89 arrayInputStreamFor(bufA), 90 arrayInputStreamFor(bufB) 91 ); 92 int[2] buf; 93 94 auto result = concatAandB.readFromStream(buf); 95 assert(!result.hasError); 96 assert(result.count == 2); 97 assert(buf == [1, 2]); 98 99 result = concatAandB.readFromStream(buf); 100 assert(!result.hasError); 101 assert(result.count == 2); 102 assert(buf == [3, 4]); 103 104 result = concatAandB.readFromStream(buf); 105 assert(!result.hasError); 106 assert(result.count == 2); 107 assert(buf == [5, 6]); 108 109 result = concatAandB.readFromStream(buf); 110 assert(!result.hasError); 111 assert(result.count == 0); 112 113 // Try reading from two empty streams. 114 auto emptyConcat = concatInputStreamFor(NoOpInputStream!int(), NoOpInputStream!int()); 115 int[10] buf2; 116 assert(emptyConcat.readFromStream(buf2) == StreamResult(0)); 117 assert(emptyConcat.readFromStream(buf2) == StreamResult(0)); // Test again just to cover the last case. 118 }