Skip to content

Commit

Permalink
src: make OnWorkComplete and OnExecute override-able
Browse files Browse the repository at this point in the history
  • Loading branch information
legendecas committed Nov 6, 2019
1 parent 295e560 commit e1e5e6f
Show file tree
Hide file tree
Showing 3 changed files with 164 additions and 80 deletions.
24 changes: 24 additions & 0 deletions doc/async_worker.md
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,30 @@ class was created, passing in the error as the first parameter.
virtual void Napi::AsyncWorker::OnError(const Napi::Error& e);
```

### OnWorkComplete

This method is invoked after the work has completed on JavaScript thread.
The default implementation of this method checks the status of the work and
try to dispatch the result to `Napi::AsyncWorker::OnOk` or `Napi::AsyncWorker::Error`
if the work has committed an error. If the work was cancelled, neither of
`Napi::AsyncWorker::OnOk` nor `Napi::AsyncWorker::Error` will be invoked.
After the result dispatched, the default implementation will call into
`Napi::AsyncWorker::Destroy` if `SuppressDestruct()` was not called.

```cpp
virtual void OnWorkComplete(Napi::Env env, napi_status status);
```
### OnExecute
This method is invoked immediately on the work thread on scheduled.
The default implementation of this method just call the `Napi::AsyncWorker::Execute`
and handles exceptions if cpp exceptions was enabled.
```cpp
virtual void OnExecute(Napi::Env env);
```

### Destroy

This method is invoked when the instance must be deallocated. If
Expand Down
128 changes: 80 additions & 48 deletions napi-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -3699,8 +3699,8 @@ inline AsyncWorker::AsyncWorker(const Object& receiver,
_env, resource_name, NAPI_AUTO_LENGTH, &resource_id);
NAPI_THROW_IF_FAILED_VOID(_env, status);

status = napi_create_async_work(_env, resource, resource_id, OnExecute,
OnWorkComplete, this, &_work);
status = napi_create_async_work(_env, resource, resource_id, OnAsyncWorkExecute,
OnAsyncWorkComplete, this, &_work);
NAPI_THROW_IF_FAILED_VOID(_env, status);
}

Expand All @@ -3725,8 +3725,8 @@ inline AsyncWorker::AsyncWorker(Napi::Env env,
_env, resource_name, NAPI_AUTO_LENGTH, &resource_id);
NAPI_THROW_IF_FAILED_VOID(_env, status);

status = napi_create_async_work(_env, resource, resource_id, OnExecute,
OnWorkComplete, this, &_work);
status = napi_create_async_work(_env, resource, resource_id, OnAsyncWorkExecute,
OnAsyncWorkComplete, this, &_work);
NAPI_THROW_IF_FAILED_VOID(_env, status);
}

Expand Down Expand Up @@ -3813,40 +3813,51 @@ inline void AsyncWorker::SetError(const std::string& error) {
inline std::vector<napi_value> AsyncWorker::GetResult(Napi::Env /*env*/) {
return {};
}
// The OnAsyncWorkExecute method receives an napi_env argument. However, do NOT
// use it within this method, as it does not run on the main thread and must
// not run any method that would cause JavaScript to run. In practice, this
// means that almost any use of napi_env will be incorrect.
inline void OnAsyncWorkExecute(napi_env env, void* asyncworker) {
AsyncWorker* self = static_cast<AsyncWorker*>(asyncworker);
self->OnExecute(env);
}
// The OnExecute method receives an napi_env argument. However, do NOT
// use it within this method, as it does not run on the main thread and must
// not run any method that would cause JavaScript to run. In practice, this
// means that almost any use of napi_env will be incorrect.
inline void AsyncWorker::OnExecute(napi_env /*DO_NOT_USE*/, void* this_pointer) {
AsyncWorker* self = static_cast<AsyncWorker*>(this_pointer);
inline void AsyncWorker::OnExecute(Napi::Env /*DO_NOT_USE*/) {
#ifdef NAPI_CPP_EXCEPTIONS
try {
self->Execute();
this->Execute();
} catch (const std::exception& e) {
self->SetError(e.what());
this->SetError(e.what());
}
#else // NAPI_CPP_EXCEPTIONS
self->Execute();
this->Execute();
#endif // NAPI_CPP_EXCEPTIONS
}

inline void AsyncWorker::OnWorkComplete(
napi_env /*env*/, napi_status status, void* this_pointer) {
AsyncWorker* self = static_cast<AsyncWorker*>(this_pointer);
inline void OnAsyncWorkComplete(napi_env env,
napi_status status,
void* asyncworker) {
AsyncWorker* self = static_cast<AsyncWorker*>(asyncworker);
self->OnWorkComplete(env, status);
}
inline void AsyncWorker::OnWorkComplete(Napi::Env /*env*/, napi_status status) {
if (status != napi_cancelled) {
HandleScope scope(self->_env);
HandleScope scope(this->_env);
details::WrapCallback([&] {
if (self->_error.size() == 0) {
self->OnOK();
if (this->_error.size() == 0) {
this->OnOK();
}
else {
self->OnError(Error::New(self->_env, self->_error));
this->OnError(Error::New(this->_env, this->_error));
}
return nullptr;
});
}
if (!self->_suppress_destruct) {
self->Destroy();
if (!this->_suppress_destruct) {
this->Destroy();
}
}

Expand Down Expand Up @@ -4172,9 +4183,38 @@ inline void ThreadSafeFunction::CallJS(napi_env env,
}

////////////////////////////////////////////////////////////////////////////////
// Async Progress Worker class
// Async Progress Worker Base class
////////////////////////////////////////////////////////////////////////////////
inline AsyncProgressWorkerBase::AsyncProgressWorkerBase(const Object& receiver,
const Function& callback,
const char* resource_name,
const Object& resource)
: AsyncWorker(receiver, callback, resource_name, resource) {
_tsfn = ThreadSafeFunction::New(callback.Env(), callback, resource_name, 1, 1);
}

#if NAPI_VERSION > 4
inline AsyncProgressWorkerBase::AsyncProgressWorkerBase(Napi::Env env,
const char* resource_name,
const Object& resource)
: AsyncWorker(env, resource_name, resource) {
// TODO: Once the changes to make the callback optional for threadsafe
// functions are no longer optional we can remove the dummy Function here.
Function callback;
_tsfn = ThreadSafeFunction::New(env, callback, resource_name, 1, 1);
}
#endif

inline void OnAsyncWorkProgress(Napi::Env /* env */,
Napi::Function /* jsCallback */,
void* asyncworker) {
AsyncProgressWorkerBase* asyncprogressworker = static_cast<AsyncProgressWorkerBase*>(asyncworker);
asyncprogressworker->OnWorkProgress();
}

////////////////////////////////////////////////////////////////////////////////
// Async Progress Worker class
////////////////////////////////////////////////////////////////////////////////
template<class T>
inline AsyncProgressWorker<T>::AsyncProgressWorker(const Function& callback)
: AsyncProgressWorker(callback, "generic") {
Expand All @@ -4198,14 +4238,14 @@ inline AsyncProgressWorker<T>::AsyncProgressWorker(const Function& callback,

template<class T>
inline AsyncProgressWorker<T>::AsyncProgressWorker(const Object& receiver,
const Function& callback)
const Function& callback)
: AsyncProgressWorker(receiver, callback, "generic") {
}

template<class T>
inline AsyncProgressWorker<T>::AsyncProgressWorker(const Object& receiver,
const Function& callback,
const char* resource_name)
const Function& callback,
const char* resource_name)
: AsyncProgressWorker(receiver,
callback,
resource_name,
Expand All @@ -4217,10 +4257,9 @@ inline AsyncProgressWorker<T>::AsyncProgressWorker(const Object& receiver,
const Function& callback,
const char* resource_name,
const Object& resource)
: AsyncWorker(receiver, callback, resource_name, resource),
: AsyncProgressWorkerBase(receiver, callback, resource_name, resource),
_asyncdata(nullptr),
_asyncsize(0) {
_tsfn = ThreadSafeFunction::New(callback.Env(), callback, resource_name, 1, 1);
}

#if NAPI_VERSION > 4
Expand All @@ -4231,35 +4270,31 @@ inline AsyncProgressWorker<T>::AsyncProgressWorker(Napi::Env env)

template<class T>
inline AsyncProgressWorker<T>::AsyncProgressWorker(Napi::Env env,
const char* resource_name)
const char* resource_name)
: AsyncProgressWorker(env, resource_name, Object::New(env)) {
}

template<class T>
inline AsyncProgressWorker<T>::AsyncProgressWorker(Napi::Env env,
const char* resource_name,
const Object& resource)
: AsyncWorker(env, resource_name, resource),
const char* resource_name,
const Object& resource)
: AsyncProgressWorkerBase(env, resource_name, resource),
_asyncdata(nullptr),
_asyncsize(0) {
// TODO: Once the changes to make the callback optional for threadsafe
// functions are no longer optional we can remove the dummy Function here.
Function callback;
_tsfn = ThreadSafeFunction::New(env, callback, resource_name, 1, 1);
}
#endif

template<class T>
inline AsyncProgressWorker<T>::~AsyncProgressWorker() {
// Abort pending tsfn call.
// Don't send progress events after we've already completed.
_tsfn.Abort();
this->_tsfn.Abort();
{
std::lock_guard<std::mutex> lock(_mutex);
std::lock_guard<std::mutex> lock(this->_mutex);
_asyncdata = nullptr;
_asyncsize = 0;
}
_tsfn.Release();
this->_tsfn.Release();
}

template<class T>
Expand All @@ -4269,20 +4304,18 @@ inline void AsyncProgressWorker<T>::Execute() {
}

template<class T>
inline void AsyncProgressWorker<T>::WorkProgress_(Napi::Env /* env */, Napi::Function /* jsCallback */, void* _data) {
AsyncProgressWorker* self = static_cast<AsyncProgressWorker*>(_data);

inline void AsyncProgressWorker<T>::OnWorkProgress() {
T* data;
size_t size;
{
std::lock_guard<std::mutex> lock(self->_mutex);
data = self->_asyncdata;
size = self->_asyncsize;
self->_asyncdata = nullptr;
self->_asyncsize = 0;
std::lock_guard<std::mutex> lock(this->_mutex);
data = this->_asyncdata;
size = this->_asyncsize;
this->_asyncdata = nullptr;
this->_asyncsize = 0;
}

self->OnProgress(data, size);
this->OnProgress(data, size);
delete[] data;
}

Expand All @@ -4293,19 +4326,19 @@ inline void AsyncProgressWorker<T>::SendProgress_(const T* data, size_t count) {

T* old_data;
{
std::lock_guard<std::mutex> lock(_mutex);
std::lock_guard<std::mutex> lock(this->_mutex);
old_data = _asyncdata;
_asyncdata = new_data;
_asyncsize = count;
}
_tsfn.NonBlockingCall(this, WorkProgress_);
this->_tsfn.NonBlockingCall(this, OnAsyncWorkProgress);

delete[] old_data;
}

template<class T>
inline void AsyncProgressWorker<T>::Signal() const {
_tsfn.NonBlockingCall(this, WorkProgress_);
this->_tsfn.NonBlockingCall(this, OnAsyncWorkProgress);
}

template<class T>
Expand All @@ -4317,7 +4350,6 @@ template<class T>
inline void AsyncProgressWorker<T>::ExecutionProgress::Send(const T* data, size_t count) const {
_worker->SendProgress_(data, count);
}

#endif

////////////////////////////////////////////////////////////////////////////////
Expand Down
Loading

0 comments on commit e1e5e6f

Please sign in to comment.