Skip to content

Commit

Permalink
[feat][store] Implement scan v2 and new coprocessor. not test.
Browse files Browse the repository at this point in the history
  • Loading branch information
Haijun Yu authored and ketor committed Jan 9, 2024
1 parent e4551e7 commit f84fea6
Show file tree
Hide file tree
Showing 39 changed files with 8,516 additions and 326 deletions.
5 changes: 5 additions & 0 deletions conf/store.template.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ store:
timeout_s: 1800
max_bytes_rpc: 4194304
max_fetch_cnt_by_server: 1000
scan_v2:
scan_interval_s: 30
timeout_s: 1800
max_bytes_rpc: 4194304
max_fetch_cnt_by_server: 1000
gc:
update_safe_point_interval_s: 60
do_gc_interval_s: 60
3 changes: 3 additions & 0 deletions proto/common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -571,6 +571,9 @@ message Schema {

// index position starts from 0
int32 index = 4;

// field name; only used for scalar data, invalid for others
string name = 5;
}

message CoprocessorV2 {
Expand Down
1 change: 1 addition & 0 deletions src/common/constant.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ class Constant {

// scan config
inline static const std::string kStoreScan = "store.scan";
inline static const std::string kStoreScanV2 = "store.scan_v2";
inline static const std::string kStoreScanTimeoutS = "timeout_s";
inline static const std::string kStoreScanMaxBytesRpc = "max_bytes_rpc";
inline static const std::string kStoreScanMaxFetchCntByServer = "max_fetch_cnt_by_server";
Expand Down
38 changes: 34 additions & 4 deletions src/coprocessor/coprocessor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,16 @@

#include "coprocessor/coprocessor.h"

#include <algorithm>
#include <any>
#include <cstddef>
#include <cstdint>
#include <exception>
#include <memory>
#include <optional>
#include <utility>
#include <vector>

#include "common/logging.h"
#include "coprocessor/raw_coprocessor.h"
#include "coprocessor/utils.h"
#include "fmt/core.h"
#include "proto/error.pb.h"
Expand All @@ -39,12 +39,27 @@ namespace dingodb {
Coprocessor::Coprocessor() : enable_expression_(true), end_of_group_by_(true) {}
Coprocessor::~Coprocessor() { Close(); }

butil::Status Coprocessor::Open(const pb::store::Coprocessor& coprocessor) {
butil::Status Coprocessor::Open(const std::any& coprocessor) {
butil::Status status;

DINGO_LOG(DEBUG) << fmt::format("Coprocessor::Open Enter");

coprocessor_ = coprocessor;
try {
const CoprocessorPbWrapper& coprocessor_pb_wrapper = std::any_cast<const CoprocessorPbWrapper&>(coprocessor);

const pb::store::Coprocessor* coprocessor_v1 = std::get_if<pb::store::Coprocessor>(&coprocessor_pb_wrapper);
if (nullptr == coprocessor_v1) {
std::string error_message =
fmt::format("EXCEPTION from coprocessor_pb_wrapper trans pb::store::Coprocessor failed");
DINGO_LOG(ERROR) << error_message;
return butil::Status(pb::error::EILLEGAL_PARAMTETERS, error_message);
}
coprocessor_ = *coprocessor_v1;
} catch (std::bad_any_cast& e) {
std::string error_message = fmt::format("EXCEPTION : {} trans pb::store::Coprocessor failed", e.what());
DINGO_LOG(ERROR) << error_message;
return butil::Status(pb::error::EILLEGAL_PARAMTETERS, error_message);
}

Utils::DebugCoprocessor(coprocessor_);

Expand Down Expand Up @@ -231,6 +246,21 @@ butil::Status Coprocessor::Execute(IteratorPtr iter, bool key_only, size_t max_f

return status;
}

// Transactional scan entry point.
// This class adds no behaviour of its own here: every argument is handed unchanged to
// RawCoprocessor::Execute and its status is returned as-is.
butil::Status Coprocessor::Execute(TxnIteratorPtr iter, int64_t limit, bool key_only, bool is_reverse,
                                   pb::store::TxnResultInfo& txn_result_info, std::vector<pb::common::KeyValue>& kvs,
                                   bool& has_more, std::string& end_key) {
  butil::Status base_status =
      RawCoprocessor::Execute(iter, limit, key_only, is_reverse, txn_result_info, kvs, has_more, end_key);
  return base_status;
}

// Key/value filter entry point; forwards unchanged to RawCoprocessor::Filter and returns its status.
butil::Status Coprocessor::Filter(const std::string& key, const std::string& value, bool& is_reserved) {
  butil::Status base_status = RawCoprocessor::Filter(key, value, is_reserved);
  return base_status;
}

// Scalar-data filter entry point; forwards unchanged to RawCoprocessor::Filter and returns its status.
butil::Status Coprocessor::Filter(const pb::common::VectorScalardata& scalar_data, bool& is_reserved) {
  butil::Status base_status = RawCoprocessor::Filter(scalar_data, is_reserved);
  return base_status;
}

butil::Status Coprocessor::DoExecute(const pb::common::KeyValue& kv, bool* has_result_kv,
pb::common::KeyValue* result_kv) {
butil::Status status;
Expand Down
21 changes: 16 additions & 5 deletions src/coprocessor/coprocessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,27 +24,38 @@

#include "butil/status.h"
#include "coprocessor/aggregation_manager.h"
#include "coprocessor/raw_coprocessor.h"
#include "engine/iterator.h"
#include "proto/store.pb.h"
#include "scan/scan_filter.h"

namespace dingodb {

class Coprocessor {
class Coprocessor : public RawCoprocessor {
public:
Coprocessor();
~Coprocessor();
~Coprocessor() override;

Coprocessor(const Coprocessor& rhs) = delete;
Coprocessor& operator=(const Coprocessor& rhs) = delete;
Coprocessor(Coprocessor&& rhs) = delete;
Coprocessor& operator=(Coprocessor&& rhs) = delete;

butil::Status Open(const pb::store::Coprocessor& coprocessor);
// coprocessor = CoprocessorPbWrapper
butil::Status Open(const std::any& coprocessor) override;

butil::Status Execute(IteratorPtr iter, bool key_only, size_t max_fetch_cnt, int64_t max_bytes_rpc,
std::vector<pb::common::KeyValue>* kvs);
void Close();
std::vector<pb::common::KeyValue>* kvs) override;

butil::Status Execute(TxnIteratorPtr iter, int64_t limit, bool key_only, bool is_reverse,
pb::store::TxnResultInfo& txn_result_info, std::vector<pb::common::KeyValue>& kvs, // NOLINT
bool& has_more, std::string& end_key) override; // NOLINT

butil::Status Filter(const std::string& key, const std::string& value, bool& is_reserved) override; // NOLINT

butil::Status Filter(const pb::common::VectorScalardata& scalar_data, bool& is_reserved) override; // NOLINT

void Close() override;

private:
butil::Status DoExecute(const pb::common::KeyValue& kv, bool* has_result_kv, pb::common::KeyValue* result_kv);
Expand Down
240 changes: 240 additions & 0 deletions src/coprocessor/coprocessor_scalar.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,240 @@
// Copyright (c) 2023 dingodb.com, Inc. All Rights Reserved
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "coprocessor/coprocessor_scalar.h"

#include <any>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <optional>
#include <string>
#include <vector>

#include "common/logging.h"
#include "fmt/core.h"
#include "proto/error.pb.h"
#include "proto/meta.pb.h"
#include "proto/store.pb.h"
#include "serial/schema/base_schema.h"

namespace dingodb {

// No state of its own to set up; members and bases use their default construction.
CoprocessorScalar::CoprocessorScalar() = default;

// Runs Close() on destruction so an opened coprocessor is always shut down.
CoprocessorScalar::~CoprocessorScalar() {
  Close();
}

// Opens the coprocessor by delegating to CoprocessorV2::Open with the same type-erased argument.
// Returns whatever status CoprocessorV2::Open reports.
butil::Status CoprocessorScalar::Open(const std::any& coprocessor) {
  butil::Status open_status = CoprocessorV2::Open(coprocessor);
  return open_status;
}

// Iterator-based scan entry point.
// Calls the RawCoprocessor implementation explicitly (named through CoprocessorV2) rather than
// CoprocessorV2's own Execute, and returns its status unchanged.
// NOTE(review): presumably the raw base supplies the default behaviour for an operation this
// scalar coprocessor does not implement itself - confirm against RawCoprocessor.
butil::Status CoprocessorScalar::Execute(IteratorPtr iter, bool key_only, size_t max_fetch_cnt, int64_t max_bytes_rpc,
                                         std::vector<pb::common::KeyValue>* kvs) {
  butil::Status base_status =
      CoprocessorV2::RawCoprocessor::Execute(iter, key_only, max_fetch_cnt, max_bytes_rpc, kvs);  // NOLINT
  return base_status;
}

// Transactional scan entry point.
// Calls the RawCoprocessor implementation explicitly (named through CoprocessorV2) with every
// argument passed through untouched, and returns its status unchanged.
butil::Status CoprocessorScalar::Execute(TxnIteratorPtr iter, int64_t limit, bool key_only, bool is_reverse,
                                         pb::store::TxnResultInfo& txn_result_info,
                                         std::vector<pb::common::KeyValue>& kvs, bool& has_more, std::string& end_key) {
  butil::Status base_status = CoprocessorV2::RawCoprocessor::Execute(iter, limit, key_only, is_reverse,  // NOLINT
                                                                     txn_result_info, kvs, has_more, end_key);
  return base_status;
}

// Key/value filter entry point.
// Calls the RawCoprocessor implementation explicitly (named through CoprocessorV2) and returns
// its status unchanged.
butil::Status CoprocessorScalar::Filter(const std::string& key, const std::string& value, bool& is_reserved) {
  butil::Status base_status = CoprocessorV2::RawCoprocessor::Filter(key, value, is_reserved);  // NOLINT
  return base_status;
}

// Decides whether a vector's scalar data passes the relational expression.
//
// scalar_data : named scalar fields of the candidate.
// is_reserved : written only on success; true when DoRelExprCore produced a result, false when
//               it left the result pointer null. Left untouched when an error is returned.
//
// Returns the failing status of TransToAnyRecord or DoRelExprCore (after logging it), otherwise OK.
butil::Status CoprocessorScalar::Filter(const pb::common::VectorScalardata& scalar_data, bool& is_reserved) {
  // One slot per selected column.
  std::vector<std::any> record;
  record.reserve(selection_column_indexes_.size());

  if (butil::Status convert_status = TransToAnyRecord(scalar_data, record); !convert_status.ok()) {
    DINGO_LOG(ERROR) << convert_status.error_cstr();
    return convert_status;
  }

  std::unique_ptr<std::vector<expr::Operand>> filtered_operands;

  if (butil::Status expr_status = DoRelExprCore(record, filtered_operands); !expr_status.ok()) {
    DINGO_LOG(ERROR) << expr_status.error_cstr();
    return expr_status;
  }

  // A null result pointer means the record was filtered out.
  is_reserved = (filtered_operands != nullptr);

  return butil::Status();
}

// Shuts the coprocessor down by delegating to CoprocessorV2::Close.
void CoprocessorScalar::Close() {
  CoprocessorV2::Close();
}

// Converts the named scalar fields of `scalar_data` into a positional record of std::any values.
//
// For each entry of selection_column_indexes_ (in order) the schema at that position in
// original_serial_schemas_ supplies the expected type and the field name. The field is looked up
// by name in scalar_data.scalar_data(), its declared field_type is checked against the schema
// type, and the first value (fields(0)) is appended to `original_record` wrapped in std::optional:
//   BOOL -> optional<bool>, INT32 -> optional<int32_t>, INT64 -> optional<int64_t>,
//   FLOAT32 -> optional<float>, DOUBLE -> optional<double>,
//   STRING -> optional<shared_ptr<std::string>>.
//
// scalar_data     : source of the named scalar values.
// original_record : output; one value is appended per selected column. When an error is returned
//                   it may already hold the values of the columns processed before the failure.
//
// Returns OK on success, or EILLEGAL_PARAMTETERS (also logged) when a schema name is missing from
// scalar_data, the field type does not match the schema type, the field carries no values, or the
// field type is not supported (INT8, INT16, BYTES, NONE and anything unknown).
butil::Status CoprocessorScalar::TransToAnyRecord(const pb::common::VectorScalardata& scalar_data,
                                                  std::vector<std::any>& original_record) {
  // Logs the message and wraps it in the status every failure path of this function returns.
  const auto make_error = [](const std::string& error_message) {
    DINGO_LOG(ERROR) << error_message;
    return butil::Status(pb::error::EILLEGAL_PARAMTETERS, error_message);
  };

  for (const int selection_column_index : selection_column_indexes_) {
    // selection_column_index is an element of selection_column_indexes_, i.e. it already is the
    // schema position. It must not be used to index selection_column_indexes_ a second time.
    // NOTE(review): assumes every selected index is a valid position in original_serial_schemas_
    // (presumably validated when the coprocessor is opened) - confirm.
    const auto& original_serial_schema = (*original_serial_schemas_)[selection_column_index];
    const BaseSchema::Type type = original_serial_schema->GetType();
    const std::string& name = original_serial_schema->GetName();

    const auto iter = scalar_data.scalar_data().find(name);
    if (iter == scalar_data.scalar_data().end()) {
      return make_error(fmt::format("in scalar_data not find name : {}", name));
    }

    const auto& scalar_value = iter->second;
    const pb::common::ScalarFieldType field_type = scalar_value.field_type();

    // Failure: the field's declared type disagrees with the schema column type.
    const auto type_mismatch = [&]() {
      return make_error(fmt::format("field name : {} type not match. schema type : {} field_type : {}", name,
                                    BaseSchema::GetTypeString(type), pb::common::ScalarFieldType_Name(field_type)));
    };

    // Failure: the field exists but holds no value to read with fields(0).
    const auto empty_fields = [&]() {
      return make_error(fmt::format("name : {} field_type : {} scalar_value.fields() empty", name,
                                    pb::common::ScalarFieldType_Name(field_type)));
    };

    switch (field_type) {
      case pb::common::BOOL: {
        if (BaseSchema::Type::kBool != type) {
          return type_mismatch();
        }
        if (scalar_value.fields().empty()) {
          return empty_fields();
        }
        original_record.emplace_back(std::optional<bool>{scalar_value.fields(0).bool_data()});
        break;
      }
      case pb::common::INT32: {
        if (BaseSchema::Type::kInteger != type) {
          return type_mismatch();
        }
        if (scalar_value.fields().empty()) {
          return empty_fields();
        }
        original_record.emplace_back(std::optional<int32_t>{scalar_value.fields(0).int_data()});
        break;
      }
      case pb::common::INT64: {
        if (BaseSchema::Type::kLong != type) {
          return type_mismatch();
        }
        if (scalar_value.fields().empty()) {
          return empty_fields();
        }
        original_record.emplace_back(std::optional<int64_t>{scalar_value.fields(0).long_data()});
        break;
      }
      case pb::common::FLOAT32: {
        if (BaseSchema::Type::kFloat != type) {
          return type_mismatch();
        }
        if (scalar_value.fields().empty()) {
          return empty_fields();
        }
        original_record.emplace_back(std::optional<float>{scalar_value.fields(0).float_data()});
        break;
      }
      case pb::common::DOUBLE: {
        if (BaseSchema::Type::kDouble != type) {
          return type_mismatch();
        }
        if (scalar_value.fields().empty()) {
          return empty_fields();
        }
        original_record.emplace_back(std::optional<double>{scalar_value.fields(0).double_data()});
        break;
      }
      case pb::common::STRING: {
        if (BaseSchema::Type::kString != type) {
          return type_mismatch();
        }
        if (scalar_value.fields().empty()) {
          return empty_fields();
        }
        original_record.emplace_back(std::optional<std::shared_ptr<std::string>>{
            std::make_shared<std::string>(scalar_value.fields(0).string_data())});
        break;
      }

      // Unsupported field types are listed explicitly, then share the default handling.
      case pb::common::INT8:
      case pb::common::INT16:
      case pb::common::BYTES:
      case pb::common::NONE:
      case pb::common::ScalarFieldType_INT_MIN_SENTINEL_DO_NOT_USE_:
      case pb::common::ScalarFieldType_INT_MAX_SENTINEL_DO_NOT_USE_:
      default: {
        return make_error(fmt::format("field name : {} not support . schema type : {} field_type : {}", name,
                                      BaseSchema::GetTypeString(type), pb::common::ScalarFieldType_Name(field_type)));
      }
    }
  }

  return butil::Status();
}

} // namespace dingodb
Loading

0 comments on commit f84fea6

Please sign in to comment.