Skip to content

基于nodejs worker_threads的线程池。耗时操作或nodejs没有提供异步模式的api(例如解密、同步的文件api)都可以在线程池中执行,业务代码只需要返回一个Promise或async函数给线程池库,至于业务逻辑做什么操作,其实都可以,比如setTimeout,异步操作,async await等

License

Notifications You must be signed in to change notification settings

theanarkh/nodejs-threadpool

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

12 Commits
 
 
 
 
 
 
 
 
 
 

Repository files navigation

nodejs-threadpool

基于nodejs worker_threads的线程池。耗时操作或nodejs没有提供异步模式的api(例如解密、同步的文件api)都可以在线程池中执行,业务代码只需要返回一个Promise或async函数给线程池库,至于业务逻辑做什么操作,其实都可以,比如setTimeout,异步操作,async await等(设计文档https://zhuanlan.zhihu.com/p/266656697)。

支持文件和字符串模式,需要导出一个函数。

1 提供的线程池类型

// 同步处理任务
class ThreadPool extends ThreadPool {
    constructor(options) {
        super({...options, sync: true});
    }
}
// cpu型的线程池,线程数和cpu核数一样,不支持动态扩容
class CPUThreadPool extends ThreadPool {
    constructor(options) {
        super({...options, coreThreads: cores, expansion: false});
    }
}
// 只有一个线程的线程池,不支持动态扩容
class SingleThreadPool extends ThreadPool {
    constructor(options) {
        super({...options, coreThreads: 1, expansion: false });
    }
}
// 固定线程数的线程池,不支持动态扩容线程数
class FixedThreadPool extends ThreadPool {
    constructor(options) {
        super({ ...options, expansion: false });
    }
}

const defaultThreadPool = new ThreadPool();
const defaultCpuThreadPool = new CPUThreadPool();
const defaultFixedThreadPool = new FixedThreadPool();
const defaultSingleThreadPool = new SingleThreadPool();
module.exports = {
    ThreadPool,
    CPUThreadPool,
    FixedThreadPool,
    SingleThreadPool,
    defaultThreadPool, 
    defaultCpuThreadPool,
    defaultFixedThreadPool,
    defaultSingleThreadPool,
}

2 用户可以自定义线程池类型和参数

1 coreThreads:核心线程数,默认102 maxThreads:最大线程数,默认50,只在支持动态扩容的情况下,该参数有效,否则该参数等于核心线程数
3 timeout:任务执行的超时时间,全局配置,可针对单个任务设置
4 discardPolicy:任务超过阈值时的处理策略,策略如下
 	// 报错
    ABORT: 1,
    // 在主线程里执行
    CALLER_RUN: 2,
    // 丢弃最老的的任务
    DISCARD_OLDEST: 3,
    // 丢弃
    DISCARD: 4,
    // 不丢弃
    NOT_DISCARD: 5,
5 preCreate:是否预创建线程池
6 maxIdleTime:线程空闲多久后自动退出
7 maxWork:线程池最大任务数 
8 expansion:是否支持动态扩容线程,阈值是最大线程数

3 线程池给用户侧返回的是UserWork类的对象 支持的api

设置任务的超时时间
setTimeout
// 取消之前的超时时间设置
clearTimeout
// 取消任务的执行
cancel

UserWork类继承EventEmitter 支持的事件有

// 任务超时
timeout
// 任务执行完成,执行结果由用户的业务代码决定,在回调里可以拿到
done
// 任务执行出错,具体原因在回调里可以拿到
error
// 任务过载,当前任务被取消

4 使用 例子1 index.js

const { defaultThreadPool } = require('nodejs-thread-pool').threadPool;
const path = require('path');
async function test() {
	const worker = await defaultThreadPool.submit(path.resolve(__dirname, 'event.js'));
	worker.on('done', function() {
        console.log(...arguments)
    })

    worker.on('error', function() {
        console.log(...arguments)
    })
}
test()

event.js

module.exports = async function() {
    return await new Promise((resolve, reject) => {
        setTimeout(() => {
            resolve({type: 'async event'});
            console.log(1)
        },3000)
    })
} 

例子2

const { defaultThreadPool } = require('nodejs-thread-pool').threadPool;
const path = require('path');
async function test() {
    const work1 = await defaultThreadPool.submit('async function({a, b}) { return a + b; }', {a: 1, b: 1});
    work1.on('done',  function() {
        console.log(...arguments);
    })
    const work = await defaultThreadPool.submit(`async function(params) { return await new Promise((resolve) => {console.log(params); setTimeout(() => {resolve(1)}, 3000)})  }`, {name: 22}); 
    work.on('done', function() {
        console.log(...arguments);
    });
}

test()

About

基于nodejs worker_threads的线程池。耗时操作或nodejs没有提供异步模式的api(例如解密、同步的文件api)都可以在线程池中执行,业务代码只需要返回一个Promise或async函数给线程池库,至于业务逻辑做什么操作,其实都可以,比如setTimeout,异步操作,async await等

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published