Node.js 进程与线程
问题
Node.js 如何实现多进程和多线程?child_process、cluster 和 worker_threads 有什么区别?
答案
Node.js 提供三种方式利用多核 CPU:
- child_process:创建子进程,执行任意命令或脚本
- cluster:创建多个工作进程,共享同一端口
- worker_threads:创建工作线程,共享内存
child_process
spawn
import { spawn } from 'child_process';
// 创建子进程执行命令
const ls = spawn('ls', ['-la']);
ls.stdout.on('data', (data) => {
console.log(`stdout: ${data}`);
});
ls.stderr.on('data', (data) => {
console.error(`stderr: ${data}`);
});
ls.on('close', (code) => {
console.log(`子进程退出,退出码: ${code}`);
});
// 选项
const child = spawn('node', ['script.js'], {
cwd: '/path/to/dir', // 工作目录
env: { ...process.env, NODE_ENV: 'production' },
stdio: 'inherit', // 继承父进程的 stdio
detached: true, // 分离子进程
shell: true // 使用 shell 执行
});
exec
import { exec, execSync } from 'child_process';
import { promisify } from 'util';
const execAsync = promisify(exec);
// 异步执行
exec('ls -la', (error, stdout, stderr) => {
if (error) {
console.error('执行错误:', error);
return;
}
console.log('stdout:', stdout);
});
// Promise 版本
const { stdout, stderr } = await execAsync('ls -la');
// 同步执行
const result = execSync('ls -la', { encoding: 'utf8' });
execFile
import { execFile } from 'child_process';
// 直接执行可执行文件(不通过 shell)
execFile('node', ['--version'], (error, stdout) => {
console.log('Node version:', stdout);
});
fork
// parent.js
import { fork } from 'child_process';
const child = fork('./child.js');
// 发送消息给子进程
child.send({ type: 'start', data: [1, 2, 3] });
// 接收子进程消息
child.on('message', (message) => {
console.log('收到子进程消息:', message);
});
child.on('exit', (code) => {
console.log('子进程退出:', code);
});
// child.js
process.on('message', (message) => {
console.log('收到父进程消息:', message);
// 处理任务
const result = message.data.reduce((a, b) => a + b, 0);
// 发送结果给父进程
process.send({ type: 'result', data: result });
});
spawn vs exec vs fork
| 方法 | 用途 | stdout | 适用场景 |
|---|---|---|---|
| spawn | 执行命令 | 流式输出 | 大量输出、长时间运行 |
| exec | 执行命令 | 缓冲输出 | 小量输出、简单命令 |
| execFile | 执行文件 | 缓冲输出 | 直接执行,更安全 |
| fork | 执行 Node.js | IPC 通信 | Node.js 脚本、进程通信 |
cluster
cluster 模块用于创建多个工作进程,共享同一端口,实现负载均衡。
import cluster from 'cluster';
import { createServer } from 'http';
import { cpus } from 'os';
const numCPUs = cpus().length;
if (cluster.isPrimary) {
console.log(`主进程 ${process.pid} 正在运行`);
// 创建工作进程
for (let i = 0; i < numCPUs; i++) {
cluster.fork();
}
// 工作进程退出时重启
cluster.on('exit', (worker, code, signal) => {
console.log(`工作进程 ${worker.process.pid} 退出`);
cluster.fork(); // 重新创建
});
} else {
// 工作进程共享同一 TCP 连接
createServer((req, res) => {
res.writeHead(200);
res.end(`来自进程 ${process.pid}\n`);
}).listen(3000);
console.log(`工作进程 ${process.pid} 已启动`);
}
进程间通信
if (cluster.isPrimary) {
const worker = cluster.fork();
// 发送消息给工作进程
worker.send({ type: 'config', data: { port: 3000 } });
// 接收工作进程消息
worker.on('message', (message) => {
console.log('收到工作进程消息:', message);
});
} else {
// 接收主进程消息
process.on('message', (message) => {
console.log('收到主进程消息:', message);
// 发送消息给主进程
process.send({ type: 'ready', pid: process.pid });
});
}
负载均衡策略
// 设置调度策略(在 fork 前设置)
cluster.schedulingPolicy = cluster.SCHED_RR; // 轮询(默认)
// 或
cluster.schedulingPolicy = cluster.SCHED_NONE; // 由操作系统决定
worker_threads
worker_threads 用于创建 JavaScript 工作线程,共享内存,适合 CPU 密集型任务。
基本用法
// main.ts
import { Worker, isMainThread, parentPort, workerData } from 'worker_threads';
if (isMainThread) {
// 主线程
const worker = new Worker(__filename, {
workerData: { n: 40 }
});
worker.on('message', (result) => {
console.log('计算结果:', result);
});
worker.on('error', (err) => {
console.error('Worker 错误:', err);
});
worker.on('exit', (code) => {
console.log('Worker 退出:', code);
});
} else {
// 工作线程
const { n } = workerData;
function fibonacci(n: number): number {
if (n <= 1) return n;
return fibonacci(n - 1) + fibonacci(n - 2);
}
const result = fibonacci(n);
parentPort?.postMessage(result);
}
独立 Worker 文件
// main.ts
import { Worker } from 'worker_threads';
import { fileURLToPath } from 'url';
import { dirname, join } from 'path';
const __dirname = dirname(fileURLToPath(import.meta.url));
const worker = new Worker(join(__dirname, 'worker.ts'), {
workerData: { n: 40 }
});
worker.on('message', (result) => {
console.log('结果:', result);
});
// worker.ts
import { parentPort, workerData } from 'worker_threads';
const { n } = workerData;
// ... 计算
parentPort?.postMessage(result);
SharedArrayBuffer(共享内存)
import { Worker, isMainThread } from 'worker_threads';
if (isMainThread) {
// 创建共享内存
const sharedBuffer = new SharedArrayBuffer(4);
const sharedArray = new Int32Array(sharedBuffer);
sharedArray[0] = 0;
const worker = new Worker(__filename, {
workerData: { sharedBuffer }
});
// 修改共享内存
setTimeout(() => {
Atomics.add(sharedArray, 0, 10);
console.log('主线程: 加了 10');
}, 100);
} else {
const { sharedBuffer } = workerData;
const sharedArray = new Int32Array(sharedBuffer);
// 原子操作
Atomics.add(sharedArray, 0, 5);
console.log('Worker: 加了 5,当前值:', sharedArray[0]);
}
MessageChannel
import { Worker, MessageChannel } from 'worker_threads';
const worker = new Worker('./worker.js');
// 创建通信通道
const { port1, port2 } = new MessageChannel();
// 将 port2 发送给 worker
worker.postMessage({ port: port2 }, [port2]);
// 通过 port1 通信
port1.on('message', (message) => {
console.log('收到:', message);
});
port1.postMessage('Hello from main');
对比
| 特性 | child_process | cluster | worker_threads |
|---|---|---|---|
| 隔离级别 | 进程 | 进程 | 线程 |
| 内存 | 独立 | 独立 | 可共享 |
| 通信 | IPC | IPC | postMessage/SharedArrayBuffer |
| 开销 | 高 | 高 | 低 |
| 适用场景 | 执行外部命令 | HTTP 服务负载均衡 | CPU 密集型计算 |
| 稳定性 | 崩溃不影响主进程 | 崩溃不影响主进程 | 崩溃可能影响主进程 |
常见面试问题
Q1: Node.js 如何利用多核 CPU?
答案:
// 方式一:cluster(HTTP 服务)
import cluster from 'cluster';
import { cpus } from 'os';
if (cluster.isPrimary) {
for (let i = 0; i < cpus().length; i++) {
cluster.fork();
}
} else {
// 启动 HTTP 服务
}
// 方式二:worker_threads(CPU 密集型)
import { Worker } from 'worker_threads';
const workers = cpus().map(() => new Worker('./task.js'));
// 方式三:child_process
import { fork } from 'child_process';
const children = cpus().map(() => fork('./task.js'));
// 方式四:PM2
// pm2 start app.js -i max
Q2: cluster 的工作原理是什么?
答案:
cluster 通过主进程监听端口,将连接分发给工作进程:
- 端口共享:主进程监听端口
- 连接分发:新连接到达时,主进程选择一个工作进程处理
- 负载均衡:默认使用轮询(Round-Robin)策略
// 简化原理
if (cluster.isPrimary) {
const server = net.createServer();
server.listen(3000);
server.on('connection', (socket) => {
// 选择一个 worker
const worker = selectWorker();
// 将 socket 发送给 worker
worker.send('socket', socket);
});
}
Q3: worker_threads 和 Web Workers 有什么区别?
答案:
| 特性 | worker_threads | Web Workers |
|---|---|---|
| 环境 | Node.js | 浏览器 |
| 共享内存 | SharedArrayBuffer + Atomics | SharedArrayBuffer(需 COOP/COEP) |
| 可转移对象 | MessagePort、ArrayBuffer 等 | 类似 |
| require/import | ✅ 支持 | ❌ 需 importScripts |
| Node.js API | ✅ 可访问 | ❌ 无 |
// worker_threads
import { Worker } from 'worker_threads';
const worker = new Worker('./task.js');
// Web Workers
const worker = new Worker('/task.js');
worker.postMessage(data);
Q4: 如何实现进程间通信(IPC)?
答案:
// 方式一:child_process/cluster 的 IPC
// parent.js
const child = fork('./child.js');
child.send({ type: 'data', payload: [1, 2, 3] });
child.on('message', (msg) => console.log(msg));
// child.js
process.on('message', (msg) => {
process.send({ type: 'result', payload: 6 });
});
// 方式二:共享内存(worker_threads)
const shared = new SharedArrayBuffer(1024);
const worker = new Worker('./worker.js', {
workerData: { shared }
});
// 方式三:消息队列(Redis、RabbitMQ 等)
// 适用于独立进程
// 方式四:Socket
// 使用 TCP/Unix Socket 通信
Q5: cluster 和 worker_threads 应该如何选择?
答案:
使用 cluster:
- HTTP/TCP 服务器负载均衡
- 需要进程隔离(一个崩溃不影响其他)
- 与 PM2 配合管理
使用 worker_threads:
- CPU 密集型计算
- 需要共享内存
- 需要低开销线程
// HTTP 服务 → cluster
import cluster from 'cluster';
if (cluster.isPrimary) {
// fork workers
} else {
app.listen(3000);
}
// CPU 密集型 → worker_threads
import { Worker } from 'worker_threads';
app.get('/compute', async (req, res) => {
const worker = new Worker('./heavy-task.js');
worker.on('message', (result) => res.json(result));
});