1 module buffer.rpc.client;
2 
3 import core.stdc.errno;
4 import std.meta : staticIndexOf;
5 import std.traits;
6 import std.conv : to;
7 import std.variant;
8 import std.socket;
9 import std.bitmanip;
10 import std.exception;
11 import std.typecons;
12 import std.datetime;
13 
14 import crypto.rsa;
15 
16 import buffer.message;
17 
18 // static if (!__traits(compiles, EWOULDBLOCK))
19 // {
20 //     enum EWOULDBLOCK = EAGAIN;
21 // }
22 
/// RPC client: serializes a method call, sends it to a server over TCP and decodes the reply.
class Client
{
    // Process-wide default server address used by `call`; set through `setServerHost`.
    private __gshared string host;
    private __gshared ushort port;

    /// Stores the default server address that `call` connects to.
    static void setServerHost(const string host, const ushort port)
    {
        Client.host = host;
        Client.port = port;
    }

    /++
        Calls `method` on the default server, using the global settings held by `Message`
        (magic, crypt type, key, RSA key) and the address stored by `setServerHost()`.
        `setServerHost()` must have been called first.

        This is the simplest entry point when the client only ever talks to one server.
        A new connection is opened for the call and closed afterwards (see `callEx`).

        Params:
            method = name of the remote method; must not be empty.
            params = arguments forwarded to the remote method.

        Returns: the decoded response as `T`, or nothing when `T` is `void`.

        Throws: `Exception` when the host is not set or when the underlying `callEx` fails.
    +/
    static T call(T, Params...)(const string method, Params params)
    if (is(T == void) || (staticIndexOf!(T, supportedBuiltinTypes) != -1) || ((BaseTypeTuple!T.length > 0) && is(BaseTypeTuple!T[0] == Message)))
    {
        enforce(host != string.init, "Server host and port must be set.");

        static if (is(T == void))
        {
            callEx!(T, Params)(host, port, Message._magic, Message._crypt, Message._key, Message._rsaKey, method, params);
        }
        else
        {
            return callEx!(T, Params)(host, port, Message._magic, Message._crypt, Message._key, Message._rsaKey, method, params);
        }
    }

    /++
        Calls `method` on an explicitly given server with explicitly given protocol settings,
        so neither `setServerHost()` nor the global `Message` settings are consulted.
        Use it when one client has to talk to several servers.

        A fresh TCP connection (30 second send/receive timeouts, TCP_NODELAY) is opened for
        the call and is always shut down and closed before this function returns or throws.

        Params:
            host   = server host; must not be empty.
            port   = server port.
            magic  = protocol magic number passed to the serializer.
            crypt  = encryption type passed to the serializer.
            key    = encryption key passed to the serializer.
            rsaKey = RSA key information passed to the serializer.
            method = name of the remote method; must not be empty.
            params = arguments forwarded to the remote method.

        Returns: the decoded response as `T`, or nothing when `T` is `void`.

        Throws: `Exception` on invalid arguments, connection failure, transport failure
                or a malformed response.
    +/
    static T callEx(T, Params...)(const string host, const ushort port,
        const ushort magic, const CryptType crypt, const string key, Nullable!RSAKeyInfo rsaKey,
        const string method, Params params)
    if (is(T == void) || (staticIndexOf!(T, supportedBuiltinTypes) != -1) || ((BaseTypeTuple!T.length > 0) && is(BaseTypeTuple!T[0] == Message)))
    {
        enforce(host != string.init, "Server host and port must be set.");
        enforce(method.length > 0, "Paramter method must be set.");

        TcpSocket socket = new TcpSocket();

        // Release the descriptor on every exit path, including a failed connect or an
        // exception while decoding the response. `request` may already have closed the
        // socket on a transport error; shutting down / closing it again is harmless.
        scope (exit)
        {
            socket.shutdown(SocketShutdown.BOTH);
            socket.close();
        }

        socket.setOption(SocketOptionLevel.SOCKET, SocketOption.RCVTIMEO, 30.seconds);
        socket.setOption(SocketOptionLevel.SOCKET, SocketOption.SNDTIMEO, 30.seconds);
        socket.setOption(SocketOptionLevel.TCP, SocketOption.TCP_NODELAY, true);
        socket.connect(new InternetAddress(host, port));

        static if (is(T == void))
        {
            callEx!(T, Params)(socket, magic, crypt, key, rsaKey, method, params);
        }
        else
        {
            return callEx!(T, Params)(socket, magic, crypt, key, rsaKey, method, params);
        }
    }

    /++
        Calls `method` over an already connected socket with explicitly given protocol
        settings, so neither `setServerHost()` nor the global `Message` settings are consulted.
        Use it for long-lived connections or when one client talks to several servers.

        The caller owns the socket: it is not closed on success. On a transport error the
        socket is shut down and closed before the exception is thrown (see `request`).

        When `T` is `void` the request is sent and no response is read.

        Params:
            socket = connected socket to the server.
            magic  = protocol magic number passed to the serializer.
            crypt  = encryption type passed to the serializer.
            key    = encryption key passed to the serializer.
            rsaKey = RSA key information passed to the serializer.
            method = name of the remote method; must not be empty.
            params = arguments forwarded to the remote method.

