Skip to content

Commit

Permalink
[feat][document] Implement DocumentIndex class and tests.
Browse files Browse the repository at this point in the history
Signed-off-by: Ketor <[email protected]>
  • Loading branch information
ketor authored and rock-git committed May 20, 2024
1 parent 27a33c4 commit 8e644d7
Show file tree
Hide file tree
Showing 8 changed files with 566 additions and 47 deletions.
19 changes: 13 additions & 6 deletions proto/common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -945,13 +945,20 @@ message DocumentSearchParameter {
// The query_string for search
string query_string = 2;

bool without_scalar_data = 3; // Default false, if true, response without scalar data.
repeated string selected_keys = 4; // If without_scalar_data is false, selected_keys is used to select scalar data,
// if this parameter is null, all scalar data will be returned.

// use id filter
bool use_id_filter = 5;
repeated int64 document_ids = 12; // if use_id_filter = true, use this field
bool use_id_filter = 3;
repeated int64 document_ids = 4; // if use_id_filter = true, use this field

// use column filter
// if query_string is simple string, use column_names to select columns
// if query_string is full functional expr, like "col1: value1 AND col2: value2", column_names is ignored
repeated string column_names = 5;

// for output, if only id is needed, set without_scalar_data = true
// else set without_scalar_data = false, and set selected_keys to select scalar data
bool without_scalar_data = 10; // Default false, if true, response without scalar data.
repeated string selected_keys = 11; // If without_scalar_data is false, selected_keys is used to select scalar data,
// if this parameter is null, all scalar data will be returned.
}

