本篇文章带大家了解一下Node.js中的之Stream,介绍一下引入 Stream,实现可读流、可写流、双工流和转换流的方法,希望对大家有所帮助!
|
本篇文章带大家了解一下Node中的之Stream,介绍一下引入 Stream,实现可读流、可写流、双工流和转换流的方法,希望对大家有所帮助!
引入 Stream假设我们有这么一个需求,我们需要复制一个文件中的内容到另一个文件中,我们会写出以下代码 const fs = require('fs');
const path = require('path');
const copy = (source, target) => {
fs.readFile(path.resolve(source), (err, data) => {
if(err) {
throw new Error(err.toString());
return;
}
fs.writeFile(path.resolve(target), data, (err) => {
if(!err) {
console.log("复制成功!");
}
})
})
}上面的代码很简单,就是先读取 这样做就有一个缺点,当我们读取大文件时,可能会发生内存不够用的情况,因为它会先将文件的所有内容都读取到内存;另外还就是时间,一次性读取一个大文件到内存,是需要比较长的时间的,用户可能会有卡顿的感觉。 另一种解决办法就是边读边写,读取部分文件内容,然后将内容写入到新文件中,这样在内存中的数据只是部分内容,不会占有太多的内存,由于是边读编写,用户可以很快的得到响应,提高用户体验。 在网上找到一幅动图来形象的展示使用流前后数据的流动情况
Node.js 给我们提供 Stream 的 API,它是专门用来处理大文件的。因为数据是一部分一部分的处理,就像是水流一样,所以这个模块的名称就称为 Stream。 const fs = require('fs');
function copy(source, target) {
const rs = fs.createReadStream(source);
const ws = fs.createWriteStream(target);
rs.on('data', data => {
ws.write(data);
});
rs.on('end', () => {
ws.end();
});
}上面代码的细节将在后文揭晓。 Stream 的分类Stream 可以分为四类
可读流与可写流是基础,常见的可读流与可写流如下
Stream 是 EventEmitter 的实例,有自定义的事件。 Readable Stream可读流有两个模式,暂停模式与流动模式。当我们创建一个流时,如果我们监听了
const fs = require('fs');
const rs = fs.createReadStream('a.txt', {
highWaterMark: 1 // 缓存区最多存储 1 字节
});
rs.on('readable', () => {
let data;
while(data=rs.read()) {
console.log(data.toString());
}
})上面的程序设置 当我们为可读流绑定 const fs = require('fs');
const rs = fs.createReadStream('a.txt', {
highWaterMark: 2
});
rs.on('data', data => {
console.log(data.toString());
});
rs.on('end', () => {
console.log("文件读取完毕!");
});暂停模式像是手动步枪,而流动模式则像是自动步枪。暂停模式与流动模式也可以相互切换,通过 可读流的一个经典实例就是 const http = require('http');
const app = http.createServer();
app.on('request', (req, res) => {
let datas = [];
req.on('data', data => {
datas.push(data);
});
req.on('end', () => {
req.body = Buffer.concat(datas);
// 当读取完 body 中的内容之后,将内容返回给客户端
res.end(req.body);
});
})
app.listen(3000, () => {
console.log("服务启动在 3000 端口... ...");
})
Writable Stream可写流与可读流相似,当我们向可写流写入数据时(通过可写流的 当缓冲区中的数据全部被消费完了(写入了文件中或者被别的流消费了),那么就会触发 const fs = require('fs');
const ws = fs.createWriteStream('b.txt', {
highWaterMark: 16 * 1024
});
function writeMillionTimes(writer, data, encoding, callback) {
let i = 10000;
write();
function write() {
// 表示是否可以向可写流中写入数据
let ok = true;
while(i-- > 0 && ok) {
// 当 writer.write() 方法返回 false 表示不可写入数据
ok = writer.write(data, encoding, i === 0 ? callback : null);
}
if(i > 0) {
// 说明 ok 为 false,即不能向缓冲区中写入内容了
console.log("drain", i);
// 监听 drain 事件,当队列消费完毕时继续调用 write() 方法写入
writer.once('drain', write);
}
}
}
writeMillionTimes(ws, 'simple', 'utf-8', () => {
console.log("end");
})输出为 drain 7268 drain 4536 drain 1804 end 说明有三次缓冲区中的内容达到了 16KB,可以验算上面的数字之间的差值,在乘以 我们还可以调用可写流的 const fs = require('fs');
const ws = fs.createWriteStream('b.txt');
ws.write('Hello');
ws.write('World');
ws.end('!');
ws.on('close', () => {
console.log("close"); // close
})当调用 const fs = require('fs');
const ws = fs.createWriteStream('b.txt');
ws.write('Hello');
ws.write('World');
ws.end('!');
ws.write('write again'); // Error [ERR_STREAM_WRITE_AFTER_END]: write after end当调用 const fs = require('fs');
const ws = fs.createWriteStream('b.txt');
ws.write('Hello');
ws.write('World');
ws.end('!');
ws.on('close', () => {
console.log("close");
});
ws.on('finish', () => {
console.log("finish");
});打印结果是 finish close 说明 可写流的经典例子就是 const http = require('http');
const fs = require('fs');
const app = http.createServer();
app.on('request', (req, res) => {
const rs = fs.createReadStream('index.html');
rs.on('data', data => {
res.write(data);
})
rs.on('end', () => {
res.end()
});
});
app.listen(3000, () => {
console.log("服务启动在 3000 端口 ... ...");
})Duplex Stream 与 Transform StreamDuplex,即双工的意思,它既可以接收数据,也可以输出数据,它的输入和输出之间可以没有任何的关系,就像是一个部件内部有两个独立的系统。Duplex 继承了可读流(Readable),并且拥有可写流(Writable)的所有方法。 Transform Stream 继承了 Duplex Stream,它同样具有可读流与可写流的能力,并且它的输出与输入之间是有关系的,中间做了一次转换。常见的转换流有 出于文章结构的考虑,在这里不详细讲解这两个流,在后文中会实现这两个流,以加深对这两个流的理解。 pipe我们可以混合使用可读流与可写流来进行文件的复制 const fs = require('fs');
function copy(source, target) {
const rs = fs.createReadStream(source);
const ws = fs.createWriteStream(target);
rs.on('data', data => {
ws.write(data);
});
rs.on('end', () => {
ws.end();
});
}
copy('a.txt', 'b.txt');但是上面的写法却不被建议使用,因为没有考虑到可读流与可写流速度之间的差异,如果可读流输出数据的速度大于可写流写入数据的速度,这个时候就会有数据一直堆压在缓存区,导致占用过高的内存,专业术语叫做积压。 我们需要改善上面的程序,具体做法就是当 const fs = require('fs');
function copy(source, target) {
const rs = fs.createReadStream(source);
const ws = fs.createWriteStream(target);
rs.on('data', data => {
if (!ws.write(data)) {
rs.pause();
}
});
rs.on('end', () => {
ws.end();
});
ws.on('drain', () => {
rs.resume();
})
}那是不是每次我们使用流都需要写这么多的代码,当然不是。官方为可读流提供了一个 const fs = require('fs');
function copy(source, target) {
const rs = fs.createReadStream(source);
const ws = fs.createWriteStream(target);
rs.pipe(ws);
}当我们调用 Readable.prototype.pipe = function(ws) {
this.on('data', data => {
if (!ws.write(data)) {
this.pause();
}
});
ws.on('drain', () => {
this.resume();
});
// 触发 pipe 事件
ws.emit('pipe', this);
// 返回可写流,以支持链式调用
return ws;
}这里给出官网画的一个有关 +===================+
x--> Piping functions +--> src.pipe(dest) |
x are set up during |===================|
x the .pipe method. | Event callbacks |
+===============+ x |-------------------|
| Your Data | x They exist outside | .on('close', cb) |
+=======+=======+ x the data flow, but | .on('data', cb) |
| x importantly attach | .on('drain', cb) |
| x events, and their | .on('unpipe', cb) |
+---------v---------+ x respective callbacks. | .on('error', cb) |
| Readable Stream +----+ | .on('finish', cb) |
+-^-------^-------^-+ | | .on('end', cb) |
^ | ^ | +-------------------+
| | | |
| ^ | |
^ ^ ^ | +-------------------+ +=================+
^ | ^ +----> Writable Stream +---------> .write(chunk) |
| | | +-------------------+ +=======+=========+
| | | |
| ^ | +------------------v---------+
^ | +-> if (!chunk) | Is this chunk too big? |
^ | | emit .end(); | Is the queue busy? |
| | +-> else +-------+----------------+---+
| ^ | emit .write(); | |
| ^ ^ +--v---+ +---v---+
| | ^-----------------------------------< No | | Yes |
^ | +------+ +---v---+
^ | |
| ^ emit .pause(); +=================+ |
| ^---------------^-----------------------+ return false; <-----+---+
| +=================+ |
| |
^ when queue is empty +============+ |
^------------^-----------------------< Buffering | |
| |============| |
+> emit .drain(); | ^Buffer^ | |
+> emit .resume(); +------------+ |
| ^Buffer^ | |
+------------+ add chunk to queue |
| <---^---------------------<
+============+实现流在本节中我们来实现具体的流,通过实现流可以进一步加深对 Stream 内部工作细节的理解。 实现可读流上面我们都是通过 const { Readable } = require('stream');
class IeteratorReadableStream extends Readable {
constructor(iterator) {
super();
this.iterator = iterator;
}
_read() {
let data = this.iterator.next();
// console.log(data);
if(data.done) {
this.push(null);
} else {
// 必须 push 字符串或者 Buffer
this.push(data.value+'');
}
}
}
module.exports = IeteratorReadableStream;上述我们实现了一个可读流,可读流接收一个迭代器作为参数,这个迭代器作为这个可读流的数据源。可读流会自动的调用 所以可读流的执行逻辑为,每次调用 我们创建一个迭代器作为数据源传入 const IeteratorReadableStream = require('./IteratorReadableStream');
function *getData() {
for(let i = 0; i < 5; i++) {
yield i;
}
}
let rs = new IeteratorReadableStream(getData());
rs.on('data', data => {
console.log(data.toString());
});
rs.on('end', () => {
console.log("迭代结束");
});输出为 0 1 2 3 4 迭代结束 实现可写流实现可写流的过程同实现可读流的过程类似,首先需要继承 const fs = require('fs');
const { Writable } = require('stream');
class FileWritableStream extends Writable {
constructor(filepath) {
super();
this.filepath = filepath;
}
_write(chunk, encoding, callback) {
fs.appendFile(this.filepath, chunk, {
encoding
}, callback)
}
}上面我们实现了一个可写流,这个可写流接收一个文件路径作为参数,它的作用就是向这个文件中追加数据,每次当我们调用可写流的 process.stdin.pipe(new FileWritableStream('c.txt'));上面这行代码的作用就是将从标准输入的字符输出到 实现双工流Duplex Stream 既可以作为可读流,也可以作为可写流,并且它的输入与输出之间可以没有关系。Duplex Stream 继承了 Readable,并且拥有 Writable 的所有,我们只要分别实现 const { Duplex } = require('stream');
class CustomDuplexStream extends Duplex {
constructor() {
super();
this.currentCharCode = 65;
}
_read() {
if(this.currentCharCode <= 90) {
this.push(String.fromCharCode(this.currentCharCode++))
} else {
this.push(null);
}
}
_write(chunk, encoding, callback) {
console.log(chunk.toString());
callback();
}
}上面双工流的可读流部分就是将大写的 26 个字母添加进了缓存区,而可写流部分就是直接将数据输出到控制台。可见双工流可读流与可写流之间并没有任何的关系 const dp = new CustomDuplexStream();
dp.write("1");
dp.write("2");
dp.end();
dp.pipe(process.stdout);输出为 1 2 ABCDEFGHIJKLMNOPQRSTUVWXYZ 实现转换流Tranform Stream 是 Duplex 的特例,它也是一个双工流,不过它的输入和输出之间有关联,它的内部通过 const { Transform } = require('stream');
class UpperTransformStream extends Transform {
_transform(chunk, encoding, callback) {
this.push(chunk.toString().toUpperCase());
callback();
}
}上面我们实现了一个转换流,它可以将输入的小写字符转化为大写字符然后输出 const ts = new UpperTransformStream();
const rs = fs.createReadStream('a.txt');
rs.pipe(ts).pipe(process.stdout);上面程序会读取 转换流在实际应用中还是比较多的,这里介绍一个 Node.js 内置的转换流 const zlib = require('zlib');
const fs = require('fs');
const args = process.argv.slice(2);
const source = fs.createReadStream(args[0]);
const target = fs.createWriteStream(args[1]);
const gzip = zlib.createGzip();
source.pipe(gzip).pipe(target);我们可以通过 node gzip.js Graph.md Graph.md.gz 来运行上面的程序,它可以将 Graph.md 使用 gzip 压缩为 Graph.md.gz。
文件大小从 同样的我们也可以通过 本篇文章到此结束,想必到这里已经对 Stream 的使用已经有所了解了,但是本篇文章并没有列举所有有关 Stream 的 API,如果想更加详细的了解 Stream 的 API,那么接下去就可以阅读官方文档(http://nodejs.cn/api/stream.html)了。 更多node相关知识,请访问:nodejs 教程!! 以上就是聊聊Node.js中如何实现Stream流(可读、可写、双工和转换流)的详细内容,更多请关注模板之家(www.mb5.com.cn)其它相关文章! |
