1 /**
2  * Defines some components that provide interoperability with Phobos ranges,
3  * including converting streams to and from ranges.
4  */
5 module streams.range;
6 
7 import streams.primitives;
8 import streams.utils;
9 import std.range : isInputRange, isOutputRange, ElementType, empty, front, popFront, put;
10 
/**
 * Adapter that exposes an input stream of `E` elements as a Phobos-style
 * input range (`empty`, `front`, `popFront`).
 *
 * The range always keeps one element buffered: it reads a single element
 * from the stream on construction and again on every `popFront()`.
 */
struct InputStreamRange(S, E = StreamType!S) if (isInputStream!(S, E)) {
    // The wrapped stream, stored by value as type S.
    private S stream;
    // The element most recently read, or an empty Optional if the last read produced nothing.
    private Optional!E lastElement;
    // The raw result of the most recent single-element read.
    private StreamResult lastRead;

    /**
     * Constructs the range around `stream` and immediately reads the first
     * element so that `front()` and `empty()` are usable right away.
     * Params:
     *   stream = The input stream to read from. It is stored by value in
     *            this struct.
     */
    this(S stream) {
        this.stream = stream;
        // Prime the one-element look-ahead buffer.
        this.popFront();
    }

    /**
     * Discards the currently buffered element and attempts to read the next
     * one from the stream. If the read does not yield at least one element,
     * the buffered element is cleared.
     */
    void popFront() {
        E[1] slot;
        this.lastRead = this.stream.readFromStream(slot);
        if (!this.lastRead.hasCount || this.lastRead.count == 0) {
            // Nothing was read: leave the range without a buffered element.
            this.lastElement = Optional!E.init;
            return;
        }
        this.lastElement = Optional!E(slot[0]);
    }

    /**
     * Tells whether the range is exhausted. The range counts as empty when
     * the most recent read either has no count or has a count of 0.
     * Returns: `true` if no element is currently available.
     */
    bool empty() {
        return !(this.lastRead.hasCount && this.lastRead.count != 0);
    }

    /**
     * Gets the currently buffered element, i.e. the one most recently read
     * from the stream.
     * Returns: The value held by the buffered element.
     */
    E front() {
        return this.lastElement.value;
    }
}
62 
/**
 * Convenience factory that wraps an input stream in an `InputStreamRange`,
 * so the stream can be handed to code that expects a Phobos input range.
 * Params:
 *   stream = The input stream to wrap.
 * Returns: An `InputStreamRange!(S, E)` constructed from `stream`.
 */
auto asInputRange(S, E = StreamType!S)(S stream) if (isInputStream!(S, E)) {
    alias WrappedRange = InputStreamRange!(S, E);
    return WrappedRange(stream);
}
73 
// Verifies that a wrapped array input stream yields its elements in order
// through the range interface and then reports empty.
unittest {
    import streams : arrayInputStreamFor;
    ubyte[3] source = [1, 2, 3];
    auto range = asInputRange(arrayInputStreamFor(source));
    assert(isInputRange!(typeof(range)));
    immutable ubyte[3] expectedValues = [1, 2, 3];
    foreach (expected; expectedValues) {
        assert(!range.empty());
        assert(range.front() == expected);
        range.popFront();
    }
    // After the third pop there is nothing left to read.
    assert(range.empty());
}
90 
/**
 * Adapter that exposes a Phobos-style input range as an input stream of
 * `E` elements.
 */
struct InputRangeStream(R, E = ElementType!R) if (isInputRange!R) {
    // The wrapped range; elements are consumed from it as they are read.
    private R range;

    /**
     * Copies elements from the front of the underlying range into `buffer`,
     * popping each one, until the buffer is full or the range is empty.
     * Params:
     *   buffer = The buffer to fill.
     * Returns: A result holding the number of elements placed in `buffer`.
     */
    StreamResult readFromStream(E[] buffer) {
        int filled = 0;
        for (; filled < buffer.length; filled++) {
            if (this.range.empty()) {
                break;
            }
            buffer[filled] = this.range.front();
            this.range.popFront();
        }
        return StreamResult(filled);
    }
}
113 
/**
 * Convenience factory that wraps a Phobos-style input range in an
 * `InputRangeStream`.
 * Params:
 *   range = The input range to read elements from.
 * Returns: An `InputRangeStream!(R, E)` that reads from `range`.
 */