message DocumentWithScore {
Expand Down
113 changes: 85 additions & 28 deletions src/document/document_index.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

#include "document/document_index.h"

#include <cassert>
#include <cstdint>
#include <memory>
#include <string>
Expand All @@ -31,10 +30,15 @@
#include "proto/error.pb.h"
#include "server/server.h"
#include "tantivy_search.h"
#include "tantivy_search_cxx.h"

namespace dingodb {

butil::Status DocumentIndex::RemoveIndexFiles(int64_t id, const std::string& index_path) {
DINGO_LOG(INFO) << fmt::format("[document_index.raw][id({})] remove index files, path: {}", id, index_path);
Helper::RemoveAllFileOrDirectory(index_path);
return butil::Status::OK();
}

DocumentIndex::DocumentIndex(int64_t id, const std::string& index_path,
const pb::common::DocumentIndexParameter& document_index_parameter,
const pb::common::RegionEpoch& epoch, const pb::common::Range& range)
Expand All @@ -48,7 +52,28 @@ DocumentIndex::DocumentIndex(int64_t id, const std::string& index_path,
DINGO_LOG(DEBUG) << fmt::format("[new.DocumentIndex][id({})]", id);
}

DocumentIndex::~DocumentIndex() { DINGO_LOG(DEBUG) << fmt::format("[delete.DocumentIndex][id({})]", id); }
DocumentIndex::~DocumentIndex() {
DINGO_LOG(INFO) << fmt::format("[delete.DocumentIndex][id({})]", id);

RWLockReadGuard guard(&rw_lock_);

auto bool_result = ffi_free_index_writer(index_path);
if (!bool_result.result) {
DINGO_LOG(ERROR) << fmt::format("[document_index.raw][id({})] free index writer failed, error: {}, error_msg: {}",
id, bool_result.error_code, bool_result.error_msg.c_str());
}
bool_result = ffi_free_index_reader(index_path);
if (!bool_result.result) {
DINGO_LOG(ERROR) << fmt::format("[document_index.raw][id({})] free index reader failed, error: {}, error_msg: {}",
id, bool_result.error_code, bool_result.error_msg.c_str());
}

if (is_destroyed_) {
DINGO_LOG(INFO) << fmt::format("[document_index.raw][id({})] document index is destroyed, will remove all files",
id);
RemoveIndexFiles(id, index_path);
}
}

void DocumentIndex::SetSnapshotLogId(int64_t snapshot_log_id) {
this->snapshot_log_id.store(snapshot_log_id, std::memory_order_relaxed);
Expand Down Expand Up @@ -81,6 +106,14 @@ void DocumentIndex::UnlockWrite() { rw_lock_.UnlockWrite(); }
butil::Status DocumentIndex::Add(const std::vector<pb::common::DocumentWithId>& document_with_ids, bool reload_reader) {
DINGO_LOG(INFO) << fmt::format("[document_index.raw][id({})] add document count({})", id, document_with_ids.size());

RWLockWriteGuard guard(&rw_lock_);

if (is_destroyed_) {
std::string err_msg = fmt::format("[document_index.raw][id({})] document index is destroyed", id);
DINGO_LOG(ERROR) << err_msg;
return butil::Status(pb::error::EILLEGAL_PARAMTETERS, err_msg);
}

for (const auto& document_with_id : document_with_ids) {
std::vector<std::string> text_column_names;
std::vector<std::string> text_column_docs;
Expand Down Expand Up @@ -155,6 +188,16 @@ butil::Status DocumentIndex::Add(const std::vector<pb::common::DocumentWithId>&
}

butil::Status DocumentIndex::Delete(const std::vector<int64_t>& delete_ids) {
DINGO_LOG(INFO) << fmt::format("[document_index.raw][id({})] delete document count({})", id, delete_ids.size());

RWLockWriteGuard guard(&rw_lock_);

if (is_destroyed_) {
std::string err_msg = fmt::format("[document_index.raw][id({})] document index is destroyed", id);
DINGO_LOG(ERROR) << err_msg;
return butil::Status(pb::error::EILLEGAL_PARAMTETERS, err_msg);
}

std::vector<uint64_t> delete_ids_uint64;

for (const auto& delete_id : delete_ids) {
Expand Down Expand Up @@ -184,36 +227,38 @@ butil::Status DocumentIndex::Delete(const std::vector<int64_t>& delete_ids) {
return butil::Status::OK();
}

butil::Status DocumentIndex::Search(bool use_range_filter, int64_t start_id, int64_t end_id,
const pb::common::DocumentSearchParameter& parameter,
butil::Status DocumentIndex::Search(uint32_t topk, const std::string& query_string, bool use_range_filter,
int64_t start_id, int64_t end_id, bool use_id_filter,
const std::vector<uint64_t>& alive_ids,
const std::vector<std::string>& column_names,
pb::document::DocumentWithScoreResult& results) {
auto topk = parameter.top_n();
RWLockReadGuard guard(&rw_lock_);

if (is_destroyed_) {
std::string err_msg = fmt::format("[document_index.raw][id({})] document index is destroyed", id);
DINGO_LOG(ERROR) << err_msg;
return butil::Status(pb::error::EILLEGAL_PARAMTETERS, err_msg);
}

if (topk == 0) {
return butil::Status(pb::error::EILLEGAL_PARAMTETERS, "topk must be greater than 0");
}

if (parameter.query_string().empty()) {
if (query_string.empty()) {
return butil::Status(pb::error::EILLEGAL_PARAMTETERS, "query string must not be empty");
}

std::vector<uint64_t> alive_ids;
if (parameter.use_id_filter()) {
for (const auto& id : parameter.document_ids()) {
if (id < 0 || id >= INT64_MAX) {
return butil::Status(pb::error::EILLEGAL_PARAMTETERS,
"document id must be greater than 0 and lesser than INT64_MAX");
}

alive_ids.push_back(id);
}
}
// if (use_id_filter) {
// for (const auto& id : alive_ids) {
// if (id < 0 || id >= INT64_MAX) {
// return butil::Status(pb::error::EILLEGAL_PARAMTETERS,
// "document id must be greater than 0 and lesser than INT64_MAX");
// }
// }
// }

std::vector<std::string> column_names;

auto search_result =
ffi_bm25_search_with_column_names(index_path, parameter.query_string(), topk, alive_ids,
parameter.use_id_filter(), use_range_filter, start_id, end_id, column_names);
auto search_result = ffi_bm25_search_with_column_names(index_path, query_string, topk, alive_ids, use_id_filter,
use_range_filter, start_id, end_id, column_names);

if (search_result.error_code == 0) {
for (const auto& row_id_with_score : search_result.result) {
Expand Down Expand Up @@ -260,6 +305,13 @@ butil::Status DocumentIndex::Load(const std::string& /*path*/) {
}

butil::Status DocumentIndex::GetCount(int64_t& count) {
RWLockReadGuard guard(&rw_lock_);
if (is_destroyed_) {
std::string err_msg = fmt::format("[document_index.raw][id({})] document index is destroyed", id);
DINGO_LOG(ERROR) << err_msg;
return butil::Status(pb::error::EILLEGAL_PARAMTETERS, err_msg);
}

count = ffi_get_indexed_doc_counts(index_path);
return butil::Status::OK();
}
Expand Down Expand Up @@ -801,13 +853,15 @@ butil::Status DocumentIndexWrapper::Search(const pb::common::Range& region_range
auto sibling_document_index = SiblingDocumentIndex();
if (sibling_document_index != nullptr) {
pb::document::DocumentWithScoreResult results_1;
auto status = sibling_document_index->Search(false, 0, INT64_MAX, parameter, results_1);
auto status = sibling_document_index->Search(parameter.top_n(), parameter.query_string(), false, 0, INT64_MAX,
false, {}, {}, results_1);
if (!status.ok()) {
return status;
}

pb::document::DocumentWithScoreResult results_2;
status = document_index->Search(false, 0, INT64_MAX, parameter, results_2);
status = document_index->Search(parameter.top_n(), parameter.query_string(), false, 0, INT64_MAX, false, {}, {},
results_2);
if (!status.ok()) {
return status;
}
Expand All @@ -822,10 +876,12 @@ butil::Status DocumentIndexWrapper::Search(const pb::common::Range& region_range
DocumentCodec::DecodeRangeToDocumentId(region_range, min_document_id, max_document_id);

// use range filter
return document_index->Search(true, min_document_id, max_document_id, parameter, results);
return document_index->Search(parameter.top_n(), parameter.query_string(), true, min_document_id, max_document_id,
false, {}, {}, results);

// auto ret =
// DocumentIndexWrapper::SetDocumentIndexRangeFilter(document_index, filters, min_document_id, max_document_id);
// DocumentIndexWrapper::SetDocumentIndexRangeFilter(document_index, filters, min_document_id,
// max_document_id);
// if (!ret.ok()) {
// DINGO_LOG(ERROR) << fmt::format(
// "[document_index.wrapper][index_id({})] set document index filter failed, error: {}", Id(),
Expand All @@ -834,7 +890,8 @@ butil::Status DocumentIndexWrapper::Search(const pb::common::Range& region_range
// }
}

return document_index->Search(false, 0, INT64_MAX, parameter, results);
return document_index->Search(parameter.top_n(), parameter.query_string(), false, 0, INT64_MAX, false, {}, {},
results);
}

// butil::Status DocumentIndexWrapper::SetDocumentIndexRangeFilter(
Expand Down
21 changes: 18 additions & 3 deletions src/document/document_index.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "bthread/types.h"
#include "butil/status.h"
#include "common/runnable.h"
#include "common/synchronization.h"
#include "proto/common.pb.h"
#include "proto/document.pb.h"

Expand All @@ -37,6 +38,9 @@ class DocumentIndex {
DocumentIndex(int64_t id, const std::string& index_path,
const pb::common::DocumentIndexParameter& document_index_parameter,
const pb::common::RegionEpoch& epoch, const pb::common::Range& range);

static butil::Status RemoveIndexFiles(int64_t id, const std::string& index_path);

~DocumentIndex();

DocumentIndex(const DocumentIndex& rhs) = delete;
Expand All @@ -54,9 +58,9 @@ class DocumentIndex {

butil::Status Load(const std::string& path);

butil::Status Search(bool use_range_filter, int64_t start_id, int64_t end_id,
const pb::common::DocumentSearchParameter& parameter,
pb::document::DocumentWithScoreResult& results);
butil::Status Search(uint32_t topk, const std::string& query_string, bool use_range_filter, int64_t start_id,
int64_t end_id, bool use_id_filter, const std::vector<uint64_t>& alive_ids,
const std::vector<std::string>& column_names, pb::document::DocumentWithScoreResult& results);

void LockWrite();
void UnlockWrite();
Expand All @@ -83,6 +87,16 @@ class DocumentIndex {
pb::common::Range Range() const;
void SetEpochAndRange(const pb::common::RegionEpoch& epoch, const pb::common::Range& range);

void SetDestroyed() {
RWLockWriteGuard guard(&rw_lock_);
is_destroyed_ = true;
}

bool IsDestroyed() {
RWLockReadGuard guard(&rw_lock_);
return is_destroyed_;
}

protected:
// document index id
int64_t id;
Expand All @@ -102,6 +116,7 @@ class DocumentIndex {

private:
RWLock rw_lock_;
bool is_destroyed_{false};
};

using DocumentIndexPtr = std::shared_ptr<DocumentIndex>;
Expand Down
Loading

0 comments on commit 8e644d7

Please sign in to comment.