1 /**
2  * Concatenating streams that linearly combine reading and writing from
3  * multiple resources.
4  */
5 module streams.types.concat;
6 
7 import streams.primitives;
8 
9 version (Have_slf4d) {import slf4d;}
10 
/**
 * A concatenating input stream that reads from one stream until it returns
 * zero elements, then reads from the second stream.
 *
 * A read that yields fewer elements than requested is NOT treated as the end
 * of a stream; only a read of zero elements (into a non-empty buffer) is. This
 * keeps the concatenation correct for sources that may legitimately deliver
 * data in smaller pieces than the caller's buffer.
 */
struct ConcatInputStream(E, S1, S2) if (isInputStream!(S1, E) && isInputStream!(S2, E)) {
    private S1 stream1;
    /// Set once `stream1` has answered a read with zero elements.
    private bool stream1Empty = false;
    private S2 stream2;
    /// Set once `stream2` has answered a read with zero elements.
    private bool stream2Empty = false;

    /**
     * Constructs a new concatenating stream from two input streams.
     * Params:
     *   stream1 = The first stream to read from.
     *   stream2 = The second stream to read from.
     */
    this(S1 stream1, S2 stream2) {
        this.stream1 = stream1;
        this.stream2 = stream2;
    }

    /** 
     * Reads from the streams that this one is concatenating, reading from the
     * first stream until it's empty, and then reading from the second stream.
     *
     * Each underlying stream is read repeatedly into the still-unfilled part
     * of `buffer` until either the buffer is full or that stream returns zero
     * elements. A single call may therefore span the boundary between the two
     * streams. If an underlying read reports an error, that error is returned
     * immediately and the contents of `buffer` should be considered
     * unspecified.
     * Params:
     *   buffer = The buffer to read into. If empty, no underlying read is
     *            attempted and a count of zero is returned.
     * Returns: The number of elements read, or an error. A count of zero for
     * a non-empty buffer means both streams are exhausted.
     */
    StreamResult readFromStream(E[] buffer) {
        // The running total is a `uint`; cap the request so it can never wrap.
        if (buffer.length > uint.max) buffer = buffer[0 .. uint.max];

        uint filled = 0;
        if (!this.stream1Empty) {
            while (filled < buffer.length) {
                version (Have_slf4d) {
                    traceF!"Reading up to %d elements from the first stream: %s"(buffer.length - filled, typeof(stream1).stringof);
                }
                StreamResult result1 = this.stream1.readFromStream(buffer[filled .. $]);
                if (result1.hasError) return result1;
                if (result1.count == 0) {
                    // Only a zero-element read signals exhaustion; a short
                    // read just means we should ask again for the remainder.
                    this.stream1Empty = true;
                    version (Have_slf4d) {
                        traceF!"Finished reading from the first stream after %d elements."(filled);
                    }
                    break;
                }
                filled += result1.count;
            }
        }
        if (!this.stream2Empty) {
            // Skipped entirely when the first stream already filled the buffer.
            while (filled < buffer.length) {
                version (Have_slf4d) {
                    traceF!"Reading up to %d elements from the second stream: %s"(buffer.length - filled, typeof(stream2).stringof);
                }
                StreamResult result2 = this.stream2.readFromStream(buffer[filled .. $]);
                if (result2.hasError) return result2;
                if (result2.count == 0) {
                    this.stream2Empty = true;
                    break;
                }
                filled += result2.count;
            }
        }
        // Zero here (for a non-empty buffer) means both streams are empty.
        return StreamResult(filled);
    }
}
71 
/**
 * Convenience factory that builds a concatenating input stream over two
 * existing input streams. The element type is taken from `stream1` via
 * `StreamType!S1`; reads are served from `stream1` first, then `stream2`.
 * Params:
 *   stream1 = The stream whose elements come first.
 *   stream2 = The stream whose elements follow those of `stream1`.
 * Returns: A `ConcatInputStream` wrapping both streams.
 */
ConcatInputStream!(StreamType!S1, S1, S2) concatInputStreamFor(S1, S2)(S1 stream1, S2 stream2) {
    alias Concat = ConcatInputStream!(StreamType!S1, S1, S2);
    return Concat(stream1, stream2);
}
83 
unittest {
    import streams.types.array;

    // Two three-element sources, drained through a two-element window so that
    // the middle read has to straddle the boundary between them.
    int[3] first = [1, 2, 3];
    int[3] second = [4, 5, 6];
    auto joined = concatInputStreamFor(
        arrayInputStreamFor(first),
        arrayInputStreamFor(second)
    );
    int[2] window;

    const int[2][3] expectedChunks = [[1, 2], [3, 4], [5, 6]];
    foreach (expected; expectedChunks) {
        auto chunkResult = joined.readFromStream(window);
        assert(!chunkResult.hasError);
        assert(chunkResult.count == 2);
        assert(window == expected);
    }

    // Everything has been consumed: the next read yields nothing, without error.
    auto finalResult = joined.readFromStream(window);
    assert(!finalResult.hasError);
    assert(finalResult.count == 0);

    // Concatenating two empty streams yields zero elements; the second call
    // exercises the path where both streams are already known to be empty.
    auto bothEmpty = concatInputStreamFor(NoOpInputStream!int(), NoOpInputStream!int());
    int[10] scratch;
    foreach (attempt; 0 .. 2) {
        assert(bothEmpty.readFromStream(scratch) == StreamResult(0));
    }
}