auto asInputStream(R, E = ElementType!R)(R range) if (isInputRange!R) {
    alias WrappedStream = InputRangeStream!(R, E);
    return WrappedStream(range);
}
123 
// Reads a 5-element range through a 2-element buffer and checks the count
// reported by each successive read: 2, 2, 1, and finally 0.
unittest {
    float[5] values = [0.25, 0.5, 0.75, 1.0, 1.25];
    auto stream = asInputStream(values[]);
    float[2] chunk = [0, 0];
    immutable uint[4] expectedCounts = [2, 2, 1, 0];
    foreach (expected; expectedCounts) {
        StreamResult result = stream.readFromStream(chunk);
        assert(result.hasCount && result.count == expected);
    }
}
137 
/**
 * Adapter that exposes an output stream of `E` elements as a Phobos-style
 * output range via a `put` method.
 */
struct OutputStreamRange(S, E = StreamType!S) if (isOutputStream!(S, E)) {
    // The wrapped stream, stored as type S.
    private S stream;

    /**
     * Passes `buffer` to the underlying stream's `writeToStream`.
     * Params:
     *   buffer = The elements to write.
     */
    void put(E[] buffer) {
        // NOTE(review): the result of the write is deliberately discarded
        // here, so a failed or partial write is not reported to the caller.
        cast(void) this.stream.writeToStream(buffer);
    }
}
154 
/**
 * Convenience factory that wraps an output stream in an `OutputStreamRange`,
 * giving it a `put` method so it can be handed to code that expects a
 * Phobos output range.
 * Params:
 *   stream = The output stream to wrap.
 * Returns: An `OutputStreamRange!(S, E)` that writes to `stream`.
 */
auto asOutputRange(S, E = StreamType!S)(S stream) if (isOutputStream!(S, E)) {
    alias WrappedRange = OutputStreamRange!(S, E);
    return WrappedRange(stream);
}
166 
// Verifies that putting elements into the output range makes them appear in
// the underlying array output stream.
unittest {
    import streams : arrayOutputStreamFor;
    auto sink = arrayOutputStreamFor!ubyte;
    // Wrap a pointer so that the range writes into `sink` itself.
    auto outRange = asOutputRange(&sink);
    assert(isOutputRange!(typeof(outRange), ubyte));
    assert(sink.toArrayRaw() == []);
    ubyte[3] payload = [1, 2, 3];
    outRange.put(payload);
    assert(sink.toArrayRaw() == [1, 2, 3]);
}
177 
/**
 * An output stream implementation that wraps a Phobos-style output range.
 */
struct OutputRangeStream(R, E = ElementType!R) if (isOutputRange!(R, E)) {
    // The wrapped output range that receives every written element.
    private R range;

    /**
     * Writes the elements in `buffer` to the underlying output range.
     * Params:
     *   buffer = The buffer of elements to write.
     * Returns: Always the number of elements in `buffer`. If the underlying
     * range throws an exception, this will be thrown by this stream.
     */
    StreamResult writeToStream(E[] buffer) {
        // Call the free function `std.range.put` rather than UFCS-style
        // `range.put(buffer)`: the UFCS form binds directly to a member
        // `R.put` when one exists and skips the element-by-element fallback,
        // so a range whose member `put` only accepts a single `E` would not
        // compile here even though it satisfies `isOutputRange!(R, E)`.
        put(this.range, buffer);
        return StreamResult(cast(uint) buffer.length);
    }
}
196 
/**
 * Convenience factory that wraps a Phobos-style output range in an
 * `OutputRangeStream`.
 * Params:
 *   range = The output range that written elements are put into.
 * Returns: An `OutputRangeStream!(R, E)` that writes to `range`.
 */
