1 module buffer.rpc.client; 2 3 import core.stdc.errno; 4 import std.meta : staticIndexOf; 5 import std.traits; 6 import std.conv : to; 7 import std.variant; 8 import std.socket; 9 import std.bitmanip; 10 import std.exception; 11 import std.typecons; 12 import std.datetime; 13 14 import crypto.rsa; 15 16 import buffer.message; 17 18 // static if (!__traits(compiles, EWOULDBLOCK)) 19 // { 20 // enum EWOULDBLOCK = EAGAIN; 21 // } 22 23 /// Rpc client 24 class Client 25 { 26 private __gshared string host; 27 private __gshared ushort port; 28 29 static void setServerHost(const string host, const ushort port) 30 { 31 Client.host = host; 32 Client.port = port; 33 } 34 35 /++ 36 Use global magic, host, port and other information. need call settings(), setServerHost() at before call this. 37 When clients do not need to connect to different servers, using it can simplify calls. 38 +/ 39 static T call(T, Params...)(const string method, Params params) 40 if (is(T == void) || (staticIndexOf!(T, supportedBuiltinTypes) != -1) || ((BaseTypeTuple!T.length > 0) && is(BaseTypeTuple!T[0] == Message))) 41 { 42 enforce(host != string.init, "Server host and port must be set."); 43 44 static if (is(T == void)) 45 { 46 callEx!(T, Params)(host, port, Message._magic, Message._crypt, Message._key, Message._rsaKey, method, params); 47 } 48 else 49 { 50 return callEx!(T, Params)(host, port, Message._magic, Message._crypt, Message._key, Message._rsaKey, method, params); 51 } 52 } 53 54 /++ 55 With Server host, port, magic, cryptType, key parameters, not need call setServerHost(), settings(). 56 When the same client needs to connect to different servers, it needs to be used. 57 +/ 58 static T callEx(T, Params...)(const string host, const ushort port, 59 const ushort magic, const CryptType crypt, const string key, Nullable!RSAKeyInfo rsaKey, 60 const string method, Params params) 61 if (is(T == void) || (staticIndexOf!(T, supportedBuiltinTypes) != -1) || ((BaseTypeTuple!T.length > 0) && is(BaseTypeTuple!T[0] == Message))) 62 { 63 enforce(host != string.init, "Server host and port must be set."); 64 enforce(method.length > 0, "Paramter method must be set."); 65 66 TcpSocket socket = new TcpSocket(); 67 socket.setOption(SocketOptionLevel.SOCKET, SocketOption.RCVTIMEO, 30.seconds); 68 socket.setOption(SocketOptionLevel.SOCKET, SocketOption.SNDTIMEO, 30.seconds); 69 socket.setOption(SocketOptionLevel.TCP, SocketOption.TCP_NODELAY, true); 70 socket.connect(new InternetAddress(host, port)); 71 72 static if (is(T == void)) 73 { 74 callEx!(T, Params)(socket, magic, crypt, key, rsaKey, method, params); 75 } 76 else 77 { 78 T result = callEx!(T, Params)(socket, magic, crypt, key, rsaKey, method, params); 79 } 80 81 socket.shutdown(SocketShutdown.BOTH); 82 socket.close(); 83 84 static if (!is(T == void)) 85 { 86 return result; 87 } 88 } 89 90 /++ 91 With Socket socket, magic, cryptType, key parameters, not need call setServerHost(), settings(). 92 When the same client needs to connect to different servers, it needs to be used. 93 At the same time, if long connection mode is adopted, it needs to be used as well. 94 +/ 95 static T callEx(T, Params...)(Socket socket, 96 const ushort magic, const CryptType crypt, const string key, Nullable!RSAKeyInfo rsaKey, 97 const string method, Params params) 98 if (is(T == void) || (staticIndexOf!(T, supportedBuiltinTypes) != -1) || ((BaseTypeTuple!T.length > 0) && is(BaseTypeTuple!T[0] == Message))) 99 { 100 enforce(socket.isAlive, "The socket is not connected to the server."); 101 enforce(method.length > 0, "Paramter method must be set."); 102 103 static if (is(T == void)) 104 { 105 request(socket, false, Message.serialize_without_msginfo(magic, crypt, key, rsaKey, method, params)); 106 return; 107 } 108 else 109 { 110 ubyte[] response = request(socket, true, Message.serialize_without_msginfo(magic, crypt, key, rsaKey, method, params)); 111 string name; 112 string res_method; 113 Variant[] res_params = Message.deserializeEx(magic, crypt, key, rsaKey, response, name, res_method); 114 115 //enforce(method == res_method); 116 117 static if (isBuiltinType!T) 118 { 119 enforce(res_params.length == 1, "The number of response parameters from the server is incorrect."); 120 121 return res_params[0].get!T; 122 } 123 else 124 { 125 alias FieldTypes = FieldTypeTuple!T; 126 enforce(FieldTypes.length == res_params.length, "Incorrect number of parameters, " ~ T.stringof ~ " requires " ~ FieldTypes.length.to!string ~ " parameters."); 127 128 T message = new T(); 129 130 foreach (i, type; FieldTypes) 131 { 132 mixin(` 133 message.` ~ FieldNameTuple!T[i] ~ ` = res_params[` ~ i.to!string ~ `].get!` ~ type.stringof ~ `; 134 `); 135 } 136 137 return message; 138 } 139 } 140 } 141 142 /++ 143 Note: 144 The caller is responsible for connection and closure without actively closing the socket. 145 But you need to close the exception before throwing it to release server resources. 146 +/ 147 private static ubyte[] request(Socket socket, const bool requireReceive, const ubyte[] data) 148 { 149 long len; 150 for (size_t off; off < data.length; off += len) 151 { 152 len = socket.send(data[off..$]); 153 154 if (len > 0) 155 { 156 continue; 157 } 158 else if (len == 0) 159 { 160 socket.shutdown(SocketShutdown.BOTH); 161 socket.close(); 162 163 throw new Exception("Server socket close at sending. error: " ~ formatSocketError(errno)); 164 } 165 else 166 { 167 if (errno == EINTR) // || errno == EAGAIN || errno == EWOULDBLOCK) 168 { 169 len = 0; 170 continue; 171 } 172 173 socket.shutdown(SocketShutdown.BOTH); 174 socket.close(); 175 176 throw new Exception("Server socket error at sending. error: " ~ formatSocketError(errno)); 177 } 178 } 179 180 if (!requireReceive) 181 { 182 return null; 183 } 184 185 ubyte[] receive(long length) 186 { 187 ubyte[] buf = new ubyte[cast(uint)length]; 188 long len; 189 190 for (size_t off; off < buf.length; off += len) 191 { 192 len = socket.receive(buf[off..$]); 193 194 if (len > 0) 195 { 196 continue; 197 } 198 else if (len == 0) 199 { 200 return null; 201 } 202 else 203 { 204 if (errno == EINTR) // || errno == EAGAIN || errno == EWOULDBLOCK) 205 { 206 len = 0; 207 continue; 208 } 209 210 return null; 211 } 212 } 213 214 return buf; 215 } 216 217 len = cast(long)(ushort.sizeof + int.sizeof); 218 ubyte[] buffer = receive(len); 219 220 if (buffer.length != len) 221 { 222 socket.shutdown(SocketShutdown.BOTH); 223 socket.close(); 224 225 throw new Exception("Server socket error at receiving. error: " ~ formatSocketError(errno)); 226 } 227 228 len = buffer.peek!int(2); 229 ubyte[] buf = receive(len); 230 231 if (buf.length != len) 232 { 233 socket.shutdown(SocketShutdown.BOTH); 234 socket.close(); 235 236 throw new Exception("Server socket error at receiving. error: " ~ formatSocketError(errno)); 237 } 238 239 return buffer ~ buf; 240 } 241 }