天天看点

探究 Node.js 中的 drain 事件

最近在用 node.js 写一些网络请求相关的代码时,频繁在一些开源代码中看到 drain 事件的使用,于是我也依葫芦画瓢写到了自己的代码里面:

实际放到线上测试的时候发现,在一些情况下,drain 事件真的会被触发,那到底什么时候会触发 drain 事件呢?drain 事件能用来做什么呢?本着打破砂锅问到底的精神,我决定探究一番。

请直接跳到<code>小结</code>部分。

最简单的办法就是查文档。因为我写的是网络请求相关的代码,那么我就先翻越了 net 和 socket 相关的部分。在 node.js 官方文档中,对于 socket.write 有这么一部分描述:

也就是说,drain 事件是和 socket.write 的返回值强关联的,那么我们可以做一个简单的实验(只写关键部分):

可是无论我怎么运行这部分代码,返回值总是 true,drain 事件没有被触发。那为啥线上就能触发呢?按照文档所说,只有全部或者部分数据被缓冲在了内存里面才会返回 false。那问题又来了,什么时候数据才会被缓冲呢?

既然是被缓冲了,那最先猜测到就是数据流量太大。就像每天上下班高峰期的文一西路那样,一旦车流量太大,前面的路口塞满了,交警就会让后面的车停下来。

好,那我们加大“车”流量(为节省篇幅,部分代码省略):

服务端代码:

客户端代码:

但运行多次之后发现仍旧没有看到任何 drain 事件的迹象。难道次数不够?随着我继续增大 i 的最大值,直到遇到<code>(libuv) kqueue(): too many open files in system</code>的错误时候,我仍旧没看到 drain 事件。

逼我用绝招。看 node.js 源代码!

因为 socket.write 实际上是调用的 stream.write(参考此处源代码),最后我们在 stream.write的代码中找到了一丝端倪:

可以看到当要写的数据的长度大于 highwatermark (字面理解:高水位线)的时候,那么 stream.write 就会返回 false,也就会触发 drain 事件了。

那这个高水位线具体是多少呢?可以继续看代码:

默认值是 16kb,看来还是挺大的啊。所以回想一下刚才我们的实验程序,一个是写的数据比较小,另外一个是实验代码中的服务器端没有复杂逻辑,数据处理的也比较快,我们仍旧拿刚才的车流量的例子,虽然车很多很多,但是如果每辆车都开得飞快,那路也不会堵。只有当一些车比较慢,影响到了后面车的速度的时候,整体速度就会下来,就变堵了。

根据这个思路,我们换成下面这个实验:

这里我们启动了一个简单的 http server,任何 requset 过来,都会返回一个 5m 大小的 圣诞歌曲的内容。然后我们对这个 http server 发起 1000 次 get 请求。果然不出所料,还没等所有请求发完,一堆的 drain event fired 日志出现了:

探究 Node.js 中的 drain 事件

如果对 res.write 的返回值做下日志,也会发现返回了很多 false。

原因也很容易想到,硬盘读取这个 mp3 文件的速度(测试环境为 rmbp 的 ssd 硬盘)一般都会快于将数据通过 http response 返回给用户(即便是 localhost 的访问,更不用说外网复杂错综的网络环境了),所以,当 mp3 很快就被读取过来,但又没有很快的将数据写回,那么这个 stream 中的 data 就被缓存了。于是,我很自然而然的设想,在我这个小应用中,也许这么做并没有什么,但当应用的访问量逐渐增大的时候,这个问题就可能会爆发,比如造成个内存泄露啊之类的。

那上面的代码该怎么改进呢?

也就是说,当 write 返回 false 的时候,我们暂停读取流,当缓存的数据清空之后,我们再继续读取流,相当于我们根据输出流来对读取流做限流。反过来,如果写入流快于读取流,我们也可以对写入流限流。

刚才提到了,我想如果我们没做这部分处理的话,应该很容易造成内存泄露,那我们不妨做个实验验证下。使用上面两段代码分别进行不带限流和带限流的实验,每隔 1s 使用 <code>process.memoryusage()</code> 打印出来内存使用(我们在启动 server 之后就开始打印内存数据):

最终我得出了如下数据:

探究 Node.js 中的 drain 事件
探究 Node.js 中的 drain 事件

可以看到,内存使用其实差不多......

难道我猜想错了么?最简单的办法,继续看源代码。

看下 <code>fs.createreadstream</code> 的代码,就可以知道它的工作方式是先在内存中准备一段 buffer,然后在 fs.read() 读取文件到这个 buffer 中,完成一次读取时,则从 buffer 中通过 slice 方法取出那个数据作为一个小 buffer 对象,再通过 data 事件传给调用方[1]。

再看下 pause 具体是怎么实现的:

pause 方法设置了一个状态位。在 <code>flow</code> 方法中,如果是 paused 的状态,那么就不再从 文件流中读取数据了:

所以在 pause 的时候,文件的数据流依旧在 buffer 中。

回头看下我们这个例子,其实是写比较慢,如果我们没有对写入的返回值做判断的话,writable stream 本身也会把多余的数据缓存起来。具体可以看 <code>writeorbuffer</code> 这个方法的实现:

可以看出这里如果之前的写入没有完成,那么会把需要写入的数据缓存起来。

这样就不难理解了,如果我们采用限制读取流的方案,那么数据缓存在读取流的 buffer 里,如果我们采取不限制读取流的方案,那么数据缓存在写入流的 buffer 里,总之这部分数据都是要被缓存,只是缓存到不同的流的 buffer 而已,所以这也能解释了为啥我们的测试结果,内存占用基本没有差别了。

其实,node.js 里面提供了更好的解决方案,也就是 pipe。

我们将上面的代码继续改造下:

是不是更简单了呢?

我们看下官方代码是怎么实现的(缩减版):

是不是和我们自己的实现差不多呢?当然官方代码处理的更加细致,对很多情况做了判断,感兴趣的同学不妨阅读看看:)

看到这里,大家可能会问,那按照你这么说,drain 就完全没用了是么?非也非也。虽然我一开始也是这么想的,但经过我在 github 上一阵狂搜,终于有了一些端倪。

再回想我们刚才的案例,我们的写入流是以 http response 的形式返回给用户,但如果写入流是我们的一个服务呢?比如我们需要往一台 redis server 里面插入大量数据,而这台 redis 又承担对外提供服务的艰巨任务,那么,为了保证我们写入的同时这台 server 依旧能较好的对外服务,很自然的就可以想到我们可以对写入流做限流。比如<code>node_redis</code> 这个模块里面的一个 example:

在这个例子中,当写入流开始缓存的时候,我们就停止写入了,等 buffer flush 完,我们再继续写入。通过这种形式,我们可以控制发送命令的速率。

如果你有更好的案例,欢迎与我一起探讨。

其实在我们平时写代码的过程中,并不需要刻意用到 drain 事件。node.js 本身已经帮我们处理了很多细节。对于流的处理,推荐大家直接用简单方便的 pipe 方法。而drain 事件比较适合用在一些需要自己手工处理限流的场景。另外,在看 node.js 源码的时候很多细节还是很赞的,推荐大家可以阅读下。

朴大师的《深入浅出 node.js》6.4节

继续阅读