1 /** 
2  * Defines input and output streams for reading from and writing to sockets,
3  * using the `Socket` class from `std.socket` as the underlying resource.
4  */
5 module streams.types.socket;
6 
7 import streams.primitives;
8 
9 version (D_BetterC) {} else {
10 
11 import std.socket;
12 
13 /** 
14  * A byte input stream for reading from a socket.
15  */
16 struct SocketInputStream {
17     private Socket socket;
18 
19     /** 
20      * Receives up to `buffer.length` bytes from the socket, and stores them in
21      * `buffer`.
22      * Params:
23      *   buffer = The buffer to store received bytes in.
24      * Returns: The number of bytes read, or -1 in case of error.
25      */
26     StreamResult readFromStream(ubyte[] buffer) {
27         ptrdiff_t receiveCount = this.socket.receive(buffer);
28         if (receiveCount == Socket.ERROR) {
29             return StreamResult(StreamError("Socket error: " ~ lastSocketError(), cast(int) receiveCount));
30         }
31         return StreamResult(cast(uint) receiveCount);
32     }
33 
34     /** 
35      * Shuts down and closes this stream's underlying socket.
36      */
37     OptionalStreamError closeStream() {
38         this.socket.shutdown(SocketShutdown.BOTH);
39         this.socket.close();
40         return OptionalStreamError.init;
41     }
42 }
43 
44 /** 
45  * A byte output stream for writing to a socket.
46  */
47 struct SocketOutputStream {
48     private Socket socket;
49 
50     /** 
51      * Writes bytes from `buffer` to the socket.
52      * Params:
53      *   buffer = The buffer to write bytes from.
54      * Returns: The number of bytes written, or -1 in case of error.
55      */
56     StreamResult writeToStream(ubyte[] buffer) {
57         ptrdiff_t sendCount = this.socket.send(buffer);
58         if (sendCount == Socket.ERROR) {
59             return StreamResult(StreamError("Socket error: " ~ lastSocketError(), cast(int) sendCount));
60         }
61         return StreamResult(cast(uint) sendCount);
62     }
63 
64     /** 
65      * Shuts down and closes this stream's underlying socket.
66      */
67     OptionalStreamError closeStream() {
68         this.socket.shutdown(SocketShutdown.BOTH);
69         this.socket.close();
70         return OptionalStreamError.init;
71     }
72 }
73 
74 unittest {
75     import streams.primitives;
76 
77     assert(isByteInputStream!SocketInputStream);
78     assert(isClosableStream!SocketInputStream);
79 
80     assert(isByteOutputStream!SocketOutputStream);
81     assert(isClosableStream!SocketOutputStream);
82 
83     Socket[2] pair = socketPair();
84     auto sIn = SocketInputStream(pair[0]);
85     auto sOut = SocketOutputStream(pair[1]);
86     assert(sOut.writeToStream([1, 2, 3]) == StreamResult(3));
87     ubyte[] buffer = new ubyte[8192];
88     assert(sIn.readFromStream(buffer) == StreamResult(3));
89     assert(buffer[0 .. 3] == [1, 2, 3]);
90     sIn.closeStream();
91     sOut.closeStream();
92     assert(sIn.readFromStream(buffer).hasError);
93     assert(sOut.writeToStream([4, 5, 6]).hasError);
94 }
95 
96 }