Skip to content
This repository has been archived by the owner on Dec 1, 2024. It is now read-only.

Commit

Permalink
Optimize db.iterator()
Browse files Browse the repository at this point in the history
By using `emplace_back()`, reusing the `std::vector` cache between
`iterator.next()` calls, and not advancing the iterator prematurely.
That last one only matters for single reads (i.e. the first `next()`
call or one made after seeking) and it doesn't improve performance
compared to the previous release, just undoes a mistake in Level/leveldown@b815bea.

(cherry picked from commit Level/leveldown@112906b)
  • Loading branch information
vweevers committed Nov 27, 2021
1 parent b67bfe7 commit 9f9b13c
Showing 1 changed file with 67 additions and 69 deletions.
136 changes: 67 additions & 69 deletions binding.cc
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,7 @@ struct Database {
}
}

bool HasPriorityWork () {
bool HasPriorityWork () const {
return priorityWork_ > 0;
}

Expand Down Expand Up @@ -533,8 +533,7 @@ struct BaseIterator {
gt_(gt),
gte_(gte),
limit_(limit),
count_(0),
eof_(false) {
count_(0) {
options_ = new leveldb::ReadOptions();
options_->fill_cache = fillCache;
options_->verify_checksums = false;
Expand Down Expand Up @@ -601,40 +600,22 @@ struct BaseIterator {
didSeek_ = true;

if (OutOfRange(target)) {
if (reverse_) {
dbIterator_->SeekToFirst();
dbIterator_->Prev();
} else {
dbIterator_->SeekToLast();
dbIterator_->Next();
}

return;
return SeekToEnd();
}

dbIterator_->Seek(target);

if (dbIterator_->Valid()) {
int cmp = dbIterator_->key().compare(target);
if (cmp > 0 && reverse_) {
dbIterator_->Prev();
} else if (cmp < 0 && !reverse_) {
dbIterator_->Next();
if (reverse_ ? cmp > 0 : cmp < 0) {
Next();
}
} else {
if (reverse_) {
dbIterator_->SeekToLast();
} else {
dbIterator_->SeekToFirst();
}
SeekToFirst();
if (dbIterator_->Valid()) {
int cmp = dbIterator_->key().compare(target);
if (cmp > 0 && reverse_) {
dbIterator_->SeekToFirst();
dbIterator_->Prev();
} else if (cmp < 0 && !reverse_) {
dbIterator_->SeekToLast();
dbIterator_->Next();
if (reverse_ ? cmp > 0 : cmp < 0) {
SeekToEnd();
}
}
}
Expand All @@ -649,24 +630,34 @@ struct BaseIterator {
}
}

bool ReadOne () {
if (eof_ || !dbIterator_->Valid()) {
return false;
}

if ((limit_ >= 0 && ++count_ > limit_) || OutOfRange(dbIterator_->key())) {
eof_ = true;
return false;
}
bool Valid () const {
return dbIterator_->Valid() && !OutOfRange(dbIterator_->key());
}

return true;
bool Increment () {
return limit_ < 0 || ++count_ <= limit_;
}

void Advance () {
void Next () {
if (reverse_) dbIterator_->Prev();
else dbIterator_->Next();
}

void SeekToFirst () {
if (reverse_) dbIterator_->SeekToLast();
else dbIterator_->SeekToFirst();
}

void SeekToLast () {
if (reverse_) dbIterator_->SeekToFirst();
else dbIterator_->SeekToLast();
}

void SeekToEnd () {
SeekToLast();
Next();
}

leveldb::Slice CurrentKey () const {
return dbIterator_->key();
}
Expand All @@ -679,7 +670,13 @@ struct BaseIterator {
return dbIterator_->status();
}

bool OutOfRange (const leveldb::Slice& target) {
bool OutOfRange (const leveldb::Slice& target) const {
// TODO: benchmark to see if this is worth it
// if (upperBoundOnly && !reverse_) {
// return ((lt_ != NULL && target.compare(*lt_) >= 0) ||
// (lte_ != NULL && target.compare(*lte_) > 0));
// }

return ((lt_ != NULL && target.compare(*lt_) >= 0) ||
(lte_ != NULL && target.compare(*lte_) > 0) ||
(gt_ != NULL && target.compare(*gt_) <= 0) ||
Expand All @@ -699,7 +696,6 @@ struct BaseIterator {
std::string* gte_;
int limit_;
int count_;
bool eof_;
leveldb::ReadOptions* options_;
};

Expand Down Expand Up @@ -747,33 +743,36 @@ struct Iterator final : public BaseIterator {
if (ref_ != NULL) napi_delete_reference(env, ref_);
}

bool ReadMany (uint32_t size, std::vector<std::pair<std::string, std::string>>& result) {
bool ReadMany (uint32_t size) {
cache_.clear();
size_t bytesRead = 0;

while (ReadOne()) {
std::string key, value;
while (true) {
if (landed_) Next();
if (!Valid() || !Increment()) break;

if (keys_) {
leveldb::Slice slice = CurrentKey();
key.assign(slice.data(), slice.size());
bytesRead += key.size();
cache_.emplace_back(slice.data(), slice.size());
bytesRead += slice.size();
} else {
cache_.emplace_back("");
}

if (values_) {
leveldb::Slice slice = CurrentValue();
value.assign(slice.data(), slice.size());
bytesRead += value.size();
cache_.emplace_back(slice.data(), slice.size());
bytesRead += slice.size();
} else {
cache_.emplace_back("");
}

Advance();
result.push_back(std::make_pair(key, value));

if (!landed_) {
landed_ = true;
return true;
}

if (bytesRead > highWaterMark_ || result.size() >= size) {
if (bytesRead > highWaterMark_ || cache_.size() >= size * 2) {
return true;
}
}
Expand All @@ -791,6 +790,7 @@ struct Iterator final : public BaseIterator {
bool nexting_;
bool isEnding_;
BaseWorker* endWorker_;
std::vector<std::string> cache_;

private:
napi_ref ref_;
Expand Down Expand Up @@ -1188,18 +1188,18 @@ struct ClearWorker final : public PriorityWorker {
std::string* gt,
std::string* gte)
: PriorityWorker(env, database, callback, "leveldown.db.clear") {
baseIterator_ = new BaseIterator(database, reverse, lt, lte, gt, gte, limit, false);
iterator_ = new BaseIterator(database, reverse, lt, lte, gt, gte, limit, false);
writeOptions_ = new leveldb::WriteOptions();
writeOptions_->sync = false;
}

~ClearWorker () {
delete baseIterator_;
delete iterator_;
delete writeOptions_;
}

void DoExecute () override {
baseIterator_->SeekToRange();
iterator_->SeekToRange();

// TODO: add option
uint32_t hwm = 16 * 1024;
Expand All @@ -1208,14 +1208,14 @@ struct ClearWorker final : public PriorityWorker {
while (true) {
size_t bytesRead = 0;

while (bytesRead < hwm && baseIterator_->ReadOne()) {
leveldb::Slice key = baseIterator_->CurrentKey();
while (bytesRead <= hwm && iterator_->Valid() && iterator_->Increment()) {
leveldb::Slice key = iterator_->CurrentKey();
batch.Delete(key);
bytesRead += key.size();
baseIterator_->Advance();
iterator_->Next();
}

if (!SetStatus(baseIterator_->Status()) || bytesRead == 0) {
if (!SetStatus(iterator_->Status()) || bytesRead == 0) {
break;
}

Expand All @@ -1226,11 +1226,11 @@ struct ClearWorker final : public PriorityWorker {
batch.Clear();
}

baseIterator_->End();
iterator_->End();
}

private:
BaseIterator* baseIterator_;
BaseIterator* iterator_;
leveldb::WriteOptions* writeOptions_;
};

Expand Down Expand Up @@ -1598,22 +1598,21 @@ struct NextWorker final : public BaseWorker {

// Limit the size of the cache to prevent starving the event loop
// in JS-land while we're recursively calling process.nextTick().
ok_ = iterator_->ReadMany(1000, result_);
ok_ = iterator_->ReadMany(1000);

if (!ok_) {
SetStatus(iterator_->Status());
}
}

void HandleOKCallback (napi_env env, napi_value callback) override {
size_t arraySize = result_.size() * 2;
size_t arraySize = iterator_->cache_.size();
napi_value jsArray;
napi_create_array_with_length(env, arraySize, &jsArray);

for (size_t idx = 0; idx < result_.size(); ++idx) {
std::pair<std::string, std::string> row = result_[idx];
std::string key = row.first;
std::string value = row.second;
for (size_t idx = 0; idx < iterator_->cache_.size(); idx += 2) {
std::string key = iterator_->cache_[idx];
std::string value = iterator_->cache_[idx + 1];

napi_value returnKey;
if (iterator_->keyAsBuffer_) {
Expand All @@ -1630,8 +1629,8 @@ struct NextWorker final : public BaseWorker {
}

// put the key & value in a descending order, so that they can be .pop:ed in javascript-land
napi_set_element(env, jsArray, static_cast<int>(arraySize - idx * 2 - 1), returnKey);
napi_set_element(env, jsArray, static_cast<int>(arraySize - idx * 2 - 2), returnValue);
napi_set_element(env, jsArray, static_cast<int>(arraySize - idx - 1), returnKey);
napi_set_element(env, jsArray, static_cast<int>(arraySize - idx - 2), returnValue);
}

napi_value argv[3];
Expand All @@ -1654,7 +1653,6 @@ struct NextWorker final : public BaseWorker {
}

Iterator* iterator_;
std::vector<std::pair<std::string, std::string> > result_;
bool ok_;
};

Expand Down

0 comments on commit 9f9b13c

Please sign in to comment.