-
Notifications
You must be signed in to change notification settings - Fork 1
/
client-channel.js
289 lines (244 loc) · 11.3 KB
/
client-channel.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
const net = require('net');
const path = require('path');
const jsonFile = require('jsonfile');
const Utils = require('./utils');
/**
* Validation miners use the client channel to connect to one or more
* Storage miners. Validation miners also implement the server channel
* to gossip among themselves on the overall health of the STORE network.
* This module implements the client channel.
*/
module.exports = class ClientChannel {
/**
* @param options - options to initialize the client channel.
* For example: {
* clientPrivateKey: 'Private key of this client',
* servers: {
* [
* 's1', 's2', ...
* // One or more servers to connect to. These server "ids" from
* // config/miner-public-keys.json where each id is the public key
* // of the server.
* ]
* }
* }
*/
constructor(options) {
if (!options || Object.keys(options).length === 0) {
throw new Error('options parameter is required to create the client channel.')
}
if (!options.clientPrivateKey) {
throw new Error('options parameter should contain the private key for this client.')
}
if (!options.servers || !Array.isArray(options.servers) || options.servers.length === 0) {
throw new Error('options parameter should list one or more servers to connect this client channel to.')
}
const self = this;
// Make a copy of options to keep the originaldata unmutated.
this.options = Utils.clone(options);
this.options.servers = this.options.servers.map(server => {
// Create servers as an array of objects, so we can add additional attributes later on.
return {
identifier: server
}
});
this.supportedNoiseEngines = ['noise-stream', 'noise-peer'];
// During development and testing, we will support specifying different noise engines.
if (!this.options.noiseEngine || this.supportedNoiseEngines.indexOf(this.options.noiseEngine) === -1) {
this.options.noiseEngine = this.supportedNoiseEngines[0];
}
// Get the public information about miners. This client will connect to one or more of other miners.
this.minersPublicInfo = jsonFile.readFileSync(path.join(__dirname,'./config/miner-public-keys.json'));
// Now validate that the options.servers has the right servers requested.
this.options.servers.forEach(server => {
const serverInfo = self.minersPublicInfo[server.identifier];
if (!serverInfo) {
throw new Error('Server ' + server.identifier + ' is not one of the available peers to connect to.');
}
// Append all properties to the server object.
server = Object.assign(server, serverInfo);
})
this.options.retryAttempts = 3; // Number of retry attempts to reestablish connections.
this.options.retryTimeoutInMS = 1000;
this.ready = false; // The client channel will be ready when it connects to all servers.
}
/**
* Initialize the client channel. Connect to all the servers specified.
* For each server, a "socket" is returned, which is used to send encrypted
* requests and receive encrypted responses from the server.
* @param readCallback - A callback function to receive data from connected servers.
* @return - Number of successful connections. This should be same as options.servers.length
* if all connections are successful. Otherwise it will be less than the number of
* servers requested.
*/
async initialize(readCallback) {
const self = this;
/**
* Helper function that connects to all servers requested. For each
* server a Promise with {socket: <socket>, server: <server info>}
* is returned. socket can be null, if connecting to the server fails.
*/
const connectAll = async () => {
return await Promise.all(self.options.servers.map(server => self.connect(server)));
};
try {
const results = await connectAll();
let successfulConnections = 0;
results.forEach((result, index) => {
let matchingServer = self.options.servers[index];
if (result.socket !== null) {
// We are not using results.filter() to retain the array indices
// to map to options.servers.
matchingServer.socket = result.socket;
// Create Noise channel on the socket for encrypted message exchanges.
self.createNoiseChannel(matchingServer, readCallback);
successfulConnections += 1;
} else {
matchingServer.socket = null;
}
})
// Client is ready as long as a single server is connected to.
this.ready = successfulConnections > 0;
return successfulConnections;
} catch(e) {
// Any exception is severe and affects the client functionality.
throw (e);
}
}
/**
* Connects to a server with the specified address and port.
* @param serverInfo - containing serverAddress and port as:
* {
* serverAddress: '<server IP address>',
* serverPort: <server port number>
* }
* serverInfo may contain other details, but they are irrelevant for this API.
* @return - A Promise with the connected socket if successful.
*/
connect(serverInfo) {
return new Promise((resolve, reject) => {
const socket = net.createConnection(serverInfo.serverPort, serverInfo.serverAddress, () => {
resolve({socket: socket, server: serverInfo});
}).on('error', (err) => {
// It is possible that one or more servers are unreachable.
resolve({socket: null, server: serverInfo});
});
});
}
/**
* Returns the noise engine depending on the engine requested. This implementation
* prevents creating unnecessary engines at runtime while being a little naive about the logic
* to determine what engine to create.
* @param serverInfo - The object containing server info and the socket already established.
* @return the requested noise engine.
*/
noiseEngine(serverInfo) {
const supportedEngines = this.supportedNoiseEngines;
const serverIdentifier = serverInfo.identifier;
let noiseEngine;
switch(this.options.noiseEngine) {
case supportedEngines[0]:
noiseEngine = new (require('./noise-stream-engine'))(serverInfo.socket, serverIdentifier);
break;
case supportedEngines[1]:
noiseEngine = new (require('./noise-peer-engine'))(serverInfo.socket, serverIdentifier);
break;
default:
noiseEngine = new (require('./noise-stream-engine'))(serverInfo.socket, serverIdentifier);
break;
}
return noiseEngine;
}
/**
* Creates a Noise channel on the socket for encrypted message exchanges between this
* client and connected servers.
* @param serverInfo - The object containing server info and the socket already established.
* @param readCallback - A callback function to receive data from connected server.
*/
createNoiseChannel(serverInfo, readCallback) {
const self = this;
serverInfo.noiseClient = this.noiseEngine(serverInfo);
serverInfo.noiseClient.createInitiatorChannel(readCallback);
// It is possible that the server gets disconnected. So, add reconnect
// logic to the socket.
serverInfo.socket.on('close', () => {
self.reconnect(serverInfo, readCallback);
});
}
/**
* Reconnects to a disconnected server. It is possible that the server is permanently down,
* so the reconnect may fail.
* @param serverInfo - The object containing server info to connect to.
* @param readCallback - A callback function to receive data from connected server.
*/
reconnect(serverInfo, readCallback) {
const self = this;
serverInfo.retryAttempts = 0;
// Connect to the requested server.
const connect = async () => {
return await self.connect(serverInfo);
};
// Helper to retry connecting to the specified server.
const retry = () => {
if (++serverInfo.retryAttempts < self.options.retryAttempts) {
Utils.interval(reestablish, (this.options.retryTimeoutInMS * serverInfo.retryAttempts * 2), 1);
}
}
// Reestablish the connection to the lost server.
const reestablish = async () => {
// Remove all existing listeners from the current socket.
if (serverInfo.socket !== null) {
serverInfo.socket.removeAllListeners();
serverInfo.socket = null;
serverInfo.noiseClient = null;
}
try {
const result = await connect();
if (result.socket !== null) {
serverInfo.socket = result.socket;
// Create Noise channel on the socket for encrypted message exchanges.
self.createNoiseChannel(serverInfo, readCallback);
} else {
// Connecting to server failed. Retry if we can.
retry();
}
} catch(e) {
// An exception here shouldn't affect client functionality because this is
// retry, so we will attempt retrying again.
retry();
}
}
Utils.interval(reestablish, this.options.retryTimeoutInMS, 1);
}
/**
* Sends the data to all connected servers.
* @param jsonData - Data to send in JSON format.
* This method is asynchronous, so the responses, if any, will be available in
* the read callback supplied in the inititalization call.
* The data is encrypted end to end.
*/
send(jsonData) {
this.options.servers.forEach(server => {
if (server.socket !== null && server.noiseClient !== null) {
// The channel may be temporarily set to null, if the client loses connection
// to this server or initial connection attempts failed.
server.noiseClient.send(jsonData);
}
});
}
/*
* Closes this client channel. Severes all connections to connected servers.
*/
close() {
this.options.servers.forEach(server => {
if (server.socket !== null) {
server.noiseClient.close();
server.noiseClient = null;
server.socket.end();
server.socket.removeAllListeners();
server.socket.destroy();
server.socket = null;
}
});
}
}