Skip to content

Commit

Permalink
[fix][index] Fix the bug that diskann build pq data exceeds 2^31
Browse files Browse the repository at this point in the history
squares. And optimize the error reporting time. It is only found on the
index side.
  • Loading branch information
yuhaijun999 authored and ketor committed Nov 15, 2024
1 parent a91794c commit 5ad562e
Show file tree
Hide file tree
Showing 10 changed files with 205 additions and 2 deletions.
2 changes: 1 addition & 1 deletion dingo-store-proto
2 changes: 1 addition & 1 deletion src/common/constant.h
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ class Constant {
// do not change this parameter or it will crash.
inline static const int64_t kDiskannMinCount = 2;
// max count of vectors for diskann
inline static const int64_t kDiskannMaxCount = std::numeric_limits<uint32_t>::max();
inline static const int64_t kDiskannMaxCount = std::numeric_limits<int32_t>::max();
inline static const std::string kDiskannStore = "store";
inline static const std::string kDiskannPathConfigName = "path";
inline static const std::string kDiskannNumThreadsConfigName = "num_threads";
Expand Down
50 changes: 50 additions & 0 deletions src/diskann/diskann_item.cc
Original file line number Diff line number Diff line change
Expand Up @@ -694,6 +694,56 @@ butil::Status DiskANNItem::SetNoData(std::shared_ptr<Context> ctx) {
return butil::Status::OK();
}

// Flags this item as "import rejected: too many vectors".
//
// On the normal path (state is kUnknown and no earlier error is stored) the item moves to
// kImporting and last_error_ becomes EDISKANN_IMPORT_COUNT_TOO_MANY. Calling it again while
// the item is already in that condition is a no-op that returns OK. Whatever path is taken,
// the final last_error_ and state_ are written into ctx before the function returns.
//
// Returns OK on the normal and repeated paths, the stored error if one was already present,
// or EDISKANN_IMPORT_STATE_WRONG if the item is in any other state.
butil::Status DiskANNItem::SetImportTooMany(std::shared_ptr<Context> ctx) {
  DiskANNCoreState old_state;
  butil::Status status;
  BvarLatencyGuard bvar_guard(&g_diskann_server_nodata_latency);
  RWLockWriteGuard guard(&rw_lock_);

  SetSide(ctx);
  bool record_error = false;
  // Entry-state snapshot; nothing below reads it.
  old_state = state_.load();

  // Runs on every exit path, before the guards declared above are destroyed.
  auto publish_result = [this, &record_error, &status, ctx]() {
    if (record_error) {
      last_error_ = status;
      error_local_side_ = local_side_;
      error_remote_side_ = remote_side_;
    }

    ctx->SetStatus(last_error_);
    ctx->SetDiskANNCoreStateX(state_);
  };

  ON_SCOPE_EXIT(publish_result);

  // Already flagged by an earlier call: nothing to do.
  const bool already_flagged =
      state_.load() == DiskANNCoreState::kImporting &&
      last_error_.error_code() == pb::error::Errno::EDISKANN_IMPORT_COUNT_TOO_MANY;
  if (already_flagged) {
    const std::string msg = "diskann is set import too many.(kImporting). ignore.";
    DINGO_LOG(INFO) << msg;
    status = butil::Status(pb::error::Errno::OK, msg);
    return status;
  }

  // A different error is already stored: hand it back unchanged.
  if (!last_error_.ok()) {
    record_error = true;
    DINGO_LOG(ERROR) << "already error occurred, ignore set import too many. return." << last_error_.error_cstr();
    status = last_error_;
    return status;
  }

  // Only an item that is still kUnknown may be flagged.
  if (state_.load() != DiskANNCoreState::kUnknown) {
    const std::string msg = fmt::format("diskann item state wrong. {}", FormatParameter());
    DINGO_LOG(ERROR) << msg;
    status = butil::Status(pb::error::Errno::EDISKANN_IMPORT_STATE_WRONG, msg);
    return status;
  }

  state_ = DiskANNCoreState::kImporting;
  last_error_ = butil::Status(pb::error::Errno::EDISKANN_IMPORT_COUNT_TOO_MANY,
                              fmt::format("diskann set import too many. > {}", Constant::kDiskannMaxCount));
  return butil::Status::OK();
}

bool DiskANNItem::IsBuildedFilesExist(int64_t vector_index_id, pb::common::MetricType metric_type) {
std::string data_bin_path = fmt::format("{}/{}/{}/{}", base_dir, normal_name, vector_index_id, input_name);
#if defined(ENABLE_DISKANN_ID_MAPPING)
Expand Down
2 changes: 2 additions & 0 deletions src/diskann/diskann_item.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ class DiskANNItem : public std::enable_shared_from_this<DiskANNItem> {
std::string Dump(std::shared_ptr<Context> ctx);
butil::Status Count(std::shared_ptr<Context> ctx, int64_t& count); // NOLINT
butil::Status SetNoData(std::shared_ptr<Context> ctx);
// Flags the item as having too many vectors to import: when the state is kUnknown and no
// error is stored, the state becomes kImporting and the last error becomes
// EDISKANN_IMPORT_COUNT_TOO_MANY. The resulting last error and state are written into ctx.
butil::Status SetImportTooMany(std::shared_ptr<Context> ctx);


static void SetImportTimeout(int64_t timeout_s) { DiskANNItem::import_timeout_s = timeout_s; }
static void SetBaseDir(const std::string& base) { DiskANNItem::base_dir = base; }
Expand Down
24 changes: 24 additions & 0 deletions src/diskann/diskann_service_handle.cc
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,30 @@ butil::Status DiskAnnServiceHandle::VectorSetNoData(std::shared_ptr<Context> ctx
return status;
}

// Applies SetImportTooMany to the item registered under vector_index_id and copies the
// item's last error and state (as stored in ctx) into the response.
//
// Returns EINDEX_NOT_FOUND, leaving the response untouched, when no such item exists;
// otherwise returns whatever the item's SetImportTooMany returned.
butil::Status DiskAnnServiceHandle::VectorSetImportTooMany(std::shared_ptr<Context> ctx, int64_t vector_index_id) {
  auto item = item_manager.Find(vector_index_id);
  if (item == nullptr) {
    const std::string msg = fmt::format("vector_index_id : {} not exists", vector_index_id);
    DINGO_LOG(ERROR) << msg;
    return butil::Status(pb::error::EINDEX_NOT_FOUND, msg);
  }

  butil::Status status = item->SetImportTooMany(ctx);
  if (!status.ok()) {
    DINGO_LOG(ERROR) << status.error_cstr();
  }

  // The response is filled in even when the item reported a failure.
  auto& response = dynamic_cast<pb::diskann::VectorSetImportTooManyResponse&>(*ctx->Response());
  ServiceHelper::SetError(response.mutable_last_error(), ctx->Status().error_code(), ctx->Status().error_str());
  response.set_state(DiskANNUtils::DiskANNCoreStateToPb(ctx->DiskANNCoreStateX()));

  return status;
}

butil::Status DiskAnnServiceHandle::VectorDump(std::shared_ptr<Context> ctx, int64_t vector_index_id) {
butil::Status status;
auto item = item_manager.Find(vector_index_id);
Expand Down
2 changes: 2 additions & 0 deletions src/diskann/diskann_service_handle.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ class DiskAnnServiceHandle {

static butil::Status VectorSetNoData(std::shared_ptr<Context> ctx, int64_t vector_index_id);

// Calls SetImportTooMany on the item found for vector_index_id, then fills the last_error
// and state fields of the response held by ctx. Returns EINDEX_NOT_FOUND if the id is unknown.
static butil::Status VectorSetImportTooMany(std::shared_ptr<Context> ctx, int64_t vector_index_id);

static butil::Status VectorDump(std::shared_ptr<Context> ctx, int64_t vector_index_id);

static butil::Status VectorDumpAll(std::shared_ptr<Context> ctx);
Expand Down
54 changes: 54 additions & 0 deletions src/server/diskann_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -692,6 +692,60 @@ void DiskAnnServiceImpl::VectorSetNoData(google::protobuf::RpcController* contro
}
}

// Checks that the request names a positive vector index id.
// Returns OK if it does, EILLEGAL_PARAMTETERS otherwise.
static butil::Status ValidateVectorSetImportTooMany(
    const ::dingodb::pb::diskann::VectorSetImportTooManyRequest* request) {
  const auto index_id = request->vector_index_id();
  if (index_id > 0) {
    return butil::Status::OK();
  }

  return butil::Status(pb::error::EILLEGAL_PARAMTETERS, fmt::format("Invalid vector index id : {}", index_id));
}

static void DoVectorSetImportTooMany(std::shared_ptr<DiskAnnServiceHandle> handle,
google::protobuf::RpcController* controller,
const ::dingodb::pb::diskann::VectorSetImportTooManyRequest* request,
::dingodb::pb::diskann::VectorSetImportTooManyResponse* response,
TrackClosure* done) {
brpc::Controller* cntl = (brpc::Controller*)controller;
brpc::ClosureGuard done_guard(done);
auto tracker = done->Tracker();
tracker->SetServiceQueueWaitTime();

butil::Status status = ValidateVectorSetImportTooMany(request);
if (!status.ok()) {
ServiceHelper::SetError(response->mutable_error(), status.error_code(), status.error_str());
return;
}

auto ctx = std::make_shared<Context>(cntl, true ? nullptr : done_guard.release(), request, response);
ctx->SetTracker(tracker);

status = handle->VectorSetImportTooMany(ctx, request->vector_index_id());
if (!status.ok()) {
ServiceHelper::SetError(response->mutable_error(), status.error_code(), status.error_str());
}
}

// RPC entry point: wraps `done` in a NoContextServiceClosure and queues the real work
// (DoVectorSetImportTooMany) on the misc worker set. If the task is not accepted, the
// request is answered right away with EREQUEST_FULL.
void DiskAnnServiceImpl::VectorSetImportTooMany(google::protobuf::RpcController* controller,
                                                const ::dingodb::pb::diskann::VectorSetImportTooManyRequest* request,
                                                ::dingodb::pb::diskann::VectorSetImportTooManyResponse* response,
                                                ::google::protobuf::Closure* done) {
  auto* svr_done = new NoContextServiceClosure(__func__, done, request, response);

  // Run in queue.
  auto task = std::make_shared<ServiceTask>([this, controller, request, response, svr_done]() {
    DoVectorSetImportTooMany(handle_, controller, request, response, svr_done);
  });

  if (DiskANNItemRuntime::GetMiscWorkerSet()->Execute(task)) {
    return;
  }

  // The task was rejected: the guard runs svr_done once the error has been set.
  brpc::ClosureGuard done_guard(svr_done);
  ServiceHelper::SetError(response->mutable_error(), pb::error::EREQUEST_FULL,
                          "WorkerSet queue is full, please wait and retry");
}

static butil::Status ValidateVectorDump(const ::dingodb::pb::diskann::VectorDumpRequest* request) {
if (request->vector_index_id() <= 0) {
std::string s = fmt::format("Invalid vector index id : {}", request->vector_index_id());
Expand Down
5 changes: 5 additions & 0 deletions src/server/diskann_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,11 @@ class DiskAnnServiceImpl : public pb::diskann::DiskAnnService {
::dingodb::pb::diskann::VectorSetNoDataResponse* response,
::google::protobuf::Closure* done) override;

// RPC handler for VectorSetImportTooMany. The request is executed as a task on the misc
// worker set; if the task is not accepted, response->error is set to EREQUEST_FULL.
void VectorSetImportTooMany(google::protobuf::RpcController* controller,
const ::dingodb::pb::diskann::VectorSetImportTooManyRequest* request,
::dingodb::pb::diskann::VectorSetImportTooManyResponse* response,
::google::protobuf::Closure* done) override;

void VectorDump(google::protobuf::RpcController* controller, const ::dingodb::pb::diskann::VectorDumpRequest* request,
::dingodb::pb::diskann::VectorDumpResponse* response, ::google::protobuf::Closure* done) override;

Expand Down
63 changes: 63 additions & 0 deletions src/vector/vector_index_diskann.cc
Original file line number Diff line number Diff line change
Expand Up @@ -772,6 +772,20 @@ butil::Status VectorIndexDiskANN::DoBuild(const pb::common::Range& region_range,
return butil::Status::OK();
}

// count too many, set error to diskann.
if (region_count > Constant::kDiskannMaxCount) {
std::string s = fmt::format("region : {} vector current count {} is more than max count {}. set error to diskann",
Id(), region_count, Constant::kDiskannMaxCount);
DINGO_LOG(WARNING) << s;
status = SendVectorSetImportTooManyRequestWrapper();
if (!status.ok()) {
DINGO_LOG(ERROR) << status.error_cstr();
return status;
}
state = pb::common::DiskANNCoreState::IMPORTING;
return butil::Status(pb::error::Errno::EDISKANN_IMPORT_COUNT_TOO_MANY, s);
}

auto iter = reader->NewIterator(Constant::kVectorDataCF, ts, options);

if (!iter) {
Expand Down Expand Up @@ -1224,6 +1238,55 @@ butil::Status VectorIndexDiskANN::SendVectorSetNoDataRequestWrapper() {
return butil::Status::OK();
}

// Sends `request` to DiskAnnService.VectorSetImportTooMany at diskann_server_addr and stores
// the reply in `response`.
//
// Returns EINTERNAL if the channel cannot be initialised, the SendRequest error code (with
// an expanded message) if the call fails, and OK otherwise.
butil::Status VectorIndexDiskANN::SendVectorSetImportTooManyRequest(const google::protobuf::Message& request,
                                                                    google::protobuf::Message& response) {
  if (!InitChannel(diskann_server_addr)) {
    const std::string msg = fmt::format("Init channel failed, addr : {}", diskann_server_addr);
    DINGO_LOG(ERROR) << msg;
    return butil::Status(pb::error::Errno::EINTERNAL, msg);
  }

  // set-import-too-many rpc
  butil::Status rpc_status = SendRequest("DiskAnnService", "VectorSetImportTooMany", request, response);
  if (rpc_status.ok()) {
    return butil::Status::OK();
  }

  const std::string msg = fmt::format("VectorSetImportTooMany request failed, errcode: {} errmsg: {}",
                                      pb::error::Errno_Name(rpc_status.error_code()), rpc_status.error_cstr());
  DINGO_LOG(ERROR) << msg;
  return butil::Status(rpc_status.error_code(), msg);
}
// Builds a VectorSetImportTooMany request for this index (Id()), sends it, and checks both
// the transport status and the `error` field of the reply.
//
// Returns OK only when the call succeeded and the reply's error code is OK; otherwise
// returns the failing code with a descriptive message.
butil::Status VectorIndexDiskANN::SendVectorSetImportTooManyRequestWrapper() {
  // set-import-too-many rpc
  pb::diskann::VectorSetImportTooManyRequest rpc_request;
  pb::diskann::VectorSetImportTooManyResponse rpc_response;
  rpc_request.set_vector_index_id(Id());

  butil::Status send_status = SendVectorSetImportTooManyRequest(rpc_request, rpc_response);
  if (!send_status.ok()) {
    const std::string msg = fmt::format("VectorSetImportTooMany request failed, errcode: {} errmsg: {}",
                                        pb::error::Errno_Name(send_status.error_code()), send_status.error_cstr());
    DINGO_LOG(ERROR) << msg;
    return butil::Status(send_status.error_code(), msg);
  }

  // The call itself worked; now look at the error carried inside the reply.
  if (rpc_response.error().errcode() == pb::error::Errno::OK) {
    return butil::Status::OK();
  }

  const std::string msg = fmt::format("VectorSetImportTooMany response error, errcode: {} errmsg: {} state: {}",
                                      pb::error::Errno_Name(rpc_response.error().errcode()),
                                      rpc_response.error().errmsg(),
                                      pb::common::DiskANNCoreState_Name(rpc_response.state()));  // state
  DINGO_LOG(ERROR) << msg;
  return butil::Status(rpc_response.error().errcode(), msg);
}

butil::Status VectorIndexDiskANN::SendVectorBuildRequest(const google::protobuf::Message& request,
google::protobuf::Message& response) {
butil::Status status;
Expand Down
3 changes: 3 additions & 0 deletions src/vector/vector_index_diskann.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,9 @@ class VectorIndexDiskANN : public VectorIndex, public std::enable_shared_from_th
butil::Status SendVectorSetNoDataRequest(const google::protobuf::Message& request,
google::protobuf::Message& response);
butil::Status SendVectorSetNoDataRequestWrapper();
// Sends `request` to the DiskAnnService "VectorSetImportTooMany" method and stores the reply
// in `response`. Fails with EINTERNAL if the channel cannot be initialised.
butil::Status SendVectorSetImportTooManyRequest(const google::protobuf::Message& request,
google::protobuf::Message& response);
// Builds the VectorSetImportTooMany request for this index, sends it, and returns an error
// if either the call or the `error` field of the reply is not OK.
butil::Status SendVectorSetImportTooManyRequestWrapper();
butil::Status SendVectorBuildRequest(const google::protobuf::Message& request, google::protobuf::Message& response);
butil::Status SendVectorBuildRequestWrapper(const pb::common::VectorBuildParameter& parameter,
pb::common::DiskANNCoreState& state);
Expand Down

0 comments on commit 5ad562e

Please sign in to comment.