本篇文章和大家探讨一下Node.js利用多个核心的方法--worker_threads模块提供的多线程模型,介绍一下Node.js多进程模型中实现共享内存的方法。
本篇文章和大家探讨一下Node.js利用多个核心的方法--worker_threads模块提供的多线程模型,介绍一下Node.js多进程模型中实现共享内存的方法。 Node.js 由于其单线程模型的设计,导致一个Node进程(的主线程)只能利用一个CPU核心,然而现在的机器基本上都是多核的,这造成了严重的性能浪费。通常来说,想要利用到多个核心一般有以下的方法:
从易用、代码入侵性、稳定性的角度来说,多进程模型通常是首要的选择。【推荐学习:《nodejs 教程》】 Node.js cluster 多进程模型存在的问题 在cluster模块提供的多进程模型中,每个Node进程都是一个独立且完整的应用进程,有自己的内存空间,其它进程无法访问。因此虽然在项目启动时,所有Worker进程具有一致的状态和行为,但在之后的运行中无法保证其状态维持一致。 例如,项目启动时有两个Worker进程,进程A和进程B,两个进程都声明了变量a=1。但之后项目接收到一个请求,Master进程将其分派给进程A来处理,这个请求将a的值变更为了2,那么此时进程A的内存空间中a=2,但是进程B的内存空间中a依旧是1。此时如果有个请求读取a的值,Master进程将这个请求分派给进程A和进程B时读取到的结果是不一致的,这就出现了一致性问题。 cluster模块在设计时并没有给出解决方案,而是要求Worker进程是无状态的,即程序员在写代码时不应该允许在处理请求时修改内存中的值,以此来保障所有Worker进程的一致性。然而在实践中总会有各种各样的情况需要写内存,比如记录用户的登录状态等,在许多企业的实践中,通常会把这些状态数据记录在外部,例如数据库、redis、消息队列、文件系统等,每次处理有状态请求时会读写外部存储空间。 这不失为一种有效的做法,然而这需要额外引入一个外部存储空间,同时还要自行处理多进程并发访问下的一致性问题,自行维护数据的生命周期(因为Node进程和维护在外部的数据并不是同步创建和销毁的),以及在高并发访问情况下的IO性能瓶颈(如果是存储在数据库等非内存环境中)。其实本质上来说,我们只是需要一个可供多个进程共享访问的空间罢了,并不需要持久化存储,这段空间的生命周期最好与Node进程强绑定,这样在使用时能省去不少麻烦。因此跨进程的共享内存就成了最适合在这种场景使用的方式。 Node.js 的共享内存 很遗憾Node本身并未提供共享内存的实现,因此我们可以看看npm仓库中第三方库的实现。这些库有些是通过C++插件扩充Node的函数实现的,有些是通过Node提供的IPC机制实现的,但很遗憾它们的实现都很简单,并未提供互斥访问、对象监听等功能,这使得使用者必须自己小心维护这段共享内存,否则就会导致时序问题。 转了一圈下来没找到我想要的。。。那就算了,我自己写一个。 共享内存的设计首先我们必须理清楚到底需要个什么样的共享内存,我是根据我自身的需求出发(为了在项目中用它来存储跨进程访问的状态数据),同时兼顾通用性,因此会首先考虑以下几点:
可以发现,其实我们并不需要操作系统层面的共享内存,只需要能够多个Node进程能访问同一个对象就行了,那么就可以在Node本身提供的机制上实现。可以使用Master进程的一段内存空间作为共享内存空间,Worker进程通过IPC将读写请求委托给Master进程,由Master进程进行读写,然后再通过IPC将结果返回给Worker进程。 为了让共享内存的使用方式在Master进程和Worker进程中一致,我们可以将对共享内存的操作抽离成一个接口,在Master进程和Worker进程中各自实现这个接口。类图如下图所示,用一个 可以使用 由于Master进程会优先于所有Worker进程创建,因此,可以在Master进程中声明共享内存空间之后再创建Worker进程,以此来保证每个Worker进程创建后都可以立即访问共享内存。 为了使用简单,我们可以将 代码实现读写控制与IPC通信 首先实现对外接口
// shared-memory.js class SharedMemory { constructor() { if (cluster.isMaster || cluster.isPrimary) { return new Manager(); } else { return new Worker(); } } }
我们通过
// manager.js const cluster = require('cluster'); class Manager { constructor() { this.__sharedMemory__ = { set(key, value) { this.memory[key] = value; }, get(key) { return this.memory[key]; }, remove(key) { delete this.memory[key]; }, memory: {}, }; // Listen the messages from worker processes. cluster.on('online', (worker) => { worker.on('message', (data) => { this.handle(data, worker); return false; }); }); } handle(data, target) { const args = data.value ? [data.key, data.value] : [data.key]; this[data.method](...args).then((value) => { const msg = { id: data.id, // workerId uuid: data.uuid, // communicationID value, }; target.send(msg); }); } set(key, value) { return new Promise((resolve) => { this.__sharedMemory__.set(key, value); resolve('OK'); }); } get(key) { return new Promise((resolve) => { resolve(this.__sharedMemory__.get(key)); }); } remove(key) { return new Promise((resolve) => { this.__sharedMemory__.remove(key); resolve('OK'); }); } }
之后项目运行到某个地方的时候,如果要访问共享内存,就会调用
// worker.js const cluster = require('cluster'); const { v4: uuid4 } = require('uuid'); class Worker { constructor() { this.__getCallbacks__ = {}; process.on('message', (data) => { const callback = this.__getCallbacks__[data.uuid]; if (callback && typeof callback === 'function') { callback(data.value); } delete this.__getCallbacks__[data.uuid]; }); } set(key, value) { return new Promise((resolve) => { this.handle('set', key, value, () => { resolve(); }); }); } get(key) { return new Promise((resolve) => { this.handle('get', key, null, (value) => { resolve(value); }); }); } remove(key) { return new Promise((resolve) => { this.handle('remove', key, null, () => { resolve(); }); }); } handle(method, key, value, callback) { const uuid = uuid4(); // 每次通信的uuid process.send({ id: cluster.worker.id, method, uuid, key, value, }); this.__getCallbacks__[uuid] = callback; } } 一次共享内存访问的完整流程是:调用 互斥访问 到目前为止,我们已经实现了读写共享内存,但还没有结束,目前的共享内存是存在严重安全问题的。因为这个共享内存是可以所有进程同时访问的,然而我们并没有考虑并发访问时的时序问题。我们来看下面这个例子:
进程A和进程B的目的都是将x的值加1,理想情况下最后x的值应该是2,可是最后的结果却是1。这是因为进程B在t3时刻给x的值加1的时候,使用的是t2时刻读取出来的x的值,但此时从全局角度来看,这个值已经过期了,因为t3时刻x最新的值已经被进程A写为了1,可是进程B无法知道进程外部的变化,所以导致了t4时刻最后写回的值又覆盖掉了进程A写回的值,等于是进程A的行为被覆盖掉了。 在多线程、多进程和分布式中并发情况下的数据一致性问题是老大难问题了,这里不再展开讨论。 为了解决上述问题,我们必须实现进程间互斥访问某个对象,来避免同时操作一个对象,从而使进程可以进行原子操作,所谓原子操作就是不可被打断的一小段连续操作,为此需要引入锁的概念。由于读写均以对象为基本单位,因此锁的粒度设置为对象级别。在某一个进程(的某一任务)获取了某个对象的锁之后,其它要获取锁的进程(的任务)会被阻塞,直到锁被归还。而要进行写操作,则必须要先获取对象的锁。这样在获取到锁直到锁被释放的这段时间里,该对象在共享内存中的值不会被其它进程修改,从而导致错误。 在 // manager.js const { v4: uuid4 } = require('uuid'); class Manager { constructor() { this.__sharedMemory__ = { ... locks: {}, lockRequestQueues: {}, }; } getLock(key) { return new Promise((resolve) => { this.__sharedMemory__.lockRequestQueues[key] = this.__sharedMemory__.lockRequestQueues[key] ?? []; this.__sharedMemory__.lockRequestQueues[key].push(resolve); this.handleLockRequest(key); }); } releaseLock(key, lockId) { return new Promise((resolve) => { if (lockId === this.__sharedMemory__.locks[key]) { delete this.__sharedMemory__.locks[key]; this.handleLockRequest(key); } resolve('OK'); }); } handleLockRequest(key) { return new Promise((resolve) => { if ( !this.__sharedMemory__.locks[key] && this.__sharedMemory__.lockRequestQueues[key]?.length > 0 ) { const callback = this.__sharedMemory__.lockRequestQueues[key].shift(); const lockId = uuid4(); this.__sharedMemory__.locks[key] = lockId; callback(lockId); } resolve(); }); } ... } 在 // worker.js class Worker { getLock(key) { return new Promise((resolve) => { this.handle('getLock', key, null, (value) => { resolve(value); }); }); } releaseLock(key, lockId) { return new Promise((resolve) => { this.handle('releaseLock', key, lockId, (value) => { resolve(value); }); }); } ... } 监听对象 有时候我们需要监听某个对象值的变化,在单进程Node应用中这很容易做到,只需要重写对象的 为此,我们先在 // manager.js class Manager { constructor() { this.__sharedMemory__ = { ... listeners: {}, }; } handle(data, target) { if (data.method === 'listen') { this.listen(data.key, (value) => { const msg = { isNotified: true, id: data.id, uuid: data.uuid, value, }; target.send(msg); }); } else { ... } } notifyListener(key) { const listeners = this.__sharedMemory__.listeners[key]; if (listeners?.length > 0) { Promise.all( listeners.map( (callback) => new Promise((resolve) => { callback(this.__sharedMemory__.get(key)); resolve(); }) ) ); } } set(key, value) { return new Promise((resolve) => { this.__sharedMemory__.set(key, value); this.notifyListener(key); resolve('OK'); }); } remove(key) { return new Promise((resolve) => { this.__sharedMemory__.remove(key); this.notifyListener(key); resolve('OK'); }); } listen(key, callback) { if (typeof callback === 'function') { this.__sharedMemory__.listeners[key] = this.__sharedMemory__.listeners[key] ?? []; this.__sharedMemory__.listeners[key].push(callback); } else { throw new Error('a listener must have a callback.'); } } ... } 在 // worker.js class Worker { constructor() { ... this.__getListenerCallbacks__ = {}; process.on('message', (data) => { if (data.isNotified) { const callback = this.__getListenerCallbacks__[data.uuid]; if (callback && typeof callback === 'function') { callback(data.value); } } else { ... } }); } handle(method, key, value, callback) { ... if (method === 'listen') { this.__getListenerCallbacks__[uuid] = callback; } else { this.__getCallbacks__[uuid] = callback; } } listen(key, callback) { if (typeof callback === 'function') { this.handle('listen', key, null, callback); } else { throw new Error('a listener must have a callback.'); } } ... } LRU缓存 有时候我们需要用用内存作为缓存,但多进程中各进程的内存空间独立,不能共享,因此也需要用到共享内存。但是如果用共享内存中的一个对象作为缓存的话,由于每次IPC都需要传输整个缓存对象,会导致缓存对象不能太大(否则序列化和反序列化耗时太长),而且由于写缓存对象的操作需要加锁,进一步影响了性能,而原本我们使用缓存就是为了加快访问速度。其实在使用缓存的时候通常不会做复杂操作,大多数时候也不需要保障一致性,因此我们可以在 // manager.js const LRU = require('lru-cache'); class Manager { constructor() { ... this.defaultLRUOptions = { max: 10000, maxAge: 1000 * 60 * 5 }; this.__sharedLRUMemory__ = new LRU(this.defaultLRUOptions); } getLRU(key) { return new Promise((resolve) => { resolve(this.__sharedLRUMemory__.get(key)); }); } setLRU(key, value) { return new Promise((resolve) => { this.__sharedLRUMemory__.set(key, value); resolve('OK'); }); } removeLRU(key) { return new Promise((resolve) => { this.__sharedLRUMemory__.del(key); resolve('OK'); }); } ... }
// worker.js class Worker { getLRU(key) { return new Promise((resolve) => { this.handle('getLRU', key, null, (value) => { resolve(value); }); }); } setLRU(key, value) { return new Promise((resolve) => { this.handle('setLRU', key, value, () => { resolve(); }); }); } removeLRU(key) { return new Promise((resolve) => { this.handle('removeLRU', key, null, () => { resolve(); }); }); } ... } 共享内存的使用方式目前共享内存的实现已发到npm仓库(文档和源代码在Github仓库,欢迎pull request和报bug),可以直接通过npm安装: npm i cluster-shared-memory 下面的示例包含了基本使用方法: const cluster = require('cluster'); // 引入模块时会根据当前进程 master 进程还是 worker 进程自动创建对应的 SharedMemory 对象 require('cluster-shared-memory'); if (cluster.isMaster) { // 在 master 进程中 fork 子进程 for (let i = 0; i < 2; i++) { cluster.fork(); } } else { const sharedMemoryController = require('./src/shared-memory'); const obj = { name: 'Tom', age: 10, }; // 写对象 await sharedMemoryController.set('myObj', obj); // 读对象 const myObj = await sharedMemoryController.get('myObj'); // 互斥访问对象,首先获得对象的锁 const lockId = await sharedMemoryController.getLock('myObj'); const newObj = await sharedMemoryController.get('myObj'); newObj.age = newObj.age + 1; await sharedMemoryController.set('myObj', newObj); // 操作完之后释放锁 await sharedMemoryController.releaseLock('requestTimes', lockId); // 或者使用 mutex 函数自动获取和释放锁 await sharedMemoryController.mutex('myObj', async () => { const newObjM = await sharedMemoryController.get('myObj'); newObjM.age = newObjM.age + 1; await sharedMemoryController.set('myObj', newObjM); }); // 监听对象 sharedMemoryController.listen('myObj', (value) => { console.log(`myObj: ${value}`); }); //写LRU缓存 await sharedMemoryController.setLRU('cacheItem', {user: 'Tom'}); // 读对象 const cacheItem = await sharedMemoryController.getLRU('cacheItem'); } 缺点这种实现目前尚有几个缺点:
更多编程相关知识,请访问:编程视频!! 以上就是浅谈Node.js多进程模型中如何实现共享内存(代码详解)的详细内容,更多请关注模板之家(www.mb5.com.cn)其它相关文章! |