Skip to content

Commit 3c3d321

Browse files
committed
Implement AsyncProgressWorker
1 parent 6192e70 commit 3c3d321

File tree

8 files changed

+668
-0
lines changed

8 files changed

+668
-0
lines changed

doc/async_progress_worker.md

Lines changed: 413 additions & 0 deletions
Large diffs are not rendered by default.

napi-inl.h

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,9 @@
99

1010
// Note: Do not include this file directly! Include "napi.h" instead.
1111

12+
#include <algorithm>
1213
#include <cstring>
14+
#include <mutex>
1315
#include <type_traits>
1416

1517
namespace Napi {
@@ -4112,6 +4114,91 @@ inline void ThreadSafeFunction::CallJS(napi_env env,
41124114
Function(env, jsCallback).Call({});
41134115
}
41144116
}
4117+
4118+
////////////////////////////////////////////////////////////////////////////////
4119+
// Async Progress Worker class
4120+
////////////////////////////////////////////////////////////////////////////////
4121+
4122+
template<class T>
4123+
inline AsyncProgressWorker<T>::AsyncProgressWorker(const Function& callback,
4124+
const char* resource_name,
4125+
const Object& resource)
4126+
: AsyncWorker(callback, resource_name, resource), _asyncdata(nullptr), _asyncsize(0) {
4127+
_tsfn = ThreadSafeFunction::New(callback.Env(), callback, resource_name, 1, 1);
4128+
}
4129+
4130+
template<class T>
4131+
inline AsyncProgressWorker<T>::~AsyncProgressWorker() {
4132+
// Abort pending tsfn call.
4133+
// Don't send progress events after we've already completed.
4134+
_tsfn.Abort();
4135+
{
4136+
std::lock_guard<std::mutex> lock(_mutex);
4137+
_asyncdata = nullptr;
4138+
_asyncsize = 0;
4139+
}
4140+
_tsfn.Release();
4141+
}
4142+
4143+
template<class T>
4144+
inline void AsyncProgressWorker<T>::Execute() {
4145+
ExecutionProgress progress(this);
4146+
Execute(progress);
4147+
}
4148+
4149+
template<class T>
4150+
inline void AsyncProgressWorker<T>::WorkProgress_(Napi::Env /* env */, Napi::Function /* jsCallback */, void* _data) {
4151+
AsyncProgressWorker* self = static_cast<AsyncProgressWorker*>(_data);
4152+
4153+
T* data;
4154+
size_t size;
4155+
{
4156+
std::lock_guard<std::mutex> lock(self->_mutex);
4157+
data = self->_asyncdata;
4158+
size = self->_asyncsize;
4159+
self->_asyncdata = nullptr;
4160+
self->_asyncsize = 0;
4161+
}
4162+
4163+
self->OnProgress(data, size);
4164+
delete[] data;
4165+
}
4166+
4167+
template<class T>
4168+
inline void AsyncProgressWorker<T>::SendProgress_(const T* data, size_t count) {
4169+
T* new_data = new T[count];
4170+
{
4171+
T* it = new_data;
4172+
std::copy(data, data + count, it);
4173+
}
4174+
4175+
T* old_data;
4176+
{
4177+
std::lock_guard<std::mutex> lock(_mutex);
4178+
old_data = _asyncdata;
4179+
_asyncdata = new_data;
4180+
_asyncsize = count;
4181+
}
4182+
_tsfn.NonBlockingCall(this, WorkProgress_);
4183+
4184+
delete[] old_data;
4185+
}
4186+
4187+
template<class T>
4188+
inline void AsyncProgressWorker<T>::Signal() const {
4189+
_tsfn.NonBlockingCall(this, WorkProgress_);
4190+
}
4191+
4192+
template<class T>
4193+
inline void AsyncProgressWorker<T>::ExecutionProgress::Signal() const {
4194+
_that->Signal();
4195+
}
4196+
4197+
template<class T>
4198+
inline void AsyncProgressWorker<T>::ExecutionProgress::Send(const T* data, size_t count) const {
4199+
_that->SendProgress_(data, count);
4200+
}
4201+
41154202
#endif
41164203

