From 5908617622b04d6e3c391d1967ac68ee8d7dd88c Mon Sep 17 00:00:00 2001 From: legendecas Date: Fri, 11 Oct 2019 19:49:16 +0800 Subject: [PATCH] Implement AsyncProgressQueueWorker --- napi-inl.h | 260 ++++++++++++++++++++++++++----- napi.h | 124 ++++++++++++--- test/asyncprogressqueueworker.cc | 68 ++++++++ test/asyncprogressqueueworker.js | 42 +++++ test/binding.cc | 2 + test/binding.gyp | 1 + test/index.js | 2 + 7 files changed, 438 insertions(+), 61 deletions(-) create mode 100644 test/asyncprogressqueueworker.cc create mode 100644 test/asyncprogressqueueworker.js diff --git a/napi-inl.h b/napi-inl.h index 16fe3b3f2..b3615a9ff 100644 --- a/napi-inl.h +++ b/napi-inl.h @@ -3699,8 +3699,8 @@ inline AsyncWorker::AsyncWorker(const Object& receiver, _env, resource_name, NAPI_AUTO_LENGTH, &resource_id); NAPI_THROW_IF_FAILED_VOID(_env, status); - status = napi_create_async_work(_env, resource, resource_id, OnExecute, - OnWorkComplete, this, &_work); + status = napi_create_async_work(_env, resource, resource_id, OnAsyncWorkExecute, + OnAsyncWorkComplete, this, &_work); NAPI_THROW_IF_FAILED_VOID(_env, status); } @@ -3725,8 +3725,8 @@ inline AsyncWorker::AsyncWorker(Napi::Env env, _env, resource_name, NAPI_AUTO_LENGTH, &resource_id); NAPI_THROW_IF_FAILED_VOID(_env, status); - status = napi_create_async_work(_env, resource, resource_id, OnExecute, - OnWorkComplete, this, &_work); + status = napi_create_async_work(_env, resource, resource_id, OnAsyncWorkExecute, + OnAsyncWorkComplete, this, &_work); NAPI_THROW_IF_FAILED_VOID(_env, status); } @@ -3813,40 +3813,51 @@ inline void AsyncWorker::SetError(const std::string& error) { inline std::vector AsyncWorker::GetResult(Napi::Env /*env*/) { return {}; } +// The OnAsyncWorkExecute method receives an napi_env argument. However, do NOT +// use it within this method, as it does not run on the main thread and must +// not run any method that would cause JavaScript to run. In practice, this +// means that almost any use of napi_env will be incorrect. +inline void OnAsyncWorkExecute(napi_env env, void* asyncworker) { + AsyncWorker* self = static_cast(asyncworker); + self->OnExecute(env); +} // The OnExecute method receives an napi_env argument. However, do NOT // use it within this method, as it does not run on the main thread and must // not run any method that would cause JavaScript to run. In practice, this // means that almost any use of napi_env will be incorrect. -inline void AsyncWorker::OnExecute(napi_env /*DO_NOT_USE*/, void* this_pointer) { - AsyncWorker* self = static_cast(this_pointer); +inline void AsyncWorker::OnExecute(napi_env /*DO_NOT_USE*/) { #ifdef NAPI_CPP_EXCEPTIONS try { - self->Execute(); + this->Execute(); } catch (const std::exception& e) { - self->SetError(e.what()); + this->SetError(e.what()); } #else // NAPI_CPP_EXCEPTIONS - self->Execute(); + this->Execute(); #endif // NAPI_CPP_EXCEPTIONS } -inline void AsyncWorker::OnWorkComplete( - napi_env /*env*/, napi_status status, void* this_pointer) { - AsyncWorker* self = static_cast(this_pointer); +inline void OnAsyncWorkComplete(napi_env env, + napi_status status, + void* asyncworker) { + AsyncWorker* self = static_cast(asyncworker); + self->OnWorkComplete(env, status); +} +inline void AsyncWorker::OnWorkComplete(napi_env /*env*/, napi_status status) { if (status != napi_cancelled) { - HandleScope scope(self->_env); + HandleScope scope(this->_env); details::WrapCallback([&] { - if (self->_error.size() == 0) { - self->OnOK(); + if (this->_error.size() == 0) { + this->OnOK(); } else { - self->OnError(Error::New(self->_env, self->_error)); + this->OnError(Error::New(this->_env, this->_error)); } return nullptr; }); } - if (!self->_suppress_destruct) { - self->Destroy(); + if (!this->_suppress_destruct) { + this->Destroy(); } } @@ -4172,9 +4183,38 @@ inline void ThreadSafeFunction::CallJS(napi_env env, } //////////////////////////////////////////////////////////////////////////////// -// Async Progress Worker class +// Async Progress Worker Base class //////////////////////////////////////////////////////////////////////////////// +inline AsyncProgressWorkerBase::AsyncProgressWorkerBase(const Object& receiver, + const Function& callback, + const char* resource_name, + const Object& resource) + : AsyncWorker(receiver, callback, resource_name, resource) { + _tsfn = ThreadSafeFunction::New(callback.Env(), callback, resource_name, 1, 1); +} +#if NAPI_VERSION > 4 +inline AsyncProgressWorkerBase::AsyncProgressWorkerBase(Napi::Env env, + const char* resource_name, + const Object& resource) + : AsyncWorker(env, resource_name, resource) { + // TODO: Once the changes to make the callback optional for threadsafe + // functions are no longer optional we can remove the dummy Function here. + Function callback; + _tsfn = ThreadSafeFunction::New(env, callback, resource_name, 1, 1); +} +#endif + +inline void OnAsyncWorkProgress(Napi::Env /* env /*, + Napi::Function /* jsCallback */, + void* asyncworker) { + AsyncProgressWorkerBase* asyncprogressworker = static_cast(asyncworker); + asyncprogressworker->OnWorkProgress(); +} + +//////////////////////////////////////////////////////////////////////////////// +// Async Progress Worker class +//////////////////////////////////////////////////////////////////////////////// template inline AsyncProgressWorker::AsyncProgressWorker(const Function& callback) : AsyncProgressWorker(callback, "generic") { @@ -4217,10 +4257,9 @@ inline AsyncProgressWorker::AsyncProgressWorker(const Object& receiver, const Function& callback, const char* resource_name, const Object& resource) - : AsyncWorker(receiver, callback, resource_name, resource), + : AsyncProgressWorkerBase(receiver, callback, resource_name, resource), _asyncdata(nullptr), _asyncsize(0) { - _tsfn = ThreadSafeFunction::New(callback.Env(), callback, resource_name, 1, 1); } #if NAPI_VERSION > 4 @@ -4239,13 +4278,9 @@ template inline AsyncProgressWorker::AsyncProgressWorker(Napi::Env env, const char* resource_name, const Object& resource) - : AsyncWorker(env, resource_name, resource), + : AsyncProgressWorkerBase(env, resource_name, resource), _asyncdata(nullptr), _asyncsize(0) { - // TODO: Once the changes to make the callback optional for threadsafe - // functions are no longer optional we can remove the dummy Function here. - Function callback; - _tsfn = ThreadSafeFunction::New(env, callback, resource_name, 1, 1); } #endif @@ -4253,13 +4288,13 @@ template inline AsyncProgressWorker::~AsyncProgressWorker() { // Abort pending tsfn call. // Don't send progress events after we've already completed. - _tsfn.Abort(); + this->_tsfn.Abort(); { - std::lock_guard lock(_mutex); + std::lock_guard lock(this->_mutex); _asyncdata = nullptr; _asyncsize = 0; } - _tsfn.Release(); + this->_tsfn.Release(); } template @@ -4269,20 +4304,18 @@ inline void AsyncProgressWorker::Execute() { } template -inline void AsyncProgressWorker::WorkProgress_(Napi::Env /* env */, Napi::Function /* jsCallback */, void* _data) { - AsyncProgressWorker* self = static_cast(_data); - +inline void AsyncProgressWorker::OnWorkProgress() { T* data; size_t size; { - std::lock_guard lock(self->_mutex); - data = self->_asyncdata; - size = self->_asyncsize; - self->_asyncdata = nullptr; - self->_asyncsize = 0; + std::lock_guard lock(this->_mutex); + data = this->_asyncdata; + size = this->_asyncsize; + this->_asyncdata = nullptr; + this->_asyncsize = 0; } - self->OnProgress(data, size); + this->OnProgress(data, size); delete[] data; } @@ -4293,19 +4326,19 @@ inline void AsyncProgressWorker::SendProgress_(const T* data, size_t count) { T* old_data; { - std::lock_guard lock(_mutex); + std::lock_guard lock(this->_mutex); old_data = _asyncdata; _asyncdata = new_data; _asyncsize = count; } - _tsfn.NonBlockingCall(this, WorkProgress_); + this->_tsfn.NonBlockingCall(this, OnAsyncWorkProgress); delete[] old_data; } template inline void AsyncProgressWorker::Signal() const { - _tsfn.NonBlockingCall(this, WorkProgress_); + this->_tsfn.NonBlockingCall(this, OnAsyncWorkProgress); } template @@ -4318,6 +4351,151 @@ inline void AsyncProgressWorker::ExecutionProgress::Send(const T* data, size_ _worker->SendProgress_(data, count); } +//////////////////////////////////////////////////////////////////////////////// +// Async Progress Queue Worker class +//////////////////////////////////////////////////////////////////////////////// +template +inline AsyncProgressQueueWorker::AsyncProgressQueueWorker(const Function& callback) + : AsyncProgressQueueWorker(callback, "generic") { +} + +template +inline AsyncProgressQueueWorker::AsyncProgressQueueWorker(const Function& callback, + const char* resource_name) + : AsyncProgressQueueWorker(callback, resource_name, Object::New(callback.Env())) { +} + +template +inline AsyncProgressQueueWorker::AsyncProgressQueueWorker(const Function& callback, + const char* resource_name, + const Object& resource) + : AsyncProgressQueueWorker(Object::New(callback.Env()), + callback, + resource_name, + resource) { +} + +template +inline AsyncProgressQueueWorker::AsyncProgressQueueWorker(const Object& receiver, + const Function& callback) + : AsyncProgressQueueWorker(receiver, callback, "generic") { +} + +template +inline AsyncProgressQueueWorker::AsyncProgressQueueWorker(const Object& receiver, + const Function& callback, + const char* resource_name) + : AsyncProgressQueueWorker(receiver, + callback, + resource_name, + Object::New(callback.Env())) { +} + +template +inline AsyncProgressQueueWorker::AsyncProgressQueueWorker(const Object& receiver, + const Function& callback, + const char* resource_name, + const Object& resource) + : AsyncProgressWorkerBase(receiver, callback, resource_name, resource) { +} + +#if NAPI_VERSION > 4 +template +inline AsyncProgressQueueWorker::AsyncProgressQueueWorker(Napi::Env env) + : AsyncProgressQueueWorker(env, "generic") { +} + +template +inline AsyncProgressQueueWorker::AsyncProgressQueueWorker(Napi::Env env, + const char* resource_name) + : AsyncProgressQueueWorker(env, resource_name, Object::New(env)) { +} + +template +inline AsyncProgressQueueWorker::AsyncProgressQueueWorker(Napi::Env env, + const char* resource_name, + const Object& resource) + : AsyncProgressWorkerBase(env, resource_name, resource) { +} +#endif + +template +inline AsyncProgressQueueWorker::~AsyncProgressQueueWorker() { + // Abort pending tsfn call. + // Don't send progress events after we've already completed. + this->_tsfn.Abort(); + { + std::lock_guard lock(this->_mutex); + while (!_asyncdata.empty()) { + std::pair &datapair = _asyncdata.front(); + T *data = datapair.first; + + _asyncdata.pop(); + + delete[] data; + } + } + this->_tsfn.Release(); +} + +template +inline void AsyncProgressQueueWorker::Execute() { + ExecutionProgress progress(this); + Execute(progress); +} + +template +inline void AsyncProgressQueueWorker::OnWorkProgress() { + this->_mutex.lock(); + while (!this->_asyncdata.empty()) { + std::pair &datapair = this->_asyncdata.front(); + + T *data = datapair.first; + size_t size = datapair.second; + + this->_asyncdata.pop(); + this->_mutex.unlock(); + + this->OnProgress(data, size); + delete[] data; + + this->_mutex.lock(); + } + this->_mutex.unlock(); +} + +template +inline void AsyncProgressQueueWorker::SendProgress_(const T* data, size_t count) { + T* new_data = new T[count]; + std::copy(data, data + count, new_data); + + { + std::lock_guard lock(this->_mutex); + _asyncdata.push(std::pair(new_data, count)); + } + this->_tsfn.NonBlockingCall(this, OnAsyncWorkProgress); +} + +template +inline void AsyncProgressQueueWorker::Signal() const { + this->_tsfn.NonBlockingCall(this, OnAsyncWorkProgress); +} + +template +inline void AsyncProgressQueueWorker::OnWorkComplete(napi_env env, napi_status status) { + this->OnWorkProgress(); + AsyncWorker::OnWorkComplete(env, status); +} + +template +inline void AsyncProgressQueueWorker::ExecutionProgress::Signal() const { + _worker->Signal(); +} + +template +inline void AsyncProgressQueueWorker::ExecutionProgress::Send(const T* data, size_t count) const { + _worker->SendProgress_(data, count); +} #endif //////////////////////////////////////////////////////////////////////////////// diff --git a/napi.h b/napi.h index 2fc9b10f3..4578294db 100644 --- a/napi.h +++ b/napi.h @@ -6,6 +6,7 @@ #include #include #include +#include #include #include @@ -1817,6 +1818,12 @@ namespace Napi { napi_async_context _context; }; + + inline void OnAsyncWorkExecute(napi_env env, void* asyncworker); + inline void OnAsyncWorkComplete(napi_env env, + napi_status status, + void* asyncworker); + class AsyncWorker { public: virtual ~AsyncWorker(); @@ -1838,6 +1845,10 @@ namespace Napi { ObjectReference& Receiver(); FunctionReference& Callback(); + virtual void OnExecute(napi_env env); + virtual void OnWorkComplete(napi_env env, + napi_status status); + protected: explicit AsyncWorker(const Function& callback); explicit AsyncWorker(const Function& callback, @@ -1871,11 +1882,6 @@ namespace Napi { void SetError(const std::string& error); private: - static void OnExecute(napi_env env, void* this_pointer); - static void OnWorkComplete(napi_env env, - napi_status status, - void* this_pointer); - napi_env _env; napi_async_work _work; ObjectReference _receiver; @@ -2090,8 +2096,33 @@ namespace Napi { napi_threadsafe_function _tsfn; }; + inline void OnAsyncWorkProgress(Napi::Env env, + Napi::Function jsCallback, + void* asyncworker); + + class AsyncProgressWorkerBase : public AsyncWorker { + public: + virtual void OnWorkProgress() = 0; + protected: + explicit AsyncProgressWorkerBase(const Object& receiver, + const Function& callback, + const char* resource_name, + const Object& resource); + +// Optional callback of Napi::ThreadSafeFunction only available after NAPI_VERSION 4. +// Refs: https://github.com/nodejs/node/pull/27791 +#if NAPI_VERSION > 4 + explicit AsyncProgressWorkerBase(Napi::Env env, + const char* resource_name, + const Object& resource); +#endif + + std::mutex _mutex; + ThreadSafeFunction _tsfn; + }; + template - class AsyncProgressWorker : public AsyncWorker { + class AsyncProgressWorker : public AsyncProgressWorkerBase { public: virtual ~AsyncProgressWorker(); @@ -2105,19 +2136,21 @@ namespace Napi { AsyncProgressWorker* const _worker; }; + void OnWorkProgress() override; + protected: - explicit AsyncProgressWorker(const Function& callback); - explicit AsyncProgressWorker(const Function& callback, + explicit AsyncProgressWorker(const Function& callback); + explicit AsyncProgressWorker(const Function& callback, const char* resource_name); - explicit AsyncProgressWorker(const Function& callback, + explicit AsyncProgressWorker(const Function& callback, const char* resource_name, const Object& resource); - explicit AsyncProgressWorker(const Object& receiver, + explicit AsyncProgressWorker(const Object& receiver, const Function& callback); - explicit AsyncProgressWorker(const Object& receiver, + explicit AsyncProgressWorker(const Object& receiver, const Function& callback, const char* resource_name); - explicit AsyncProgressWorker(const Object& receiver, + explicit AsyncProgressWorker(const Object& receiver, const Function& callback, const char* resource_name, const Object& resource); @@ -2125,28 +2158,79 @@ namespace Napi { // Optional callback of Napi::ThreadSafeFunction only available after NAPI_VERSION 4. // Refs: https://github.com/nodejs/node/pull/27791 #if NAPI_VERSION > 4 - explicit AsyncProgressWorker(Napi::Env env); - explicit AsyncProgressWorker(Napi::Env env, + explicit AsyncProgressWorker(Napi::Env env); + explicit AsyncProgressWorker(Napi::Env env, const char* resource_name); - explicit AsyncProgressWorker(Napi::Env env, + explicit AsyncProgressWorker(Napi::Env env, const char* resource_name, const Object& resource); #endif - virtual void Execute(const ExecutionProgress& progress) = 0; virtual void OnProgress(const T* data, size_t count) = 0; private: - static void WorkProgress_(Napi::Env env, Napi::Function jsCallback, void* data); - void Execute() override; void Signal() const; void SendProgress_(const T* data, size_t count); - std::mutex _mutex; T* _asyncdata; size_t _asyncsize; - ThreadSafeFunction _tsfn; + }; + + template + class AsyncProgressQueueWorker : public AsyncProgressWorkerBase { + public: + virtual ~AsyncProgressQueueWorker(); + + class ExecutionProgress { + friend class AsyncProgressQueueWorker; + public: + void Signal() const; + void Send(const T* data, size_t count) const; + private: + explicit ExecutionProgress(AsyncProgressQueueWorker* worker) : _worker(worker) {} + AsyncProgressQueueWorker* const _worker; + }; + + void OnWorkComplete(napi_env env, napi_status status) override; + void OnWorkProgress() override; + + protected: + explicit AsyncProgressQueueWorker(const Function& callback); + explicit AsyncProgressQueueWorker(const Function& callback, + const char* resource_name); + explicit AsyncProgressQueueWorker(const Function& callback, + const char* resource_name, + const Object& resource); + explicit AsyncProgressQueueWorker(const Object& receiver, + const Function& callback); + explicit AsyncProgressQueueWorker(const Object& receiver, + const Function& callback, + const char* resource_name); + explicit AsyncProgressQueueWorker(const Object& receiver, + const Function& callback, + const char* resource_name, + const Object& resource); + +// Optional callback of Napi::ThreadSafeFunction only available after NAPI_VERSION 4. +// Refs: https://github.com/nodejs/node/pull/27791 +#if NAPI_VERSION > 4 + explicit AsyncProgressQueueWorker(Napi::Env env); + explicit AsyncProgressQueueWorker(Napi::Env env, + const char* resource_name); + explicit AsyncProgressQueueWorker(Napi::Env env, + const char* resource_name, + const Object& resource); +#endif + virtual void Execute(const ExecutionProgress& progress) = 0; + virtual void OnProgress(const T* data, size_t count) = 0; + + private: + void Execute() override; + void Signal() const; + void SendProgress_(const T* data, size_t count); + + std::queue> _asyncdata; }; #endif diff --git a/test/asyncprogressqueueworker.cc b/test/asyncprogressqueueworker.cc new file mode 100644 index 000000000..9a53109d0 --- /dev/null +++ b/test/asyncprogressqueueworker.cc @@ -0,0 +1,68 @@ +#include "napi.h" + +#include +#include +#include +#include + +#if (NAPI_VERSION > 3) + +using namespace Napi; + +namespace { + +struct ProgressData { + size_t progress; +}; + +class TestWorker : public AsyncProgressQueueWorker { +public: + static void DoWork(const CallbackInfo& info) { + int32_t times = info[0].As().Int32Value(); + Function cb = info[1].As(); + Function progress = info[2].As(); + + TestWorker* worker = new TestWorker(cb, progress, "TestResource", Object::New(info.Env())); + worker->_times = times; + worker->Queue(); + } + +protected: + void Execute(const ExecutionProgress& progress) override { + if (_times < 0) { + SetError("test error"); + } + ProgressData data{0}; + for (int32_t idx = 0; idx < _times; idx++) { + data.progress = idx; + progress.Send(&data, 1); + } + } + + void OnProgress(const ProgressData* data, size_t /* count */) override { + Napi::Env env = Env(); + if (!_progress.IsEmpty()) { + Number progress = Number::New(env, data->progress); + _progress.MakeCallback(Receiver().Value(), { progress }); + } + } + +private: + TestWorker(Function cb, Function progress, const char* resource_name, const Object& resource) + : AsyncProgressQueueWorker(cb, resource_name, resource) { + _progress.Reset(progress, 1); + } + + int32_t _times; + FunctionReference _progress; +}; + +} + +Object InitAsyncProgressQueueWorker(Env env) { + Object exports = Object::New(env); + exports["doWork"] = Function::New(env, TestWorker::DoWork); + return exports; +} + +#endif diff --git a/test/asyncprogressqueueworker.js b/test/asyncprogressqueueworker.js new file mode 100644 index 000000000..217b49e11 --- /dev/null +++ b/test/asyncprogressqueueworker.js @@ -0,0 +1,42 @@ +'use strict'; +const buildType = process.config.target_defaults.default_configuration; +const common = require('./common') +const assert = require('assert'); + +test(require(`./build/${buildType}/binding.node`)); +test(require(`./build/${buildType}/binding_noexcept.node`)); + +function test({ asyncprogressqueueworker }) { + success(asyncprogressqueueworker); + fail(asyncprogressqueueworker); + return; +} + +function success(binding) { + const expected = [0, 1, 2, 3]; + const actual = []; + binding.doWork(expected.length, + common.mustCall((err) => { + if (err) { + assert.fail(err); + } + }), + common.mustCall((_progress) => { + actual.push(_progress); + if (actual.length === expected.length) { + assert.deepEqual(actual, expected); + } + }, expected.length) + ); +} + +function fail(binding) { + binding.doWork(-1, + common.mustCall((err) => { + assert.throws(() => { throw err }, /test error/) + }), + () => { + assert.fail('unexpected progress report'); + } + ); +} diff --git a/test/binding.cc b/test/binding.cc index 403134bf6..95fc6af78 100644 --- a/test/binding.cc +++ b/test/binding.cc @@ -5,6 +5,7 @@ using namespace Napi; Object InitArrayBuffer(Env env); Object InitAsyncContext(Env env); #if (NAPI_VERSION > 3) +Object InitAsyncProgressQueueWorker(Env env); Object InitAsyncProgressWorker(Env env); #endif Object InitAsyncWorker(Env env); @@ -55,6 +56,7 @@ Object Init(Env env, Object exports) { exports.Set("arraybuffer", InitArrayBuffer(env)); exports.Set("asynccontext", InitAsyncContext(env)); #if (NAPI_VERSION > 3) + exports.Set("asyncprogressqueueworker", InitAsyncProgressQueueWorker(env)); exports.Set("asyncprogressworker", InitAsyncProgressWorker(env)); #endif exports.Set("asyncworker", InitAsyncWorker(env)); diff --git a/test/binding.gyp b/test/binding.gyp index aa575fba3..6f492e27d 100644 --- a/test/binding.gyp +++ b/test/binding.gyp @@ -8,6 +8,7 @@ 'sources': [ 'arraybuffer.cc', 'asynccontext.cc', + 'asyncprogressqueueworker.cc', 'asyncprogressworker.cc', 'asyncworker.cc', 'asyncworker-persistent.cc', diff --git a/test/index.js b/test/index.js index bb42b955a..9e0df1e72 100644 --- a/test/index.js +++ b/test/index.js @@ -10,6 +10,7 @@ process.config.target_defaults.default_configuration = let testModules = [ 'arraybuffer', 'asynccontext', + 'asyncprogressqueueworker', 'asyncprogressworker', 'asyncworker', 'asyncworker-nocallback', @@ -67,6 +68,7 @@ if (napiVersion < 3) { } if (napiVersion < 4) { + testModules.splice(testModules.indexOf('asyncprogressqueueworker'), 1); testModules.splice(testModules.indexOf('asyncprogressworker'), 1); testModules.splice(testModules.indexOf('threadsafe_function/threadsafe_function_ptr'), 1); testModules.splice(testModules.indexOf('threadsafe_function/threadsafe_function_sum'), 1);