- buffer 缓冲
- child_process - 子进程
- cluster (集群)
- console (控制台)
- net (网络)
- dgram (数据报)
- dns (域名服务器)
- Error (错误)
- events (事件)
- stream - 流
学习文档来源 nodejs中文网
注意attention: 转载学习来源–Nodejs学习笔记以及经验总结,公众号”程序猿小卡”
buffer 缓冲
Buffer 类被引入作为 Node.js API 的一部分,使其可以在 TCP 流或文件系统操作等场景中处理二进制数据流。
Buffer 类的实例类似于整数数组,但 Buffer 的大小是固定的、且在 V8 堆外分配物理内存。 Buffer 的大小在被创建时确定,且无法调整。
Buffer 类在 Node.js 中是一个全局变量,因此无需使用 require(‘buffer’).Buffer。
由于 new Buffer() 的行为因第一个参数的类型而异,因此当未执行参数验证或 Buffer 初始化时,可能会无意中将安全性和可靠性问题引入应用程序。
为了使 Buffer 实例的创建更可靠且更不容易出错,各种形式的 new Buffer() 构造函数都已被弃用,且改为单独的 Buffer.from(),Buffer.alloc() 和 Buffer.allocUnsafe() 方法。
类方法
Buffer.alloc(size[, fill[, encoding]]) //分配一个大小为 size 字节的新建的 Buffer
Buffer.allocUnsafe(size) //以这种方式创建的 Buffer 实例的底层内存是未初始化的。 新创建的 Buffer 的内容是未知的,且可能包含敏感数据。 可以使用 buf.fill(0) 初始化 Buffer 实例为0。
Buffer.from(buffer) // 返回一个新的 Buffer,其中包含给定 Buffer 的内容的副本
Buffer.from(array) // 返回一个新的 Buffer,其中包含提供的八位字节数组的副本
Buffer.from(string[, encoding]) // 返回一个新的 Buffer,其中包含提供的字符串的副本。
Buffer.compare(buf1, buf2) //比较 buf1 与 buf2,主要用于 Buffer 数组的排序,相当于调buf1.compare(buf2)
Buffer.concat(list[, totalLength])
类属性
buf.fill(value[, offset[, end]][, encoding]) // 用指定的 value 填充 buf。 如果没有指定 offset 与 end,则填充整个 buf
buf.includes(value[, byteOffset][, encoding])
buf.length
// 返回 buf 的 JSON 格式,类似于 { type: 'Buffer', data: [ 104, 101, 108, 108, 111 ] }
buf.toJSON()
// 根据 encoding 指定的字符编码将 buf 解码成字符串
buf.toString([encoding[, start[, end]]])
// 根据 encoding 指定的字符编码将 string 写入到 buf 中的 offset 位置。 如果 buf 没有足够的空间保存整个字符串,则只会写入 string 的一部分。
buf.write(string[, offset[, length]][, encoding])
// 对比 buf 与 target,并返回一个数值,表明 buf 在排序上是否排在 target 前面、或后面、或相同。 对比是基于各自 Buffer 实际的字节序列
buf.compare(target[, targetStart[, targetEnd[, sourceStart[, sourceEnd]]]])
// 拷贝 buf 的一个区域的数据到 target 的一个区域,即便 target 的内存区域与 buf 的重叠,注意变化的是 target
buf.copy(target[, targetStart[, sourceStart[, sourceEnd]]])
// 创建一个指向与原始 Buffer 同一内存的新 Buffer,但使用 start 和 end 进行了裁剪
buf.slice([start[, end]])
UTF-8
UTF-8 的编码规则很简单:如果只有一个字节,那么最高的比特位为 0;如果有多个字节,那么第一个字节从最高位开始,连续有几个比特位的值为 1,就使用几个字节编码,剩下的字节均以 10 开头。 具体的表现形式为: 0xxxxxxx:单字节编码形式,这和 ASCII 编码完全一样,因此 UTF-8 是兼容 ASCII 的; 110xxxxx 10xxxxxx:双字节编码形式; 1110xxxx 10xxxxxx 10xxxxxx:三字节编码形式; 11110xxx 10xxxxxx 10xxxxxx 10xxxxxx:四字节编码形式。
buffer的中文乱码问题
// testdata.md 为中文
var fs = require('fs');
var rs = fs.createReadStream('testdata.md', {highWaterMark: 11}); // 乱码
//var rs = fs.createReadStream('testdata.md', {encoding:'utf-8',highWaterMark: 11}); // 无乱码
var data = '';
rs.on("data", function (trunk){
data += trunk;
});
rs.on("end", function () {
console.log(data);
});
通过string_decoder对象输出第一个截取Buffer(事件循xx)时,只返回事 件循这个字符串,保留xx。第二次通过string_decoder对象输出时检测到上次保 留的xx,将上次剩余内容和本次的Buffer进行重新拼接输出。于是达到正常输出 的目的。
string_decoder,目前在文件流读取和网络流读取中都有应用到,一定程度上避 免了粗鲁拼接trunk导致的乱码错误。但是,遗憾在于string_decoder目前只支 持utf8编码。它的思路其实还可以扩展到其他编码上,只是最终是否会支持目前 尚不可得知.
连接Buffer对象的正确方法:
var fs = require('fs');
var iconv = require('iconv-lite');
var rs = fs.createReadStream('testdata.md');
var chunks = [];
var size = 0;
rs.on('data', function(chunk){
chunks.push(chunk);
size += chunk.length;
})
rs.on('end', function(){
var buf = Buffer.concat(chunks, size);
var str = iconv.decode(buf, 'utf-8');
console.log(str);
});
正确的拼接方法是用一个数组来存储接收到的所有的Buffer片段并记录下所有片段的总长度,然后用Buffer.concat()方法生成一个合并的Buffer对象。Buffer.concat()方法封装了从小Buffer对象到大Buffer对象的复制过程。
Buffer.concat: 代码注释来源 Nodejs源码解析之Buffer
// 相当于把Buffer数组中的内容,拼接起来。
// 特别注意的是,如果只有一个对象做拼接的时候,是不新创建对象的。
// 函数没有检查数组对象中的类型。
Buffer.concat = function(list, length) {
// 首先检查第一个参数list是否为数组,如果不是,直接报错。
if (!util.isArray(list))
throw new TypeError('list argument must be an Array of Buffers.');
// 检查第二个参数length是否传入,
if (util.isUndefined(length)) {
//处理没有传入的情况
//没有传入的情况下,处理成当前数组所有Buffer对象长度的总和
length = 0;
for (var i = 0; i < list.length; i++)
length += list[i].length;
} else {
// 已经传入,直接确保是uint32的值
// x >>> 0本质上就是保证x有意义(为数字类型),且为正整数,在有效的数组范围内(0 ~ 0xFFFFFFFF),且在无意义的情况下缺省值为0。 https://segmentfault.com/a/1190000014613703
length = length >>> 0;
}
// 如果数组的长度为0,直接返回长度为0的buffer对象
if (list.length === 0)
return new Buffer(0);
else if (list.length === 1)
//如果长度为1,直接返回的是第一个对象
return list[0];
// 否则需要新创建对象,将buffer对象拼接起来。
var buffer = new Buffer(length);
var pos = 0;
for (var i = 0; i < list.length; i++) {
var buf = list[i];
// 拷贝对象内容到新的buffer对象中。
// copy的定义在node_buffer.cc中,使用memmove实现
buf.copy(buffer, pos);
pos += buf.length;
}
return buffer;
};
child_process - 子进程
child_process 模块提供了衍生子进程的功能,它与 popen(3) 类似,但不完全相同。 这个功能主要由 child_process.spawn() 函数提供:
const { spawn } = require('child_process');
const ls = spawn('ls', ['-lh', '/usr']);
ls.stdout.on('data', (data) => {
console.log(`输出:${data}`);
});
ls.stderr.on('data', (data) => {
console.log(`错误:${data}`);
});
ls.on('close', (code) => {
console.log(`子进程退出码:${code}`);
});
默认情况下,Node.js 的父进程与衍生的子进程之间会建立 stdin、stdout 和 stderr 的管道。 数据能以非阻塞的方式在管道中流通。 有些程序会在内部使用行缓冲 I/O,虽然这并不影响 Node.js,但发送到子进程的数据可能无法被立即使用。
导出方法
// 异步地衍生子进程,且不阻塞 Node.js 事件循环, 使用给定的 command 衍生一个新进程,并带上 args 中的命令行参数。 如果省略 args,则其默认为空数组。
child_process.spawn(command[, args][, options])
// 以同步的方式提供等效功能,但会阻止事件循环直到衍生的进程退出或终止。
child_process.spawnSync(command[, args][, options])
// 衍生一个 shell 并在该 shell 中运行命令,当完成时则将 stdout 和 stderr 传给回调函数。
child_process.exec(command[, options][, callback])
// 类似于 child_process.exec(),除了它默认会直接衍生命令且不首先衍生 shell。
child_process.execFile(file[, args][, options][, callback])
// 衍生一个新的 Node.js 进程,并通过建立 IPC 通信通道来调用指定的模块,该通道允许在父进程与子进程之间发送消息。
child_process.fork(modulePath[, args][, options])
// child_process.execFile() 的同步版本,会阻塞 Node.js 事件循环。
child_process.execFileSync(file[, args][, options])
// child_process.exec() 的同步版本,会阻塞 Node.js 事件循环。
child_process.execSync(command[, options])
const { execFile } = require('child_process');
execFile('node', ['--version'], (error, stdoutStr, stderrStr) => {
if (error) {
throw error;
}
console.log(stdoutStr);
})
ChildProcess 类
ChildProcess 类的实例是 EventEmitter,代表衍生的子进程。
ChildProcess 的实例不被直接创建。 而是,使用 child_process.spawn()、child_process.exec()、child_process.execFile() 或 child_process.fork() 方法创建 ChildProcess 实例。
child_process模块提供了一个ChildProcess的新类,它可以作为从父进程访问子进程的表示形式。Process模块也是ChildProcess对象。当你从父模块访问process时,它是父ChildProcess对象,当你从子进程访问Process时,它是子ChildProcess对象
// 运行 ps ax | grep ssh 的方式
// ps、grep 为ChildProcess类,stdout 为ChildProcess的可读流, stdin 为ChildProcess的可写流,
const { spawn } = require('child_process');
const ps = spawn('ps', ['ax']);
const grep = spawn('grep', ['ssh']);
ps.stdout.on('data', data => {
grep.stdin.write(data);
})
ps.stderr.on('data', data => {
console.log(`ps stderr: ${data}`);
})
ps.on('close', code => {
if (code !== 0) {
console.log(`ps 进程的退出码:${code}`);
}
grep.stdin.end();
})
grep.stdout.on('data', data => {
console.log(data.toString());
});
grep.stderr.on('data', (data) => {
console.log(`grep stderr: ${data}`);
});
grep.on('close', (code) => {
if (code !== 0) {
console.log(`grep 进程的退出码:${code}`);
}
});
- 1.事件:
'close' 事件
'disconnect' 事件
'error' 事件
'exit' 事件
'message' 事件
const { spawn } = require('child_process');
const grep = spawn('grep', ['ssh']);
grep.on('close', (code, signal) => {
console.log(`子进程收到信号 ${signal} 而终止`);
});
// 发送 SIGHUP 到进程
grep.kill('SIGHUP');
- 2.属性和方法:
subprocess.connected // #表明是否仍可以从一个子进程发送和接收消息
subprocess.stderr // <stream.Readable>一个代表子进程的 stderr 的可读流。
subprocess.stdin // <stream.Writable> 一个代表子进程的 stdin 的可写流
subprocess.stdout // <stream.Readable> 一个代表子进程的 stdout 的可读流。
subprocess.disconnect() // 关闭父进程与子进程之间的 IPC 通道,一旦没有其他的连接使其保持活跃,则允许子进程正常退出。
subprocess.kill([signal])
subprocess.send(message[, sendHandle[, options]][, callback])
当父进程和子进程之间建立了一个 IPC 通道时(例如,使用 child_process.fork()),subprocess.send() 方法可用于发送消息到子进程。 当子进程是一个 Node.js 实例时,消息可以通过 [process.on(‘message’)] 事件接收。
注意: 消息通过序列化和解析进行传递,结果就是消息可能跟开始发送的不完全一样。
// 例子,父进程脚本如下:
const cp = require('child_process');
const n = cp.fork(`${__dirname}/sub.js`);
n.on('message', (m) => {
console.log('PARENT got message: ', m);
});
// Causes the child to print: CHILD got message: { hello: 'world' }
n.send({ hello: 'world' });
//然后是子进程脚本,'sub.js' 可能看上去像这样:
process.on('message', (m) => {
console.log('CHILD got message:', m);
});
// Causes the parent to print: PARENT got message: { foo: 'bar', baz: null }
process.send({ foo: 'bar', baz: NaN });
发送句柄:
sendHandle 参数可用于将一个 TCP server 对象句柄传给子进程;
当服务器在父进程和子进程之间是共享的,则一些连接可被父进程处理,另一些可被子进程处理.
const subprocess = require('child_process').fork('subprocess.js');
// 开启 server 对象,并发送该句柄。
const server = require('net').createServer();
server.on('connection', (socket) => {
socket.end('被父进程处理');
});
server.listen(1337, () => {
subprocess.send('server', server);
});
// 子进程接收 server 对象如下:
process.on('message', (m, server) => {
if (m === 'server') {
server.on('connection', (socket) => {
socket.end('被子进程处理');
});
}
});
cluster (集群)
cluster 模块允许简单容易的创建共享服务器端口的子进程。
// 官网实现方式
const cluster = require('cluster');
const http = require('http');
const numCPUs = require('os').cpus().length;
console.log('执行');
if (cluster.isMaster) {
console.log(`主进程 ${process.pid} 正在运行`);
// 衍生工作进程。
for (let i = 0; i < numCPUs; i++) {
cluster.fork();
}
cluster.on('exit', (worker, code, signal) => {
console.log(`工作进程 ${worker.process.pid} 已退出`);
});
} else {
// 工作进程可以共享任何 TCP 连接。
// 在本例子中,共享的是一个 HTTP 服务器。
http.createServer((req, res) => {
res.writeHead(200, {'Content-Type' : 'text/html;charset:utf-8'});
res.write('<head><meta charset="utf-8"/></head>');
res.end('你好世界\n' + process.pid);
}).listen(3000);
console.log(`工作进程 ${process.pid} 已启动`);
}
另外一种更优雅的实现方式:
const cluster = require('cluster');
cluster.setupMaster({
exec: "cluster_worker.js"
});
var cpus = require('os').cpus();
for(var i=0; i<cpus.length; i++){
cluster.fork();
}
// cluster_worker.js
var http = require('http');
http.createServer((req, res) => {
res.writeHead(200);
res.end('你好世界\n');
}).listen(8000);
console.log(`工作进程 ${process.pid} 已启动`);
Class: Worker
Worker对象包含了关于工作进程的所有public信息和方法。
在一个主进程里,可以使用cluster.workers来获取Worker对象。
在一个工作进程里,可以使用cluster.worker来获取Worker对象。
Event: 'disconnect'
Event: 'error'
Event: 'exit'
Event: 'listening'
Event: 'message'
const cluster = require('cluster');
const http = require('http');
if (cluster.isMaster) {
// 跟踪 http 请求
let numReqs = 0;
setInterval(() => {
console.log(`numReqs = ${numReqs}`);
}, 1000);
// 计算请求数目
function messageHandler(msg) {
if (msg.cmd && msg.cmd === 'notifyRequest') {
numReqs += 1;
}
}
// 启动 worker 并监听包含 notifyRequest 的消息
const numCPUs = require('os').cpus().length;
for (let i = 0; i < numCPUs; i++) {
cluster.fork();
}
for (const id in cluster.workers) {
cluster.workers[id].on('message', messageHandler);
}
} else {
// Worker 进程有一个http服务器
http.Server((req, res) => {
res.writeHead(200);
res.end('hello world\n');
// 通知 master 进程接收到了请求
process.send({ cmd: 'notifyRequest' });
}).listen(8000);
}
console (控制台)
console 模块提供了一个简单的调试控制台,类似于 Web 浏览器提供的 JavaScript 控制台。
该模块导出了两个特定的组件:
一个 Console 类,包含 console.log() 、 console.error() 和 console.warn() 等方法,可以被用于写入到任何 Node.js 流。
一个全局的 console 实例,可被用于写入到 process.stdout 和 process.stderr。 全局的 console 使用时无需调用 require(‘console’)。
注意:全局的 console 对象的方法既不总是同步的(如浏览器中类似的 API),也不总是异步的(如其他 Node.js 流)。 详见 进程 I/O。
Console 类
Console 类可用于创建一个具有可配置的输出流的简单记录器,可以通过 require(‘console’).Console 或 console.Console 使用:
const { Console } = require('console');
const { Console } = console;
// new Console(stdout[, stderr][, ignoreErrors])#
// new Console(options)
const output = fs.createWriteStream('./stdout.log');
const errorOutput = fs.createWriteStream('./stderr.log');
// custom simple logger
const logger = new Console({ stdout: output, stderr: errorOutput });
// use it like console
const count = 5;
logger.log('count: %d', count);
// in stdout.log: count 5
var fs = require('fs');
var file = fs.createWriteStream('./stdout.txt', {flags: 'a+'}); // 追加模式
var logger = new console.Console(file, file);
logger.log('hello2');
logger.log('word2');
用一个或两个输出流实例创建一个新的 Console。 输出流 stdout 用来记录日志和信息;stderr 用来记录警告和错误。如果不提供 stderr,则 stdout 会被用作 stderr。全局符号 console 是一个特殊的 Console 实例,其输出会被送往 process.stdout 和 process.stderr。它等价于调用:
new Console({ stdout: process.stdout, stderr: process.stderr });
console.dir(obj[, options])
: 可以通过options选项depth自定义打印的层级数,默认是2,这对于调试很有帮助
console.log('%c this %c is a %c message','color:#f00;','font-size:20px;','color:blue;background:yellow;')
–在chrome允许你控制日志消息的样式。
console.log("\033[32m 绿色字 执行 \033[0m")
–在nodejs中设置颜色,参考https://www.cnblogs.com/sivkun/p/7890547.html
net (网络)
net 模块提供了创建基于流的 TCP 或 IPC 服务器(net.createServer())和客户端(net.createConnection()) 的异步网络 API。
net.createServer([options][, connectionListener])
Creates a new TCP or IPC server. 创建一个新的TCP或IPC服务。
options
const net = require('net');
//net.Server 类
const server = net.createServer((socket) => {
// 'connection' listener
console.log('client connected');
socket.on('end', () => {
console.log('client disconnected');
});
socket.write('hello\r\n');
socket.pipe(socket);
});
server.on('error', (err) => {
throw err;
});
server.listen(8124, () => {
console.log('server bound');
});
net.createConnection(options[, connectListener])
一个用于创建 net.Socket 的工厂函数,立即使用 socket.connect() 初始化链接,然后返回启动连接的 net.Socket。
当连接建立之后,在返回的 socket 上将触发一个 ‘connect’ 事件。若制定了最后一个参数 connectListener,则它将会被添加到 ‘connect’ 事件作为一个监听器。
const net = require('net');
const client = net.createConnection({ port: 8124 }, () => {
//'connect' listener
console.log('connected to server!');
client.write('world!\r\n');
});
client.on('data', (data) => {
console.log(data.toString());
client.end();
});
client.on('end', () => {
console.log('disconnected from server');
});
net.Server 类
'close' 事件
'connection' 事件
'error' 事件
'listening' 事件
server.address()
server.getConnections(callback)
server.listen(handle[, backlog][, callback])
server.listen(options[, callback])
server.listen(path[, backlog][, callback]) for IPC servers
server.listen([port][, host][, backlog][, callback]) for TCP servers
net.Socket 类
这个类是 TCP 或 UNIX Socket 的抽象(在Windows上使用命名管道,而UNIX使用域套接字)。一个net.Socket也是一个 duplex stream,所以它能被读或写,并且它也是一个 EventEmitter。
net.Socket可以 被用户创建并直接与server通信。举个例子,它是通过net.createConnection()返回的,所以用户可以使用它来与server通信。
当一个连接被接收时,它也能被Node.js创建并传递给用户。比如,它是通过监听在一个net.Server上的’connection’事件触发而获得的(createServer的回调函数参数),那么用户可以 使用它来与客户端通信。
'close' 事件
'connect' 事件
'data' 事件 // 当接收到数据的时触发该事件。data 参数是一个 Buffer 或 String。数据编码由 socket.setEncoding() 设置。(在 Readable Stream 章节查看更多信息。)注意当 Socket 发送 data 事件的时候,如果没有监听者数据将会丢失。
'drain' 事件
'end' 事件
'error' 事件
'listening' 事件
socket.address()
socket.bufferSize
socket.connect(options[, connectListener])
socket.connect(path[, connectListener]) 用于 IPC 连接。
socket.connect(port[, host][, connectListener]) 用于 TCP 。
dgram (数据报)
dgram模块提供了 UDP 数据包 socket 的实现。
const dgram = require('dgram');
const server = dgram.createSocket('udp4');
server.on('error', (err) => {
console.log(`服务器异常:\n${err.stack}`);
server.close();
});
server.on('message', (msg, rinfo) => {
console.log(`服务器收到:${msg} 来自 ${rinfo.address}:${rinfo.port}`);
});
server.on('listening', () => {
const address = server.address();
console.log(`服务器监听 ${address.address}:${address.port}`);
});
server.bind(41234);
// 服务器监听 0.0.0.0:41234
dgram.Socket 类
dgram.Socket对象是一个封装了数据包函数功能的EventEmitter。
dgram.Socket实例是由dgram.createSocket()创建的。创建dgram.Socket实例不需要使用new关键字。
'close' 事件
'error' 事件
'listening' 事件 // 当一个 socket 开始监听数据包信息时,'listening'事件将被触发。该事件会在创建 UDP socket 之后被立即触发。
'message' 事件 // 当有新的数据包被 socket 接收时,'message'事件会被触发。msg和rinfo会作为参数传递到该事件的处理函数中。msg <Buffer> - 消息 rinfo <Object> - 远程地址信息
socket.address()
socket.bufferSize
socket.bind(options[, callback])
socket.bind([port][, address][, callback]) //对于 UDP socket,bind方法会令dgram.Socket在指定的port和可选的address上监听数据包信息。若port未指定或为 0,操作系统会尝试绑定一个随机的端口。若address未指定,操作系统会尝试在所有地址上监听。绑定完成时会触发一个'listening'事件,并会调用callback方法。
socket.getRecvBufferSize()
socket.getSendBufferSize()
socket.send(msg, [offset, length,] port [, address] [, callback])
dgram.createSocket(type[, callback])
dgram.createSocket 参数:
type - ‘udp4’ 或 ‘udp6’.
callback - 为 ‘message’ 事件添加一个监听器。
返回: dgram.Socket
创建一个特定 type 的dgram.Socket 对象。type参数是udp4 或 udp6。 可选传一个回调函数,作为 ‘message’ 事件的监听器。
一旦套接字被创建。调用socket.bind() 会指示套接字开始监听数据报消息。如果 address 和 port 没传给 socket.bind(), 那么这个方法会把这个套接字绑定到 “全部接口” 地址的一个随机端口(这适用于 udp4 和 udp6 套接字)。 绑定的地址和端口可以通过 socket.address().address 和socket.address().port 来获取。
socket.send(msg, [offset, length,] port [, address] [, callback])
socket.send参数:
msg: Buffer | Uint8Array | string | Array 要发送的消息。
offset: 整数。指定消息的开头在 buffer 中的偏移量。
length 整数。消息的字节数。
port 整数。目标端口。
address 目标主机名或 IP 地址。
callback 当消息被发送时会被调用。
msg参数包含了要发送的消息。根据消息的类型可以有不同的做法。如果msg是一个Buffer 或 Uint8Array,则offset和length指定了消息在Buffer中对应的偏移量和字节数。如果msg是一个String,那么它会被自动地按照utf8编码转换为Buffer。对于包含了多字节字符的消息,offset和length会根据对应的byte length进行计算,而不是根据字符的位置。如果msg是一个数组,那么offset和length必须都不能被指定。
address参数是一个字符串。若address的值是一个主机名,则 DNS 会被用来解析主机的地址。若address未提供或是非真值,则’127.0.0.1’(用于 udp4 socket)或’::1’(用于 udp6 socket)会被使用。
若在之前 socket 未通过调用bind方法进行绑定,socket 将会被一个随机的端口号赋值并绑定到“所有接口”的地址上(对于udp4 socket 是’0.0.0.0’,对于udp6 socket 是’::0’)。
可以指定一个可选的callback方法来汇报 DNS 错误或判断可以安全地重用buf对象的时机。注意,在 Node.js 事件循环中,DNS 查询会对发送造成至少 1 tick 的延迟。
确定数据包被发送的唯一方式就是指定callback。若在callback被指定的情况下有错误发生,该错误会作为callback的第一个参数。若callback未被指定,该错误会以’error’事件的方式投射到socket对象上。
偏移量和长度是可选的,但如其中一个被指定则另一个也必须被指定。另外,他们只在第一个参数是Buffer 或 Uint8Array 的情况下才能被使用。
dns (域名服务器)
dns 模块包含两类函数:
1) 第一类函数,使用底层操作系统工具进行域名解析,且无需进行网络通信。 这类函数只有一个:dns.lookup()。
const dns = require('dns');
dns.lookup('iana.org', (err, address, family) => {
console.log('IP 地址: %j 地址族: IPv%s', address, family);
});
// IP 地址: "192.0.43.8" 地址族: IPv4
2) 第二类函数,连接到一个真实的 DNS 服务器进行域名解析,且始终使用网络进行 DNS 查询。 这类函数包含了 dns 模块中除 dns.lookup() 以外的所有函数。
const dns = require('dns');
dns.resolve4('archive.org', (err, addresses) => {
if (err) throw err;
console.log(`IP 地址: ${JSON.stringify(addresses)}`);
addresses.forEach((a) => {
dns.reverse(a, (err, hostnames) => {
if (err) {
throw err;
}
console.log(`IP 地址 ${a} 逆向解析到域名: ${JSON.stringify(hostnames)}`);
});
});
});
Error (错误)
Node.js 中运行的应用程序一般会遇到以下四类错误:
- 标准的 JavaScript 错误:
- EvalError : 当调用 eval() 失败时抛出。
- SyntaxError : 当 JavaScript 语法错误时抛出。
- RangeError : 当值不在预期范围内时抛出。
- ReferenceError : 当使用未定义的变量时抛出。
- TypeError : 当传入错误类型的参数时抛出。
- URIError : 当全局的 URI 处理函数被误用时抛出。
- 由底层操作系触发的系统错误,例如试图打开一个不存在的文件、试图通过一个已关闭的 socket 发送数据等。
- 由应用程序代码触发的用户自定义的错误。
- 断言错误是错误的一个特殊类别,每当 Node.js 检测到一个不应该发生的异常逻辑时触发。 这类错误通常由 assert 模块引起。
所有由 Node.js 引起的 JavaScript 错误与系统错误都继承自或实例化自标准的 JavaScript Error 类,且保证至少提供类中的属性。
除了少数例外,同步的 API(任何不接受 callback 函数的阻塞方法,例如 fs.readFileSync)会使用 throw 报告错误。
异步的 API 中发生的错误可能会以多种方式进行报告:
大多数的异步方法都接受一个 callback 函数,该函数会接受一个 Error 对象传入作为 第一个参数。 如果第一个参数不是 null 而是一个 Error 实例,则说明发生了错误,应该进行处理。
const fs = require('fs');
fs.readFile('一个不存在的文件', (err, data) => {
if (err) {
console.error('读取文件出错!', err);
return;
}
// 否则处理数据
});
当一个异步方法被一个 EventEmitter 对象调用时,错误会被分发到对象的 ‘error’ 事件上。
const net = require('net');
const connection = net.connect('localhost');
// 添加一个 'error' 事件句柄到一个流:
connection.on('error', (err) => {
// 如果连接被服务器重置,或无法连接,或发生任何错误,则错误会被发送到这里。
console.error(err);
});
connection.pipe(process.stdout);
events (事件)
所有能触发事件的对象都是 EventEmitter 类的实例。 这些对象开放了一个 eventEmitter.on() 函数,允许将一个或多个函数绑定到会被对象触发的命名事件上。 事件名称通常是驼峰式的字符串,但也可以使用任何有效的 JavaScript 属性名。
当 EventEmitter 对象触发一个事件时,所有绑定在该事件上的函数都被同步地调用。 监听器的返回值会被丢弃。
例子,一个绑定了一个监听器的 EventEmitter 实例。 eventEmitter.on() 方法用于注册监听器,eventEmitter.emit() 方法用于触发事件。
const EventEmitter = require('events');
class MyEmitter extends EventEmitter {}
const myEmitter = new MyEmitter();
myEmitter.on('event', () => {
console.log('触发了一个事件!');
});
myEmitter.emit('event');
给监听器传入参数与 this
eventEmitter.emit() 方法允许将任意参数传给监听器函数。 当一个普通的监听器函数被 EventEmitter 调用时,标准的 this 关键词会被设置指向监听器所附加的 EventEmitter。
也可以使用 ES6 的箭头函数作为监听器。但是这样 this 关键词就不再指向 EventEmitter 实例
异步与同步
EventEmitter 会按照监听器注册的顺序同步地调用所有监听器。 所以需要确保事件的正确排序且避免竞争条件或逻辑错误。 监听器函数可以使用 setImmediate() 或 process.nextTick() 方法切换到异步操作模式
只处理事件一次
使用 eventEmitter.once() 方法时可以注册一个对于特定事件最多被调用一次的监听器。 当事件被触发时,监听器会被注销,然后再调用。
错误事件
当 EventEmitter 实例中发生错误时,会触发一个 ‘error’ 事件。 这在 Node.js 中是特殊情况。
如果 EventEmitter 没有为 ‘error’ 事件注册至少一个监听器,则当 ‘error’ 事件触发时,会抛出错误、打印堆栈跟踪、且退出 Node.js 进程。
作为最佳实践,应该始终为 ‘error’ 事件注册监听器。
const EventEmitter = require('events');
class MyEmitter extends EventEmitter {}
const myEmitter = new MyEmitter();
myEmitter.on('error', (err) => {
console.error('有错误');
});
myEmitter.emit('error', new Error('whoops!'));
// 打印: 有错误
EventEmitter 类
EventEmitter 类由 events 模块定义和开放的:
const EventEmitter = require('events');
当新的监听器被添加时,所有的 EventEmitter 会触发 ‘newListener’ 事件;当移除已存在的监听器时,则触发 ‘removeListener’。
'newListener' 事件
EventEmitter 实例会在一个监听器被添加到其内部监听器数组之前触发自身的 ‘newListener’ 事件。
注册了 ‘newListener’ 事件的监听器会传入事件名与被添加的监听器的引用。
事实上,在添加监听器之前触发事件有一个微妙但重要的副作用: 在’newListener’ 回调函数中, 一个监听器的名字如果和已有监听器名称相同, 则在被插入到EventEmitter实例的内部监听器数组时, 该监听器会被添加到其它同名监听器的前面。
'removeListener' 事件
‘removeListener’ 事件在 listener 被移除后触发。
emitter.emit(eventName[, ...args])
emitter.eventNames()
emitter.on(eventName, listener)
emitter.once(eventName, listener)
emitter.prependListener(eventName, listener)
emitter.prependOnceListener(eventName, listener)
// 移除全部监听器或指定的 eventName 事件的监听器
emitter.removeAllListeners([eventName])
// 从名为 eventName 的事件的监听器数组中移除指定的 listener
emitter.removeListener(eventName, listener)
stream - 流
流(stream)是一种在 Node.js 中处理流式数据的抽象接口。 stream 模块提供了一些基础的 API,用于构建实现了流接口的对象。
Node.js 提供了多种流对象。 例如,发送到 HTTP 服务器的请求和 process.stdout 都是流的实例。
流可以是可读的、可写的、或是可读写的。 所有的流都是 EventEmitter 的实例。
stream 模块可以通过以下方式使用:
const stream = require('stream');
尽管理解流的工作方式很重要,但是 stream 模块本身主要用于开发者创建新类型的流实例。 对于以消费流对象为主的开发者,极少需要直接使用 stream 模块。
Node.js 中有四种基本的流类型:
- Writable - 可写入数据的流(例如 fs.createWriteStream())。
- Readable - 可读取数据的流(例如 fs.createReadStream())。
- Duplex - 可读又可写的流(例如 net.Socket)。
- Transform - 在读写过程中可以修改或转换数据的 Duplex 流(例如 zlib.createDeflate())
对象模式
所有 Node.js API 创建的流都是专门运作在字符串和 Buffer(或 Uint8Array)对象上。 当然,流的实现也可以使用其它类型的 JavaScript 值(除了 null,因为它在流中有特殊用途)。 这些流会以“对象模式”进行操作。
当创建流时,可以使用 objectMode 选项把流实例切换到对象模式。 试图将已经存在的流切换到对象模式是不安全的。
缓冲
可写流和可读流都会在一个内部的缓冲器中存储数据,可以分别使用的 writable.writableBuffer 或 readable.readableBuffer 来获取。
可缓冲的数据的数量取决于传入流构造函数的 highWaterMark 选项。 对于普通的流,highWaterMark 选项指定了字节的总数量。 对于以对象模式运作的流,highWaterMark 指定了对象的总数量。
stream.Writable 类
可写流是对数据要被写入的目的地的一种抽象。
可写流的例子包括:
- 客户端上的 HTTP 请求
- 服务器上的 HTTP 响应
- fs 写入的流
- zlib 流
- crypto 流
- TCP socket
- 子进程 stdin
- process.stdout、process.stderr
上面的一些例子事实上是实现了可写流接口的 Duplex 流。所有可写流都实现了 stream.Writable 类定义的接口。
'close' 事件
// 如果调用 stream.write(chunk) 返回 false,则当可以继续写入数据到流时会触发 'drain' 事件
'drain' 事件
'error' 事件
// 调用 stream.end() 且缓冲数据都已传给底层系统之后触发
'finish' 事件
// 当在可读流上调用 stream.pipe() 时触发
'pipe' 事件
// 当在可读流上调用 stream.unpipe() 时触发
'unpipe' 事件
// 调用 writable.end() 表明已没有数据要被写入可写流。 可选的 chunk 和 encoding 参数可以在关闭流之前再写入一块数据。 如果传入了 callback 函数,则会做为监听器添加到 'finish' 事件。
writable.end([chunk][, encoding][, callback])
// writable.write() 方法写入一些数据到流中,并在这些数据被完全处理之后调用提供的 callback 。 如果发生错误,则 callback 可能被调用并传入错误作为第一个参数。 为了可靠地检测到写入的错误,可以为 'error' 事件添加监听器。
writable.write(chunk[, encoding][, callback])
// 关于write,如果要被写入的数据可以根据需要生成或者取得,建议将逻辑封装为一个可读流并且使用 stream.pipe()。 如果要优先调用 write(),则可以使用 'drain' 事件来防止背压与避免内存问题
stream.Readable 类
可读流是对提供数据的来源的一种抽象。
可读流的例子包括:
- 客户端上的 HTTP 响应
- 服务器上的 HTTP 请求
- fs 读取的流
- zlib 流
- crypto 流
- TCP socket
- 子进程 stdout 与 stderr
- process.stdin 所有的可读流都实现了 stream.Readable 类上定义的接口。
两种模式
可读流实质上运作于流动中(flowing)或已暂停(paused)两种模式之一。
在 flowing 模式中,数据自动地从底层的系统被读取,并通过 EventEmitter 接口的事件尽可能快地被提供给应用程序。
在 paused 模式中,必须显式调用 stream.read() 方法来从流中读取数据片段。
所有可读流都开始于 paused 模式,可以通过以下方式切换到 flowing 模式:
- 新增一个 ‘data’ 事件处理函数。
- 调用 stream.resume() 方法。
- 调用 stream.pipe() 方法发送数据到可写流。
可读流可以通过以下方式切换回 paused 模式:
- 如果没有管道目标,调用 stream.pause() 方法。
- 如果有管道目标,移除所有管道目标。调用 stream.unpipe() 方法可以移除多个管道目标。
需要记住的重要概念是,只有提供了消费或忽略数据的机制后,可读流才会产生数据。 如果消费的机制被禁用或移除,则可读流会停止产生数据。
为了向后兼容,移除 ‘data’ 事件处理函数不会自动地暂停流。 如果存在管道目标,一旦目标变为 drain 状态并请求接收数据时,则调用 stream.pause() 也不能保证流会保持暂停状态。
如果可读流切换到 flowing 模式,且没有可用的消费函数处理数据,则这些数据将会丢失。 例如,当调用 readable.resume() 方法时,没有监听 ‘data’ 事件或 ‘data’ 事件的处理函数从流中被移除了。
三种状态
可读流运作的两种模式是对发生在可读流中更加复杂的内部状态管理的一种简化的抽象。在任意时刻,任一可读流会处于以下三种状态之一:
- readable.readableFlowing = null
- readable.readableFlowing = false
- readable.readableFlowing = true
当 readable.readableFlowing 为 null 时,没有提供消费流数据的机制,所以流不会产生数据。 在这个状态下,监听 ‘data’ 事件、调用 readable.pipe() 方法、或调用 readable.resume() 方法, 则readable.readableFlowing 会变成 true ,可读流开始主动地产生数据触发事件。
调用 readable.pause()、readable.unpipe()、或接收背压,则 readable.readableFlowing 会被设为 false,暂时停止事件流动但不会停止数据的生成。 在这个状态下,为 ‘data’ 事件设置监听器不会使 readable.readableFlowing 变成 true。
const { PassThrough, Writable } = require('stream');
const pass = new PassThrough();
const writable = new Writable();
pass.pipe(writable);
pass.unpipe(writable);
// readableFlowing 现在为 false。
pass.on('data', (chunk) => { console.log(chunk.toString()); });
pass.write('ok'); // 不会触发 'data' 事件。
pass.resume(); // 必须调用它才会触发 'data' 事件。
'close' 事件
'data' 事件 // 会在流将数据传递给消费者时触发。当流转换到 flowing 模式时会触发该事件。调用 readable.pipe(), readable.resume() 方法,或为 'data' 事件添加回调可以将流转换到 flowing 模式。 'data' 事件也会在调用 readable.read() 方法并有数据返回时触发。
'end' 事件
'error' 事件
'readable' 事件
readable.pause()
readable.pipe(destination[, options]) // eadable.pipe() 绑定一个 Writable 到 readable 上, 将可写流自动切换到 flowing 模式并将所有数据传给绑定的 Writable。数据流将被自动管理。这样,即使是可读流较快,目标可写流也不会超负荷(overwhelmed);readable.pipe() 会返回目标流的引用,这样就可以对流进行链式地管道操作
readable.read([size])
readable.unpipe([destination])
用于实现流的 API
stream模块API的设计是为了让JavaScript的原型继承模式可以简单的实现流。
首先,一个流开发者可能声明了一个JavaScript类并且继承四个基本流类中的一个(stream.Writeable,stream.Readable,stream.Duplex,或者stream.Transform),确保他们调用合适的父类构造函数
-
Implementing a Writable Stream
自定义可写流必须调用new stream.Writable([options])构造函数并且实现writable._write()方法。writable._writev()方法也是可以实现的。
const { Writable } = require('stream'); class MyWritable extends Writable { constructor(options) { super(options); // ... } }
或者,使用ES6之前的语法来创建构造函数:
const { Writable } = require('stream'); const util = require('util'); function MyWritable(options) { if (!(this instanceof MyWritable)) return new MyWritable(options); Writable.call(this, options); } util.inherits(MyWritable, Writable);
或者,使用简化的构造函数方法:
const { Writable } = require('stream'); const myWritable = new Writable({ write(chunk, encoding, callback) { // ... }, writev(chunks, callback) { // ... } });
// 所有可写流的实现必须提供 writable._write() 方法将数据发送到底层资源,定义在类的内部,不应该被用户程序直接调用 writable._write(chunk, encoding, callback) writable._writev(chunks, callback) writable._destroy(err, callback) writable._final(callback)
const { Writable } = require('stream'); const { StringDecoder } = require('string_decoder'); class StringWritable extends Writable { constructor(options) { super(options); const state = this._writableState; this._decoder = new StringDecoder(state.defaultEncoding); this.data = ''; } _write(chunk, encoding, callback) { if (encoding === 'buffer') { chunk = this._decoder.write(chunk); } this.data += chunk; callback(); } _final(callback) { this.data += this._decoder.end(); callback(); } } const euro = [[0xE2, 0x82], [0xAC]].map(Buffer.from); const w = new StringWritable(); w.write('currency: '); w.write(euro[0]); w.end(euro[1]); console.log(w.data); // currency: €
-
Implementing a Readable Stream
用户实现的自定义可读流 必须 调用new stream.Readable([options]) 构造函数并且实现readable._read()方法。
const { Readable } = require('stream'); class MyReadable extends Readable { constructor(options) { // Calls the stream.Readable(options) constructor super(options); // ... } }
readable._read(size)
当 readable._read() 被调用,如果读取的数据是可用的,应该在最开始的实现的时候使用this.push(dataChunk)方法将该数据推入读取队列。_read() 应该一直读取资源直到推送数据方法readable.push()返回false的时候停止。想再次调用_read()方法,需要再次往可读流里面push数据。注意:一旦readable._read()方法被调用,只有在 readable.push()方法被调用之后,才能再次被调用。
readable.push(chunk[, encoding])
当可读流处在传输模式下,’data’事件触发时,可以通过 调用readable.read() 方法读出来数据,这数据是用readable.push()添加的。 注意: readable.push()方法是为了让可读流实现者调用的, 而且只来自readable._read()方法内部。
const { Readable } = require('stream'); class Counter extends Readable { constructor(opt) { super(opt); this._max = 1000000; this._index = 1; } _read() { const i = this._index++; if (i > this._max) this.push(null); else { const str = '' + i; const buf = Buffer.from(str, 'ascii'); this.push(buf); } } }
- Implementing a Duplex Stream
const { Duplex } = require('stream'); const kSource = Symbol('source'); class MyDuplex extends Duplex { constructor(source, options) { super(options); this[kSource] = source; } _write(chunk, encoding, callback) { // The underlying source only deals with strings if (Buffer.isBuffer(chunk)) chunk = chunk.toString(); this[kSource].writeSomeData(chunk); callback(); } _read(size) { this[kSource].fetchSomeData(size, (data, encoding) => { this.push(Buffer.from(data, encoding)); }); } }
-
Implementing a Transform Stream
stream.Transform类最初继承自stream.Duplex,并且实现了它自己版本的writable._write()和readable._read()方法。 一般地,变换流必须实现transform._transform() 方法; 而transform._flush() 方法是非必须的。
注意: 用变换流时要注意,如果读端输出没有被消费,那么往写数据可能会引起写端暂停。transform._transform(chunk, encoding, callback)
注意:此函数不得直接由应用程序代码调用。它应该由子类实现,并由内部的Readable类方法调用。 所有的变换流的执行必须提供一个_transform()方法接收输入并提供输出。transform._transform()的实现会处理写入的字节,做某种计算并输出,然后使用readable.push()方法把这个输出传递到可读流。
从一个单个输入数据块可能会调用零次或多次transform.push()方法,调用次数取决于每次把多少数据做为一个输出结果。
有可能从任意给定输入的数据块中没有产生任何输出。
callback 会在当前数据被完全消费之后调用。在处理过程输入的过程中如果出错了,第一个参数是一个错误对象,没有出错Error参数则为null。如果传递第二个参数,它会被转发到readable.push()中。 - 推荐文章 [译]关于Node.js streams你需要知道的一切