        Returns: for a builtin `T`, the single response parameter converted to `T`;
                 otherwise a new `T` whose fields are filled, in declaration order,
                 from the response parameters.

        Throws: `Exception` when the socket is not alive, `method` is empty, the transport
                fails, or the number of response parameters does not match `T`.
    +/
    static T callEx(T, Params...)(Socket socket,
        const ushort magic, const CryptType crypt, const string key, Nullable!RSAKeyInfo rsaKey,
        const string method, Params params)
    if (is(T == void) || (staticIndexOf!(T, supportedBuiltinTypes) != -1) || ((BaseTypeTuple!T.length > 0) && is(BaseTypeTuple!T[0] == Message)))
    {
        enforce(socket.isAlive, "The socket is not connected to the server.");
        enforce(method.length > 0, "Paramter method must be set.");

        static if (is(T == void))
        {
            // Fire and forget: nothing is read back from the server.
            request(socket, false, Message.serialize_without_msginfo(magic, crypt, key, rsaKey, method, params));
            return;
        }
        else
        {
            ubyte[] response = request(socket, true, Message.serialize_without_msginfo(magic, crypt, key, rsaKey, method, params));
            string name;
            string res_method;
            Variant[] res_params = Message.deserializeEx(magic, crypt, key, rsaKey, response, name, res_method);

            //enforce(method == res_method);

            static if (isBuiltinType!T)
            {
                enforce(res_params.length == 1, "The number of response parameters from the server is incorrect.");

                return res_params[0].get!T;
            }
            else
            {
                alias FieldTypes = FieldTypeTuple!T;
                enforce(FieldTypes.length == res_params.length, "Incorrect number of parameters, " ~ T.stringof ~ " requires " ~ FieldTypes.length.to!string ~ " parameters.");

                T message = new T();

                // Compile-time loop over the fields of T. The field is addressed through
                // __traits(getMember) and the type through its symbol, so no source text
                // has to be rebuilt from `.stringof` (which breaks for types that are not
                // visible under that spelling in this module).
                foreach (i, FieldType; FieldTypes)
                {
                    __traits(getMember, message, FieldNameTuple!T[i]) = res_params[i].get!FieldType;
                }

                return message;
            }
        }
    }

    /++
        Sends `data` completely and, when `requireReceive` is set, reads one response frame.

        A response frame is read as a fixed header of `ushort.sizeof + int.sizeof` bytes
        followed by a body whose length is the `int` stored at header offset 2 (decoded
        with `std.bitmanip.peek`).

        Note:
        The caller is responsible for connecting the socket and for closing it after
        successful use. On any transport failure this function shuts the socket down and
        closes it before throwing, so that server-side resources are released.

        Params:
            socket         = connected socket.
            requireReceive = whether a response frame must be read after sending.
            data           = bytes to send.

        Returns: header and body of the response concatenated, or `null` when
                 `requireReceive` is `false`.

        Throws: `Exception` when sending or receiving fails, the peer closes the
                connection early, or the announced body length is negative.
    +/
    private static ubyte[] request(Socket socket, const bool requireReceive, const ubyte[] data)
    {
        // Closes the socket and hands back the exception to throw. Callers must capture
        // errno BEFORE calling this, because shutdown()/close() may overwrite it.
        Exception failure(const string message)
        {
            socket.shutdown(SocketShutdown.BOTH);
            socket.close();

            return new Exception(message);
        }

        size_t sent = 0;

        while (sent < data.length)
        {
            immutable n = socket.send(data[sent .. $]);

            if (n > 0)
            {
                sent += n;
                continue;
            }

            immutable err = errno;

            if (n == 0)
            {
                throw failure("Server socket close at sending. error: " ~ formatSocketError(err));
            }

            if (err == EINTR) // || err == EAGAIN || err == EWOULDBLOCK)
            {
                // Interrupted by a signal before anything was sent: retry.
                continue;
            }

            throw failure("Server socket error at sending. error: " ~ formatSocketError(err));
        }

        if (!requireReceive)
        {
            return null;
        }

        // Reads exactly `length` bytes; throws when the peer closes the connection or a
        // receive error other than EINTR occurs.
        ubyte[] receiveExact(const size_t length)
        {
            ubyte[] chunk = new ubyte[length];
            size_t got = 0;

            while (got < chunk.length)
            {
                immutable n = socket.receive(chunk[got .. $]);

                if (n > 0)
                {
                    got += n;
                    continue;
                }

                immutable err = errno;

                if (n < 0 && err == EINTR) // || err == EAGAIN || err == EWOULDBLOCK)
                {
                    continue;
                }

                throw failure("Server socket error at receiving. error: " ~ formatSocketError(err));
            }

            return chunk;
        }

        enum size_t headerLength = ushort.sizeof + int.sizeof;
        ubyte[] header = receiveExact(headerLength);

        // The length comes from the peer: never turn a negative value into an allocation
        // size (a cast to an unsigned type would request up to ~4 GiB).
        immutable int bodyLength = header.peek!int(ushort.sizeof);

        if (bodyLength < 0)
        {
            throw failure("Server socket error at receiving. error: invalid body length " ~ bodyLength.to!string);
        }

        return header ~ receiveExact(cast(size_t) bodyLength);
    }
}