在 Node.js 的微服务中,一般不同的服务模块我们会采用 TCP 进行通信,本文来简单谈一谈如何设计 TCP 服务的基础管理。
在具体设计上,本文参考了微服务框架 Seneca 所采用的通信方案 Seneca-transport,已经被实践所证明其可行性。
一提到 TCP 通信,我们肯定离不开 net
模块,事实上,借助 net
模块,我们也可以比较快速地完成一般的 TCP 通信的任务。
为了避免对基础的遗忘,我们还是先附上一个基本的 TCP 链接代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| const net = require('net');
const server = net.createServer((socket) => { socket.write('goodbye\n'); socket.on('data', (data) => { console.log('data:', data.toString()); socket.write('goodbye\n'); }) }).on('error', (err) => { throw err; });
server.listen(8024, () => { console.log('opened server on', server.address()); });
const net = require('net');
const client = net.createConnection({ port: 8024 }, () => { console.log('connected to server!'); client.write('world!\r\n'); setInterval(() => { client.write('world!\r\n'); }, 1000) }); client.on('data', (data) => { console.log(data.toString()); }); client.on('end', () => { console.log('disconnected from server'); });
|
其实,上述已经是一个几乎最简单的客户端和服务端通信 Demo,但是并不能在实际项目中使用,首先我们需要审视,其离生产环境还差哪些内容:
- 以上要求 Server 端要在 Client 端之前启动,并且一旦因为一些错误导致 Server 端重启了并且这个时候 Client 端正好和 Server 端进行通信,那么肯定会 crash,所以,我们需要一个更为平滑兼容的方案。
- 以上 TCP 链接的 Server 部分,并没有对 connection 进行管理的能力,并且在在以上的例子中,双方都没有主动释放链接,也就是说,建立的是一个 TCP 长连接。
- 以上链接的处理数据能力有限,只能处理纯文本的内容,并且还有一定的风险性(你也许会说可以用 JSON 的序列化反序列化的方法来处理 JSON 数据,但是你别忘了
socket.on('data'...
很可能接收到的不是一个完整的 JSON,如果 JSON 较长,其可能只接收到一般的内容,这个时候如果直接 JSON.parse())
很可能就会报错)。
以上三个问题,便是我们要解决的主要问题,如果你看过之后立刻知道该如何解决了,那么这篇文章可能你不需要看了,否则,我们可以一起继续探索解决方案。
使用 reconnect-core
reconnect-core 是一个和协议无关的链接重试算法,其工作方式也比较简单,当你需要在 Client 端建立链接的时候,其流程是这样的:
- 调用事先传入的链接建立函数,如果这个时候返回成功了,即成功建立链接。
- 如果第一次建立链接失败了,那么再隔一段时间建立第二次,如果第二次还是失败,那么再隔一段更长的时间建立第三次,如果还是失败,那么再隔更长的一段时间……直到到达最大的尝试次数。
实际上关于尝试的时间间隔,也会有不同的策略,比较常用的是 Fibonacci 策略和 exponential 策略。
当然,关于策略的具体实现,reconnect-core 采用了一个 backoff 的库来管理,其可以支持 Fibonacci 策略和 exponential 策略以及更多的自定义策略。
对于上面提到的 DEMO 代码。我们给出 Client 端使用 reconnect-core 的一个实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| const Reconnect = require('reconnect-core'); const net = require('net'); const Ndjson = require('ndjson');
const Connect = Reconnect(function() { var args = [].slice.call(arguments); return net.connect.apply(null, args) });
let connection = Connect(function(socket) { socket.write('world!\r\n'); socket.on('data', (msg) => { console.log('data', msg.toString()); }); socket.on('close', (msg) => { console.log('close', msg).toString(); connection.disconnect(); }); socket.on('end', () => { console.log('end'); }); });
connection.connect({ port: 8024 }); connection.on('reconnect', function () { console.log('on reconnect...') }); connection.on('error', function (err) { console.log('error:', err); }); connection.on('disconnect', function (err) { console.log('disconnect:', err); });
|
采用 Reconnect 实际上相比之前是多了一层内容,我们在这里需要区分 connection 实例和 socket 句柄,并且附加正确的时间监听。
现在,我们就不用担心到底是先启动服务端还是先启动客户端了,另外,就算我们的服务端在启动之后由于某些错误关闭了一会,只要没超过最大时间(而这个也是可配置的),仍然不用担心客户端与其建立连接。
给 Server 端增加管理能力
给 Server 端增加管理能力是一个比较必要的并且可以做成不同程度的,一般来说,最重要的功能则是及时清理链接,常用的做法是收到某条指令之后进行清理,或者到达一定时间之后定时清理。
这里我们可以增加一个功能,达到一定时间之后,自动清理所有链接:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| const net = require('net');
var connections = [];
const server = net.createServer((socket) => { connections.push(socket); socket.write('goodbye\n'); socket.on('data', (data) => { console.log('data:', data.toString()); socket.write('goodbye\n'); }) }).on('error', (err) => { throw err; });
setTimeout(() => { console.log('clear connections'); connections.forEach((connection) => { connection.end('end') }) }, 10000);
server.listen(8024, () => { console.log('opened server on', server.address()); });
|
我们可以通过connection.end('end')
和 connection.destory()
来清理,一般来说,前者是正常情况下的关闭指令,需要 Client 端进行确认,而后者则是强制关闭,一般在出错的时候会这样调用。
使用 ndjson 来格式化数据
ndjson 是一个比较方便的 JSON 序列化/反序列化库,相比于我们直接用 JSON,其好处主要体现在:
- 可以同时解析多个 JSON 对象,如果是一个文件流,即其可以包含多个
{}
,但是要求则是每一个占据一行,其按行分割并且解析。
- 内部使用了 split2,好处就是其返回时可以保证该行的所有内容已经接受完毕,从而防止 ndjson 在序列化的时候出错。
关于 ndjson 的基本使用,可以根据上述链接查找文档,这里一般情况下,我们的使用方式如下(以下是一个 demo):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75
| const net = require('net');
var connections = [];
const server = net.createServer((socket) => { connections.push(socket); socket.on('data', (data) => { console.log('data:', data.toString()); socket.write('{"good": 1234}\r\n'); socket.write('{"good": 4567}\n\n'); }) }).on('error', (err) => { throw err; });
server.listen(8024, () => { console.log('opened server on', server.address()); });
const Reconnect = require('reconnect-core'); const net = require('net'); const Ndjson = require('ndjson'); var Stream = require('stream');
const Connect = Reconnect(function() { var args = [].slice.call(arguments); return net.connect.apply(null, args) });
let connection = Connect(function(socket) { socket.write('world!\r\n'); var parser = Ndjson.parse(); var stringifier = Ndjson.stringify();
function yourhandler(){ var messager = new Stream.Duplex({ objectMode: true }); messager._read = function () { }; messager._write = function (data, enc, callback) { console.log(typeof data, data); return callback() }; return messager } socket .pipe(parser) .pipe(yourhandler()) .pipe(stringifier) .pipe(socket);
socket.on('close', (msg) => { console.log('close', msg).toString(); connection.disconnect(); }); socket.on('end', (msg) => { console.log('end', msg); }); }); connection.connect({ port: 8024 }); connection.on('reconnect', function () { console.log('on reconnect...') }); connection.on('error', function (err) { console.log('error:', err); }); connection.on('disconnect', function (err) { console.log('disconnect:', err); });
|
其中,用户具体的逻辑代码,可以是 yourhandler
函数 _write
里面的一部分,其接收的是一个一个处理好的对象。
感谢鼓励