From 5ad562e7066ed724150699fda50db03138dc4cd2 Mon Sep 17 00:00:00 2001 From: Haijun Yu Date: Thu, 14 Nov 2024 11:37:38 +0800 Subject: [PATCH] [fix][index] Fix the bug that diskann build pq data exceeds 2^31 squares. And optimize the error reporting time. It is only found on the index side. --- dingo-store-proto | 2 +- src/common/constant.h | 2 +- src/diskann/diskann_item.cc | 50 +++++++++++++++++++++ src/diskann/diskann_item.h | 2 + src/diskann/diskann_service_handle.cc | 24 ++++++++++ src/diskann/diskann_service_handle.h | 2 + src/server/diskann_service.cc | 54 +++++++++++++++++++++++ src/server/diskann_service.h | 5 +++ src/vector/vector_index_diskann.cc | 63 +++++++++++++++++++++++++++ src/vector/vector_index_diskann.h | 3 ++ 10 files changed, 205 insertions(+), 2 deletions(-) diff --git a/dingo-store-proto b/dingo-store-proto index 111645ab4..e0c9e6ddf 160000 --- a/dingo-store-proto +++ b/dingo-store-proto @@ -1 +1 @@ -Subproject commit 111645ab4121fcd057c56865675e3f885de48d9c +Subproject commit e0c9e6ddf6ef44b5915e09408424621afb03ec51 diff --git a/src/common/constant.h b/src/common/constant.h index 0ec109b4e..3458c1536 100644 --- a/src/common/constant.h +++ b/src/common/constant.h @@ -243,7 +243,7 @@ class Constant { // do not change this parameter or it will crash. inline static const int64_t kDiskannMinCount = 2; // max count of vectors for diskann - inline static const int64_t kDiskannMaxCount = std::numeric_limits::max(); + inline static const int64_t kDiskannMaxCount = std::numeric_limits::max(); inline static const std::string kDiskannStore = "store"; inline static const std::string kDiskannPathConfigName = "path"; inline static const std::string kDiskannNumThreadsConfigName = "num_threads"; diff --git a/src/diskann/diskann_item.cc b/src/diskann/diskann_item.cc index c075a012a..5da8e6577 100644 --- a/src/diskann/diskann_item.cc +++ b/src/diskann/diskann_item.cc @@ -694,6 +694,56 @@ butil::Status DiskANNItem::SetNoData(std::shared_ptr ctx) { return butil::Status::OK(); } +butil::Status DiskANNItem::SetImportTooMany(std::shared_ptr ctx) { + DiskANNCoreState old_state; + butil::Status status; + BvarLatencyGuard bvar_guard(&g_diskann_server_nodata_latency); + RWLockWriteGuard guard(&rw_lock_); + + SetSide(ctx); + bool is_error_occurred = false; + old_state = state_.load(); + + auto lambda_set_state_function = [this, &is_error_occurred, &status, ctx]() { + if (is_error_occurred) { + last_error_ = status; + error_local_side_ = local_side_; + error_remote_side_ = remote_side_; + } + + ctx->SetStatus(last_error_); + ctx->SetDiskANNCoreStateX(state_); + }; + + ON_SCOPE_EXIT(lambda_set_state_function); + if (state_.load() == DiskANNCoreState::kImporting && + last_error_.error_code() == pb::error::Errno::EDISKANN_IMPORT_COUNT_TOO_MANY) { + std::string s = fmt::format("diskann is set import too many.(kImporting). ignore."); + DINGO_LOG(INFO) << s; + status = butil::Status(pb::error::Errno::OK, s); + return status; + } + + if (!last_error_.ok()) { + is_error_occurred = true; + DINGO_LOG(ERROR) << "already error occurred, ignore set import too many. return." << last_error_.error_cstr(); + status = last_error_; + return status; + } + + if (state_.load() != DiskANNCoreState::kUnknown) { + std::string s = fmt::format("diskann item state wrong. {}", FormatParameter()); + DINGO_LOG(ERROR) << s; + status = butil::Status(pb::error::Errno::EDISKANN_IMPORT_STATE_WRONG, s); + return status; + } + + state_ = DiskANNCoreState::kImporting; + std::string s = fmt::format("diskann set import too many. > {}", Constant::kDiskannMaxCount); + last_error_ = butil::Status(pb::error::Errno::EDISKANN_IMPORT_COUNT_TOO_MANY, s); + return butil::Status::OK(); +} + bool DiskANNItem::IsBuildedFilesExist(int64_t vector_index_id, pb::common::MetricType metric_type) { std::string data_bin_path = fmt::format("{}/{}/{}/{}", base_dir, normal_name, vector_index_id, input_name); #if defined(ENABLE_DISKANN_ID_MAPPING) diff --git a/src/diskann/diskann_item.h b/src/diskann/diskann_item.h index 085e0c301..d7e7cd980 100644 --- a/src/diskann/diskann_item.h +++ b/src/diskann/diskann_item.h @@ -71,6 +71,8 @@ class DiskANNItem : public std::enable_shared_from_this { std::string Dump(std::shared_ptr ctx); butil::Status Count(std::shared_ptr ctx, int64_t& count); // NOLINT butil::Status SetNoData(std::shared_ptr ctx); + butil::Status SetImportTooMany(std::shared_ptr ctx); + static void SetImportTimeout(int64_t timeout_s) { DiskANNItem::import_timeout_s = timeout_s; } static void SetBaseDir(const std::string& base) { DiskANNItem::base_dir = base; } diff --git a/src/diskann/diskann_service_handle.cc b/src/diskann/diskann_service_handle.cc index df75982c6..0d44a0f54 100644 --- a/src/diskann/diskann_service_handle.cc +++ b/src/diskann/diskann_service_handle.cc @@ -331,6 +331,30 @@ butil::Status DiskAnnServiceHandle::VectorSetNoData(std::shared_ptr ctx return status; } +butil::Status DiskAnnServiceHandle::VectorSetImportTooMany(std::shared_ptr ctx, int64_t vector_index_id) { + butil::Status status; + auto item = item_manager.Find(vector_index_id); + if (item == nullptr) { + std::string s = fmt::format("vector_index_id : {} not exists", vector_index_id); + DINGO_LOG(ERROR) << s; + status = butil::Status(pb::error::EINDEX_NOT_FOUND, s); + return status; + } + + status = item->SetImportTooMany(ctx); + if (!status.ok()) { + DINGO_LOG(ERROR) << status.error_cstr(); + } + + pb::diskann::VectorSetImportTooManyResponse& response = + (dynamic_cast(*ctx->Response())); + + ServiceHelper::SetError(response.mutable_last_error(), ctx->Status().error_code(), ctx->Status().error_str()); + response.set_state(DiskANNUtils::DiskANNCoreStateToPb(ctx->DiskANNCoreStateX())); + + return status; +} + butil::Status DiskAnnServiceHandle::VectorDump(std::shared_ptr ctx, int64_t vector_index_id) { butil::Status status; auto item = item_manager.Find(vector_index_id); diff --git a/src/diskann/diskann_service_handle.h b/src/diskann/diskann_service_handle.h index ddaf1617d..efc87c94e 100644 --- a/src/diskann/diskann_service_handle.h +++ b/src/diskann/diskann_service_handle.h @@ -59,6 +59,8 @@ class DiskAnnServiceHandle { static butil::Status VectorSetNoData(std::shared_ptr ctx, int64_t vector_index_id); + static butil::Status VectorSetImportTooMany(std::shared_ptr ctx, int64_t vector_index_id); + static butil::Status VectorDump(std::shared_ptr ctx, int64_t vector_index_id); static butil::Status VectorDumpAll(std::shared_ptr ctx); diff --git a/src/server/diskann_service.cc b/src/server/diskann_service.cc index 512df1639..9f10abff8 100644 --- a/src/server/diskann_service.cc +++ b/src/server/diskann_service.cc @@ -692,6 +692,60 @@ void DiskAnnServiceImpl::VectorSetNoData(google::protobuf::RpcController* contro } } +static butil::Status ValidateVectorSetImportTooMany( + const ::dingodb::pb::diskann::VectorSetImportTooManyRequest* request) { + if (request->vector_index_id() <= 0) { + std::string s = fmt::format("Invalid vector index id : {}", request->vector_index_id()); + return butil::Status(pb::error::EILLEGAL_PARAMTETERS, s); + } + + return butil::Status::OK(); +} + +static void DoVectorSetImportTooMany(std::shared_ptr handle, + google::protobuf::RpcController* controller, + const ::dingodb::pb::diskann::VectorSetImportTooManyRequest* request, + ::dingodb::pb::diskann::VectorSetImportTooManyResponse* response, + TrackClosure* done) { + brpc::Controller* cntl = (brpc::Controller*)controller; + brpc::ClosureGuard done_guard(done); + auto tracker = done->Tracker(); + tracker->SetServiceQueueWaitTime(); + + butil::Status status = ValidateVectorSetImportTooMany(request); + if (!status.ok()) { + ServiceHelper::SetError(response->mutable_error(), status.error_code(), status.error_str()); + return; + } + + auto ctx = std::make_shared(cntl, true ? nullptr : done_guard.release(), request, response); + ctx->SetTracker(tracker); + + status = handle->VectorSetImportTooMany(ctx, request->vector_index_id()); + if (!status.ok()) { + ServiceHelper::SetError(response->mutable_error(), status.error_code(), status.error_str()); + } +} + +void DiskAnnServiceImpl::VectorSetImportTooMany(google::protobuf::RpcController* controller, + const ::dingodb::pb::diskann::VectorSetImportTooManyRequest* request, + ::dingodb::pb::diskann::VectorSetImportTooManyResponse* response, + ::google::protobuf::Closure* done) { + auto* svr_done = new NoContextServiceClosure(__func__, done, request, response); + + // Run in queue. + auto task = std::make_shared([this, controller, request, response, svr_done]() { + DoVectorSetImportTooMany(handle_, controller, request, response, svr_done); + }); + + bool ret = DiskANNItemRuntime::GetMiscWorkerSet()->Execute(task); + if (!ret) { + brpc::ClosureGuard done_guard(svr_done); + ServiceHelper::SetError(response->mutable_error(), pb::error::EREQUEST_FULL, + "WorkerSet queue is full, please wait and retry"); + } +} + static butil::Status ValidateVectorDump(const ::dingodb::pb::diskann::VectorDumpRequest* request) { if (request->vector_index_id() <= 0) { std::string s = fmt::format("Invalid vector index id : {}", request->vector_index_id()); diff --git a/src/server/diskann_service.h b/src/server/diskann_service.h index 5f7cd620b..441e543a9 100644 --- a/src/server/diskann_service.h +++ b/src/server/diskann_service.h @@ -74,6 +74,11 @@ class DiskAnnServiceImpl : public pb::diskann::DiskAnnService { ::dingodb::pb::diskann::VectorSetNoDataResponse* response, ::google::protobuf::Closure* done) override; + void VectorSetImportTooMany(google::protobuf::RpcController* controller, + const ::dingodb::pb::diskann::VectorSetImportTooManyRequest* request, + ::dingodb::pb::diskann::VectorSetImportTooManyResponse* response, + ::google::protobuf::Closure* done) override; + void VectorDump(google::protobuf::RpcController* controller, const ::dingodb::pb::diskann::VectorDumpRequest* request, ::dingodb::pb::diskann::VectorDumpResponse* response, ::google::protobuf::Closure* done) override; diff --git a/src/vector/vector_index_diskann.cc b/src/vector/vector_index_diskann.cc index 08b144a69..aa44d2267 100644 --- a/src/vector/vector_index_diskann.cc +++ b/src/vector/vector_index_diskann.cc @@ -772,6 +772,20 @@ butil::Status VectorIndexDiskANN::DoBuild(const pb::common::Range& region_range, return butil::Status::OK(); } + // count too many, set error to diskann. + if (region_count > Constant::kDiskannMaxCount) { + std::string s = fmt::format("region : {} vector current count {} is more than max count {}. set error to diskann", + Id(), region_count, Constant::kDiskannMaxCount); + DINGO_LOG(WARNING) << s; + status = SendVectorSetImportTooManyRequestWrapper(); + if (!status.ok()) { + DINGO_LOG(ERROR) << status.error_cstr(); + return status; + } + state = pb::common::DiskANNCoreState::IMPORTING; + return butil::Status(pb::error::Errno::EDISKANN_IMPORT_COUNT_TOO_MANY, s); + } + auto iter = reader->NewIterator(Constant::kVectorDataCF, ts, options); if (!iter) { @@ -1224,6 +1238,55 @@ butil::Status VectorIndexDiskANN::SendVectorSetNoDataRequestWrapper() { return butil::Status::OK(); } +butil::Status VectorIndexDiskANN::SendVectorSetImportTooManyRequest(const google::protobuf::Message& request, + google::protobuf::Message& response) { + butil::Status status; + if (!InitChannel(diskann_server_addr)) { + std::string s = fmt::format("Init channel failed, addr : {}", diskann_server_addr); + DINGO_LOG(ERROR) << s; + return butil::Status(pb::error::Errno::EINTERNAL, s); + } + + // count rpc + status = SendRequest("DiskAnnService", "VectorSetImportTooMany", request, response); + if (!status.ok()) { + std::string s = fmt::format("VectorSetImportTooMany request failed, errcode: {} errmsg: {}", + pb::error::Errno_Name(status.error_code()), status.error_cstr()); + DINGO_LOG(ERROR) << s; + return butil::Status(status.error_code(), s); + } + + return butil::Status::OK(); +} +butil::Status VectorIndexDiskANN::SendVectorSetImportTooManyRequestWrapper() { + butil::Status status; + + // count rpc + pb::diskann::VectorSetImportTooManyRequest vector_set_import_too_many_request; + pb::diskann::VectorSetImportTooManyResponse vector_set_import_too_many_response; + + vector_set_import_too_many_request.set_vector_index_id(Id()); + status = SendVectorSetImportTooManyRequest(vector_set_import_too_many_request, vector_set_import_too_many_response); + if (!status.ok()) { + std::string s = fmt::format("VectorSetImportTooMany request failed, errcode: {} errmsg: {}", + pb::error::Errno_Name(status.error_code()), status.error_cstr()); + DINGO_LOG(ERROR) << s; + return butil::Status(status.error_code(), s); + } + + if (vector_set_import_too_many_response.error().errcode() != pb::error::Errno::OK) { + std::string s = + fmt::format("VectorSetImportTooMany response error, errcode: {} errmsg: {} state: {}", + pb::error::Errno_Name(vector_set_import_too_many_response.error().errcode()), + vector_set_import_too_many_response.error().errmsg(), + pb::common::DiskANNCoreState_Name(vector_set_import_too_many_response.state())); // state + DINGO_LOG(ERROR) << s; + return butil::Status(vector_set_import_too_many_response.error().errcode(), s); + } + + return butil::Status::OK(); +} + butil::Status VectorIndexDiskANN::SendVectorBuildRequest(const google::protobuf::Message& request, google::protobuf::Message& response) { butil::Status status; diff --git a/src/vector/vector_index_diskann.h b/src/vector/vector_index_diskann.h index 2340586f2..8b1c24124 100644 --- a/src/vector/vector_index_diskann.h +++ b/src/vector/vector_index_diskann.h @@ -133,6 +133,9 @@ class VectorIndexDiskANN : public VectorIndex, public std::enable_shared_from_th butil::Status SendVectorSetNoDataRequest(const google::protobuf::Message& request, google::protobuf::Message& response); butil::Status SendVectorSetNoDataRequestWrapper(); + butil::Status SendVectorSetImportTooManyRequest(const google::protobuf::Message& request, + google::protobuf::Message& response); + butil::Status SendVectorSetImportTooManyRequestWrapper(); butil::Status SendVectorBuildRequest(const google::protobuf::Message& request, google::protobuf::Message& response); butil::Status SendVectorBuildRequestWrapper(const pb::common::VectorBuildParameter& parameter, pb::common::DiskANNCoreState& state);