41174204
////////////////////////////////////////////////////////////////////////////////

napi.h

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
#include <functional>
66
#include <initializer_list>
77
#include <memory>
8+
#include <mutex>
89
#include <string>
910
#include <vector>
1011

@@ -2061,6 +2062,42 @@ namespace Napi {
20612062
std::unique_ptr<napi_threadsafe_function, Deleter> _tsfn;
20622063
Deleter _d;
20632064
};
2065+
2066+
template<class T>
2067+
class AsyncProgressWorker : public AsyncWorker {
2068+
public:
2069+
virtual ~AsyncProgressWorker();
2070+
2071+
class ExecutionProgress {
2072+
friend class AsyncProgressWorker;
2073+
public:
2074+
void Signal() const;
2075+
void Send(const T* data, size_t count) const;
2076+
private:
2077+
explicit ExecutionProgress(AsyncProgressWorker* that) : _that(that) {}
2078+
AsyncProgressWorker* const _that;
2079+
};
2080+
2081+
protected:
2082+
explicit AsyncProgressWorker(const Function& callback,
2083+
const char* resource_name,
2084+
const Object& resource);
2085+
2086+
virtual void Execute(const ExecutionProgress& progress) = 0;
2087+
virtual void OnProgress(const T* data, size_t count) = 0;
2088+
2089+
private:
2090+
static void WorkProgress_(Napi::Env env, Napi::Function jsCallback, void* data);
2091+
2092+
void Execute() override;
2093+
void Signal() const;
2094+
void SendProgress_(const T* data, size_t count);
2095+
2096+
std::mutex _mutex;
2097+
T* _asyncdata;
2098+
size_t _asyncsize;
2099+
ThreadSafeFunction _tsfn;
2100+
};
20642101
#endif
20652102

20662103
// Memory management.

test/asyncprogressworker.cc

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
#include "napi.h"
2+
3+
#include <chrono>
4+
#include <condition_variable>
5+
#include <mutex>
6+
#include <thread>
7+
8+
#if (NAPI_VERSION > 3)
9+
10+
using namespace Napi;
11+
12+
namespace {
13+
14+
struct ProgressData {
15+
size_t progress;
16+
};
17+
18+
class TestWorker : public AsyncProgressWorker<ProgressData> {
19+
public:
20+
static void DoWork(const CallbackInfo& info) {
21+
bool succeed = info[0].As<Boolean>();
22+
Object resource = info[1].As<Object>();
23+
Function cb = info[2].As<Function>();
24+
Value data = info[3];
25+
26+
TestWorker* worker = new TestWorker(cb, "TestResource", resource);
27+
worker->Receiver().Set("data", data);
28+
worker->_succeed = succeed;
29+
worker->Queue();
30+
}
31+
32+
protected:
33+
void Execute(const ExecutionProgress& progress) override {
34+
ProgressData data{0};
35+
std::unique_lock<std::mutex> lock(_cvm);
36+
data.progress = 0;
37+
progress.Send(&data, 1);
38+
_cv.wait(lock);
39+
40+
data.progress = 50;
41+
progress.Send(&data, 1);
42+
_cv.wait(lock);
43+
if (!_succeed) {
44+
SetError("test error");
45+
}
46+
data.progress = 75;
47+
progress.Send(&data, 1);
48+
_cv.wait(lock);
49+
}
50+
51+
void OnProgress(const ProgressData* data, size_t /* count */) override {
52+
FunctionReference& callback = Callback();
53+
Napi::Env env = Env();
54+
if (!callback.IsEmpty()) {
55+
Value err = env.Undefined();
56+
Number progress = Number::New(env, data->progress);
57+
callback.Call(Receiver().Value(), { err, progress });
58+
}
59+
_cv.notify_one();
60+
}
61+
62+
std::vector<napi_value> GetResult(Napi::Env env) override {
63+
Value err = env.Undefined();
64+
Number progress = Number::New(env, 100);
65+
return {err, progress};
66+
}
67+
68+
private:
69+
TestWorker(Function cb, const char* resource_name, const Object& resource)
70+
: AsyncProgressWorker(cb, resource_name, resource) {}
71+
std::condition_variable _cv;
72+
std::mutex _cvm;
73+
bool _succeed;
74+
};
75+
76+
}
77+
78+
Object InitAsyncProgressWorker(Env env) {
79+
Object exports = Object::New(env);
80+
exports["doWork"] = Function::New(env, TestWorker::DoWork);
81+
return exports;
82+
}
83+
84+
#endif

