From b67bfe7d61addd2a11525b94b002d5c94a7c60cd Mon Sep 17 00:00:00 2001 From: Vincent Weevers Date: Mon, 20 Sep 2021 14:50:19 +0200 Subject: [PATCH] Cleanup hanging iterator also when `next()` errored (cherry picked from commit Level/leveldown@7356ba43d3f7261c9b871e947d219e78586ccef2) --- binding.cc | 86 +++++++++++++------------- test/cleanup-hanging-iterators-test.js | 24 +++++++ 2 files changed, 66 insertions(+), 44 deletions(-) diff --git a/binding.cc b/binding.cc index a5ae461d..1c642b05 100644 --- a/binding.cc +++ b/binding.cc @@ -274,6 +274,7 @@ static napi_status CallFunction (napi_env env, * * - DoExecute (abstract, worker pool thread): main work * - HandleOKCallback (main thread): call JS callback on success + * - HandleErrorCallback (main thread): call JS callback on error * - DoFinally (main thread): do cleanup regardless of success */ struct BaseWorker { @@ -324,48 +325,52 @@ struct BaseWorker { } virtual void DoExecute () = 0; - virtual void DoFinally (napi_env env) {}; static void Complete (napi_env env, napi_status status, void* data) { BaseWorker* self = (BaseWorker*)data; self->DoComplete(env); self->DoFinally(env); - - napi_delete_reference(env, self->callbackRef_); - napi_delete_async_work(env, self->asyncWork_); - - delete self; } void DoComplete (napi_env env) { - if (status_.ok()) { - return HandleOKCallback(env); - } - - napi_value argv = CreateError(env, errMsg_); napi_value callback; napi_get_reference_value(env, callbackRef_, &callback); - CallFunction(env, callback, 1, &argv); + + if (status_.ok()) { + HandleOKCallback(env, callback); + } else { + HandleErrorCallback(env, callback); + } } - virtual void HandleOKCallback (napi_env env) { + virtual void HandleOKCallback (napi_env env, napi_value callback) { napi_value argv; napi_get_null(env, &argv); - napi_value callback; - napi_get_reference_value(env, callbackRef_, &callback); CallFunction(env, callback, 1, &argv); } + virtual void HandleErrorCallback (napi_env env, napi_value callback) { + napi_value argv = CreateError(env, errMsg_); + CallFunction(env, callback, 1, &argv); + } + + virtual void DoFinally (napi_env env) { + napi_delete_reference(env, callbackRef_); + napi_delete_async_work(env, asyncWork_); + + delete this; + } + void Queue (napi_env env) { napi_queue_async_work(env, asyncWork_); } - napi_ref callbackRef_; - napi_async_work asyncWork_; Database* database_; private: + napi_ref callbackRef_; + napi_async_work asyncWork_; leveldb::Status status_; char *errMsg_; }; @@ -503,6 +508,7 @@ struct PriorityWorker : public BaseWorker { void DoFinally (napi_env env) override { database_->DecrementPriorityWork(env); + BaseWorker::DoFinally(env); } }; @@ -519,7 +525,6 @@ struct BaseIterator { int limit, bool fillCache) : database_(database), - isEnding_(false), hasEnded_(false), didSeek_(false), reverse_(reverse), @@ -682,7 +687,6 @@ struct BaseIterator { } Database* database_; - bool isEnding_; bool hasEnded_; private: @@ -726,6 +730,7 @@ struct Iterator final : public BaseIterator { highWaterMark_(highWaterMark), landed_(false), nexting_(false), + isEnding_(false), endWorker_(NULL), ref_(NULL) { } @@ -742,15 +747,6 @@ struct Iterator final : public BaseIterator { if (ref_ != NULL) napi_delete_reference(env, ref_); } - void CheckEndCallback (napi_env env) { - nexting_ = false; - - if (endWorker_ != NULL) { - endWorker_->Queue(env); - endWorker_ = NULL; - } - } - bool ReadMany (uint32_t size, std::vector>& result) { size_t bytesRead = 0; @@ -793,6 +789,7 @@ struct Iterator final : public BaseIterator { uint32_t highWaterMark_; bool landed_; bool nexting_; + bool isEnding_; BaseWorker* endWorker_; private: @@ -1095,7 +1092,7 @@ struct GetWorker final : public PriorityWorker { SetStatus(database_->Get(options_, key_, value_)); } - void HandleOKCallback (napi_env env) override { + void HandleOKCallback (napi_env env, napi_value callback) override { napi_value argv[2]; napi_get_null(env, &argv[0]); @@ -1105,8 +1102,6 @@ struct GetWorker final : public PriorityWorker { napi_create_string_utf8(env, value_.data(), value_.size(), &argv[1]); } - napi_value callback; - napi_get_reference_value(env, callbackRef_, &callback); CallFunction(env, callback, 2, argv); } @@ -1285,12 +1280,10 @@ struct ApproximateSizeWorker final : public PriorityWorker { size_ = database_->ApproximateSize(&range); } - void HandleOKCallback (napi_env env) override { + void HandleOKCallback (napi_env env, napi_value callback) override { napi_value argv[2]; napi_get_null(env, &argv[0]); napi_create_int64(env, (uint64_t)size_, &argv[1]); - napi_value callback; - napi_get_reference_value(env, callbackRef_, &callback); CallFunction(env, callback, 2, argv); } @@ -1548,10 +1541,9 @@ struct EndWorker final : public BaseWorker { iterator_->End(); } - void HandleOKCallback (napi_env env) override { - // TODO: would this be safe(r) to do in DoFinally() i.e. after we call the callback? + void DoFinally (napi_env env) override { iterator_->Detach(env); - BaseWorker::HandleOKCallback(env); + BaseWorker::DoFinally(env); } Iterator* iterator_; @@ -1613,7 +1605,7 @@ struct NextWorker final : public BaseWorker { } } - void HandleOKCallback (napi_env env) override { + void HandleOKCallback (napi_env env, napi_value callback) override { size_t arraySize = result_.size() * 2; napi_value jsArray; napi_create_array_with_length(env, arraySize, &jsArray); @@ -1642,19 +1634,25 @@ struct NextWorker final : public BaseWorker { napi_set_element(env, jsArray, static_cast(arraySize - idx * 2 - 2), returnValue); } - // clean up & handle the next/end state - // TODO: always do this, even on error - iterator_->CheckEndCallback(env); - napi_value argv[3]; napi_get_null(env, &argv[0]); argv[1] = jsArray; napi_get_boolean(env, !ok_, &argv[2]); - napi_value callback; - napi_get_reference_value(env, callbackRef_, &callback); CallFunction(env, callback, 3, argv); } + void DoFinally (napi_env env) override { + // clean up & handle the next/end state + iterator_->nexting_ = false; + + if (iterator_->endWorker_ != NULL) { + iterator_->endWorker_->Queue(env); + iterator_->endWorker_ = NULL; + } + + BaseWorker::DoFinally(env); + } + Iterator* iterator_; std::vector > result_; bool ok_; diff --git a/test/cleanup-hanging-iterators-test.js b/test/cleanup-hanging-iterators-test.js index 89578a28..535977ee 100644 --- a/test/cleanup-hanging-iterators-test.js +++ b/test/cleanup-hanging-iterators-test.js @@ -92,3 +92,27 @@ makeTest('test ending iterators', function (db, t, done) { done() }) }) + +makeTest('test recursive next', function (db, t, done) { + // Test that we're able to close when user keeps scheduling work + const it = db.iterator({ highWaterMark: 0 }) + + it.next(function loop (err, key) { + if (err && err.message !== 'iterator has ended') throw err + if (key !== undefined) it.next(loop) + }) + + done() +}) + +makeTest('test recursive next (random)', function (db, t, done) { + // Same as the test above but closing at a random time + const it = db.iterator({ highWaterMark: 0 }) + + it.next(function loop (err, key) { + if (err && err.message !== 'iterator has ended') throw err + if (key !== undefined) it.next(loop) + }) + + setTimeout(done, Math.floor(Math.random() * 50)) +})