Skip to content

Commit

Permalink
Add StreamARN parameter to support CAA
Browse files Browse the repository at this point in the history
  • Loading branch information
aakkem committed Dec 14, 2023
1 parent 9fe96cd commit 9063db5
Show file tree
Hide file tree
Showing 14 changed files with 634 additions and 117 deletions.
9 changes: 7 additions & 2 deletions aws/kinesis/core/kinesis_producer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -222,11 +222,12 @@ void KinesisProducer::create_sts_client(const std::string& ca_path) {
cfg);
}

Pipeline* KinesisProducer::create_pipeline(const std::string& stream) {
Pipeline* KinesisProducer::create_pipeline(const std::string& stream, const boost::optional<std::string>& stream_arn) {
LOG(info) << "Created pipeline for stream \"" << stream << "\"";
return new Pipeline(
region_,
stream,
stream_arn,
config_,
executor_,
kinesis_client_,
Expand Down Expand Up @@ -291,7 +292,11 @@ void KinesisProducer::on_put_record(aws::kinesis::protobuf::Message& m) {
std::chrono::milliseconds(config_->record_max_buffered_time()));
ur->set_expiration_from_now(
std::chrono::milliseconds(config_->record_ttl()));
pipelines_[ur->stream()].put(ur);
if (ur->stream_arn()) {
pipelines_[ur->stream_arn().get()].put(ur);
} else {
pipelines_[ur->stream()].put(ur);
}
}

void KinesisProducer::on_flush(const aws::kinesis::protobuf::Flush& flush_msg) {
Expand Down
12 changes: 9 additions & 3 deletions aws/kinesis/core/kinesis_producer.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,14 @@ class KinesisProducer : boost::noncopyable {
cw_creds_provider_(std::move(cw_creds_provider)),
executor_(std::move(executor)),
ipc_manager_(std::move(ipc_manager)),
pipelines_([this](auto& stream) {
return this->create_pipeline(stream);
pipelines_([this](auto& stream_or_arn) {
std::regex kinesisStreamArnRegex("^arn:aws.*:kinesis:.*:\\d{12}:stream/\\S+$");
std::smatch match;
if (std::regex_search(stream_or_arn, match, kinesisStreamArnRegex)) {
return this->create_pipeline(match[1].str(), stream_or_arn);
} else {
return this->create_pipeline(stream_or_arn, boost::none);
}
}),
shutdown_(false) {
create_kinesis_client(ca_path);
Expand Down Expand Up @@ -80,7 +86,7 @@ class KinesisProducer : boost::noncopyable {

void create_sts_client(const std::string& ca_path);

Pipeline* create_pipeline(const std::string& stream);
Pipeline* create_pipeline(const std::string& stream, const boost::optional<std::string>& stream_arn);

void drain_messages();

Expand Down
9 changes: 7 additions & 2 deletions aws/kinesis/core/pipeline.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ class Pipeline : boost::noncopyable {
Pipeline(
std::string region,
std::string stream,
boost::optional<std::string> stream_arn,
std::shared_ptr<Configuration> config,
std::shared_ptr<aws::utils::Executor> executor,
std::shared_ptr<Aws::Kinesis::KinesisClient> kinesis_client,
Expand All @@ -58,7 +59,7 @@ class Pipeline : boost::noncopyable {
Retrier::UserRecordCallback finish_user_record_cb)
: stream_(std::move(stream)),
region_(std::move(region)),
stream_arn_(std::move(init_stream_arn(sts_client, region_, stream_))),
stream_arn_(std::move(init_stream_arn(sts_client, region_, stream_, stream_arn_))),
config_(std::move(config)),
stats_logger_(stream_, config_->record_max_buffered_time()),
executor_(std::move(executor)),
Expand Down Expand Up @@ -205,7 +206,11 @@ class Pipeline : boost::noncopyable {
// Retrieve the account ID and partition from the STS service.
static std::string init_stream_arn(const std::shared_ptr<Aws::STS::STSClient>& sts_client,
const std::string &region,
const std::string &stream_name) {
const std::string &stream_name,
const boost::optional<std::string> &stream_arn_) {
if (!stream_arn_) {
return stream_arn_.get();
}
Aws::STS::Model::GetCallerIdentityRequest request;
auto outcome = sts_client->GetCallerIdentity(request);
if (outcome.IsSuccess()) {
Expand Down
4 changes: 4 additions & 0 deletions aws/kinesis/core/user_record.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ UserRecord::UserRecord(aws::kinesis::protobuf::Message& m)
source_id_ = m.id();
auto put_record = m.put_record();
stream_ = std::move(put_record.stream_name());
has_stream_arn_ = put_record.has_stream_arn();
if (has_stream_arn_) {
stream_arn_ = std::move(put_record.stream_arn());
}
partition_key_ = std::move(put_record.partition_key());
data_ = std::move(put_record.data());
has_explicit_hash_key_ = put_record.has_explicit_hash_key();
Expand Down
10 changes: 10 additions & 0 deletions aws/kinesis/core/user_record.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,14 @@ class UserRecord : public aws::utils::TimeSensitive {
return ss.str();
}

boost::optional<std::string> stream_arn() const noexcept {
if (has_stream_arn_) {
return stream_arn_;
} else {
return boost::none;
}
}

boost::optional<std::string> explicit_hash_key() const noexcept {
if (has_explicit_hash_key_) {
return hash_key_decimal_str();
Expand All @@ -99,12 +107,14 @@ class UserRecord : public aws::utils::TimeSensitive {
private:
uint64_t source_id_;
std::string stream_;
std::string stream_arn_;
std::string partition_key_;
uint128_t hash_key_;
std::string data_;
std::vector<Attempt> attempts_;
boost::optional<uint64_t> predicted_shard_;
bool has_explicit_hash_key_;
bool has_stream_arn_;
bool finished_;
};

Expand Down
122 changes: 85 additions & 37 deletions aws/kinesis/protobuf/messages.pb.cc

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 9063db5

Please sign in to comment.