test/asyncprogressworker.js

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
'use strict';
2+
const buildType = process.config.target_defaults.default_configuration;
3+
const common = require('./common')
4+
const assert = require('assert');
5+
6+
test(require(`./build/${buildType}/binding.node`));
7+
test(require(`./build/${buildType}/binding_noexcept.node`));
8+
9+
function test(binding) {
10+
success(binding);
11+
fail(binding);
12+
return;
13+
}
14+
15+
function success(binding) {
16+
const expected = [0, 50, 75, 100];
17+
const actual = [];
18+
binding.asyncprogressworker.doWork(true, {}, common.mustCall((err, _progress) => {
19+
if (err) {
20+
assert.fail(err);
21+
return;
22+
}
23+
actual.push(_progress);
24+
if (actual.length === expected.length) {
25+
assert.deepEqual(actual, expected);
26+
}
27+
}, expected.length));
28+
}
29+
30+
function fail(binding) {
31+
let err = undefined
32+
binding.asyncprogressworker.doWork(false, {}, (_err) => {
33+
err = _err;
34+
});
35+
process.on('exit', () => {
36+
assert.throws(() => { throw err }, /test error/)
37+
});
38+
}

test/binding.cc

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@ using namespace Napi;
55

66
Object InitArrayBuffer(Env env);
77
Object InitAsyncContext(Env env);
8+
#if (NAPI_VERSION > 3)
9+
Object InitAsyncProgressWorker(Env env);
10+
#endif
811
Object InitAsyncWorker(Env env);
912
Object InitPersistentAsyncWorker(Env env);
1013
Object InitBasicTypesArray(Env env);
@@ -49,6 +52,9 @@ Object InitThunkingManual(Env env);
4952
Object Init(Env env, Object exports) {
5053
exports.Set("arraybuffer", InitArrayBuffer(env));
5154
exports.Set("asynccontext", InitAsyncContext(env));
55+
#if (NAPI_VERSION > 3)
56+
exports.Set("asyncprogressworker", InitAsyncProgressWorker(env));
57+
#endif
5258
exports.Set("asyncworker", InitAsyncWorker(env));
5359
exports.Set("persistentasyncworker", InitPersistentAsyncWorker(env));
5460
exports.Set("basic_types_array", InitBasicTypesArray(env));

test/binding.gyp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
'sources': [
88
'arraybuffer.cc',
99
'asynccontext.cc',
10+
'asyncprogressworker.cc',
1011
'asyncworker.cc',
1112
'asyncworker-persistent.cc',
1213
'basic_types/array.cc',

test/index.js

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ process.config.target_defaults.default_configuration =
1010
let testModules = [
1111
'arraybuffer',
1212
'asynccontext',
13+
'asyncprogressworker',
1314
'asyncworker',
1415
'asyncworker-nocallback',
1516
'asyncworker-persistent',
@@ -66,6 +67,7 @@ if ((process.env.npm_config_NAPI_VERSION !== undefined) &&
6667

6768
if ((process.env.npm_config_NAPI_VERSION !== undefined) &&
6869
(process.env.npm_config_NAPI_VERSION < 4)) {
70+
testModules.splice(testModules.indexOf('asyncprogressworker'), 1);
6971
testModules.splice(testModules.indexOf('threadsafe_function/threadsafe_function_ptr'), 1);
7072
testModules.splice(testModules.indexOf('threadsafe_function/threadsafe_function'), 1);
7173
}

0 commit comments

Comments
 (0)