本文節選自 Node.js CheatSheet | Node.js 文法基礎、架構使用與實踐技巧 ,也可以閱讀 JavaScript CheatSheet 或者 現代 Web 開發基礎與工程實踐 了解更多 JavaScript/Node.js 的實際應用。
Stream 是
Node.js 中的基礎概念,類似于 EventEmitter,專注于 IO 管道中事件驅動的資料處理方式;類比于數組或者映射,Stream 也是資料的集合,隻不過其代表了不一定正在記憶體中的資料。。Node.js 的 Stream 分為以下類型:
- Readable Stream: 可讀流,資料的産生者,譬如 process.stdin
- Writable Stream: 可寫流,資料的消費者,譬如 process.stdout 或者 process.stderr
- Duplex Stream: 雙向流,即可讀也可寫
- Transform Stream: 轉化流,資料的轉化者
Stream 本身提供了一套接口規範,很多 Node.js 中的内模組化塊都遵循了該規範,譬如著名的
fs
子產品,即是使用 Stream 接口來進行檔案讀寫;同樣的,每個 HTTP 請求是可讀流,而 HTTP 響應則是可寫流。
Readable Stream
const stream = require('stream');
const fs = require('fs');
const readableStream = fs.createReadStream(process.argv[2], {
encoding: 'utf8'
});
// 手動設定流資料編碼
// readableStream.setEncoding('utf8');
let wordCount = 0;
readableStream.on('data', function(data) {
wordCount += data.split(/\s{1,}/).length;
});
readableStream.on('end', function() {
// Don't count the end of the file.
console.log('%d %s', --wordCount, process.argv[2]);
});
當我們建立某個可讀流時,其還并未開始進行資料流動;添加了 data 的事件監聽器,它才會變成流動态的。在這之後,它就會讀取一小塊資料,然後傳到我們的回調函數裡面。
data
事件的觸發頻次同樣是由實作者決定,譬如在進行檔案讀取時,可能每行都會觸發一次;而在 HTTP 請求處理時,可能數 KB 的資料才會觸發一次。可以參考
nodejs/readable-stream/_stream_readable中的相關實作,發現 on 函數會觸發 resume 方法,該方法又會調用 flow 函數進行流讀取:
// function on
if (ev === 'data') {
// Start flowing on next tick if stream isn't explicitly paused
if (this._readableState.flowing !== false) this.resume();
}
...
// function flow
while (state.flowing && stream.read() !== null) {}
我們還可以監聽
readable
事件,然後手動地進行資料讀取:
let data = '';
let chunk;
readableStream.on('readable', function() {
while ((chunk = readableStream.read()) != null) {
data += chunk;
}
});
readableStream.on('end', function() {
console.log(data);
});
Readable Stream 還包括如下常用的方法:
- Readable.pause(): 這個方法會暫停流的流動。換句話說就是它不會再觸發 data 事件。
- Readable.resume(): 這個方法和上面的相反,會讓暫停流恢複流動。
- Readable.unpipe(): 這個方法會把目的地移除。如果有參數傳入,它會讓可讀流停止流向某個特定的目的地,否則,它會移除所有目的地。
在日常開發中,我們可以用
stream-wormhole來模拟消耗可讀流:
sendToWormhole(readStream, true);
Writable Stream
readableStream.on('data', function(chunk) {
writableStream.write(chunk);
});
writableStream.end();
當
end()
被調用時,所有資料會被寫入,然後流會觸發一個
finish
事件。注意在調用
end()
之後,你就不能再往可寫流中寫入資料了。
const { Writable } = require('stream');
const outStream = new Writable({
write(chunk, encoding, callback) {
console.log(chunk.toString());
callback();
}
});
process.stdin.pipe(outStream);
Writable Stream 中同樣包含一些與 Readable Stream 相關的重要事件:
- error: 在寫入或連結發生錯誤時觸發
- pipe: 當可讀流連結到可寫流時,這個事件會觸發
- unpipe: 在可讀流調用 unpipe 時會觸發
Pipe | 管道
const fs = require('fs');
const inputFile = fs.createReadStream('REALLY_BIG_FILE.x');
const outputFile = fs.createWriteStream('REALLY_BIG_FILE_DEST.x');
// 當建立管道時,才發生了流的流動
inputFile.pipe(outputFile);
多個管道順序調用,即是建構了連結(Chaining):
const fs = require('fs');
const zlib = require('zlib');
fs.createReadStream('input.txt.gz')
.pipe(zlib.createGunzip())
.pipe(fs.createWriteStream('output.txt'));
管道也常用于 Web 伺服器中的檔案處理,以 Egg.js 中的應用為例,我們可以從 Context 中擷取到檔案流并将其傳入到可寫檔案流中:
完整代碼參考 Backend Boilerplate/egg
const awaitWriteStream = require('await-stream-ready').write;
const sendToWormhole = require('stream-wormhole');
...
const stream = await ctx.getFileStream();
const filename =
md5(stream.filename) + path.extname(stream.filename).toLocaleLowerCase();
//檔案生成絕對路徑
const target = path.join(this.config.baseDir, 'app/public/uploads', filename);
//生成一個檔案寫入檔案流
const writeStream = fs.createWriteStream(target);
try {
//異步把檔案流寫入
await awaitWriteStream(stream.pipe(writeStream));
} catch (err) {
//如果出現錯誤,關閉管道
await sendToWormhole(stream);
throw err;
}
...
參照
分布式系統導論,可知在典型的流處理場景中,我們不可以避免地要處理所謂的背壓(Backpressure)問題。無論是 Writable Stream 還是 Readable Stream,實際上都是将資料存儲在内部的 Buffer 中,可以通過
writable.writableBuffer
readable.readableBuffer
來讀取。當要處理的資料存儲超過了
highWaterMark
或者目前寫入流處于繁忙狀态時,write 函數都會傳回
false
。
pipe
函數即會自動地幫我們啟用背壓機制:
當 Node.js 的流機制監測到 write 函數傳回了
false
,背壓系統會自動介入;其會暫停目前 Readable Stream 的資料傳遞操作,直到消費者準備完畢。
+===============+
| Your_Data |
+=======+=======+
|
+-------v-----------+ +-------------------+ +=================+
| Readable Stream | | Writable Stream +---------> .write(chunk) |
+-------+-----------+ +---------^---------+ +=======+=========+
| | |
| +======================+ | +------------------v---------+
+-----> .pipe(destination) >---+ | Is this chunk too big? |
+==^=======^========^==+ | Is the queue busy? |
^ ^ ^ +----------+-------------+---+
| | | | |
| | | > if (!chunk) | |
^ | | emit .end(); | |
^ ^ | > else | |
| ^ | emit .write(); +---v---+ +---v---+
| | ^----^-----------------< No | | Yes |
^ | +-------+ +---v---+
^ | |
| ^ emit .pause(); +=================+ |
| ^---^---------------------+ return false; <-----+---+
| +=================+ |
| |
^ when queue is empty +============+ |
^---^-----------------^---< Buffering | |
| |============| |
+> emit .drain(); | <Buffer> | |
+> emit .resume(); +------------+ |
| <Buffer> | |
+------------+ add chunk to queue |
| <--^-------------------<
+============+
Duplex Stream
Duplex Stream 可以看做讀寫流的聚合體,其包含了互相獨立、擁有獨立内部緩存的兩個讀寫流, 讀取與寫入操作也可以異步進行:
Duplex Stream
------------------|
Read <----- External Source
You ------------------|
Write -----> External Sink
------------------|
我們可以使用 Duplex 模拟簡單的套接字操作:
const { Duplex } = require('stream');
class Duplexer extends Duplex {
constructor(props) {
super(props);
this.data = [];
}
_read(size) {
const chunk = this.data.shift();
if (chunk == 'stop') {
this.push(null);
} else {
if (chunk) {
this.push(chunk);
}
}
}
_write(chunk, encoding, cb) {
this.data.push(chunk);
cb();
}
}
const d = new Duplexer({ allowHalfOpen: true });
d.on('data', function(chunk) {
console.log('read: ', chunk.toString());
});
d.on('readable', function() {
console.log('readable');
});
d.on('end', function() {
console.log('Message Complete');
});
d.write('....');
在開發中我們也經常需要直接将某個可讀流輸出到可寫流中,此時也可以在其中引入 PassThrough,以友善進行額外地監聽:
const { PassThrough } = require('stream');
const fs = require('fs');
const duplexStream = new PassThrough();
// can be piped from reaable stream
fs.createReadStream('tmp.md').pipe(duplexStream);
// can pipe to writable stream
duplexStream.pipe(process.stdout);
// 監聽資料,這裡直接輸出的是 Buffer<Buffer 60 60 ... >
duplexStream.on('data', console.log);
Transform Stream
Transform Stream 則是實作了
_transform
方法的 Duplex Stream,其在兼具讀寫功能的同時,還可以對流進行轉換:
Transform Stream
--------------|--------------
You Write ----> ----> Read You
--------------|--------------
這裡我們實作簡單的 Base64 編碼器:
const util = require('util');
const Transform = require('stream').Transform;
function Base64Encoder(options) {
Transform.call(this, options);
}
util.inherits(Base64Encoder, Transform);
Base64Encoder.prototype._transform = function(data, encoding, callback) {
callback(null, data.toString('base64'));
};
process.stdin.pipe(new Base64Encoder()).pipe(process.stdout);
原文位址:
https://segmentfault.com/a/1190000016328755