auto asOutputStream(R, E = ElementType!R)(R range) if (isOutputRange!(R, E)) {
    alias WrappedStream = OutputRangeStream!(R, E);
    return WrappedStream(range);
}
206 
// Verifies that successive writes to a stream wrapping an array slice land
// one after another in the backing array.
unittest {
    ubyte[8192] backing;
    auto stream = asOutputStream(backing[]);
    ubyte[3] first = [1, 2, 3];
    assert(stream.writeToStream(first) == StreamResult(3));
    assert(backing[0 .. 3] == [1, 2, 3]);
    // The second write continues right after the first three elements.
    ubyte[2] second = [4, 5];
    assert(stream.writeToStream(second) == StreamResult(2));
    assert(backing[0 .. 5] == [1, 2, 3, 4, 5]);
}
218 
/**
 * Wraps a range in the matching kind of stream: an input range becomes an
 * input stream, and an output range becomes an output stream. A range that
 * is both an input and an output range is treated as an input range; call
 * `asInputStream` or `asOutputStream` directly to pick one explicitly.
 * Params:
 *   range = The range to convert.
 * Returns: A stream that wraps the given range.
 */
auto asStream(R, E = ElementType!R)(R range) if (isInputRange!R || isOutputRange!(R, E)) {
    static if (!isInputRange!R) {
        // Only reachable for output ranges, per the template constraint.
        return asOutputStream!(R, E)(range);
    } else {
        return asInputStream!(R, E)(range);
    }
}
235 
// Verifies which kind of stream `asStream` produces for input ranges, for
// ranges that are both input and output, and for output-only ranges.
unittest {
    import streams.types.array : byteArrayOutputStream;

    int[10] plainInput;
    auto fromInput = asStream(plainInput[]);
    assert(isInputStream!(typeof(fromInput), int));

    // A slice is both an input and an output range; the input side wins.
    int[10] dualPurpose;
    auto fromDual = asStream(dualPurpose[]);
    assert(isInputStream!(typeof(fromDual), int));

    // Build a range that is an output range but not an input range.
    auto byteSink = byteArrayOutputStream();
    auto outputOnly = asOutputRange(byteSink);
    assert(isOutputRange!(typeof(outputOnly), ubyte) && !isInputRange!(typeof(outputOnly)));
    // The template arguments are spelled out explicitly for this call.
    auto fromOutputOnly = asStream!(typeof(outputOnly), ubyte)(outputOnly);
    assert(isOutputStream!(typeof(fromOutputOnly), ubyte));
}
255 
/**
 * Converts the given stream to a range. Input streams are converted to input
 * ranges, and output streams are converted to output ranges. If the stream
 * type is an input stream it is always wrapped as an input range, because
 * that check is made first.
 *
 * The stream is accepted as `auto ref`, so both lvalues and temporaries
 * (for example the direct result of a factory function) can be passed, in
 * line with `asInputRange` and `asOutputRange`, which take their stream by
 * value.
 * Params:
 *   stream = The stream to convert.
 * Returns: A range that wraps the given stream.
 */
auto asRange(S, E = StreamType!S)(auto ref S stream) if (isSomeStream!S) {
    static if (isSomeInputStream!S) {
        return asInputRange!(S, E)(stream);
    } else {
        return asOutputRange!(S, E)(stream);
    }
}
270 
// Verifies that `asRange` yields an input range for an input stream and an
// output range for an output stream.
unittest {
    import streams.types.array : arrayInputStreamFor, byteArrayOutputStream;

    ubyte[3] sourceBytes = [1, 2, 3];
    auto source = arrayInputStreamFor(sourceBytes);
    auto asInput = asRange(source);
    assert(isInputRange!(typeof(asInput)));
    assert(is(ElementType!(typeof(asInput)) == ubyte));

    auto sink = byteArrayOutputStream();
    auto asOutput = asRange(sink);
    assert(isOutputRange!(typeof(asOutput), ubyte));
}