Skip to content

Commit

Permalink
Revert "Add StreamARN parameter to support CAA"
Browse files Browse the repository at this point in the history
This reverts commit 9063db5.
  • Loading branch information
aakkem committed Mar 6, 2024
1 parent 59e431d commit 1caa983
Show file tree
Hide file tree
Showing 14 changed files with 117 additions and 634 deletions.
9 changes: 2 additions & 7 deletions aws/kinesis/core/kinesis_producer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -222,12 +222,11 @@ void KinesisProducer::create_sts_client(const std::string& ca_path) {
cfg);
}

Pipeline* KinesisProducer::create_pipeline(const std::string& stream, const boost::optional<std::string>& stream_arn) {
Pipeline* KinesisProducer::create_pipeline(const std::string& stream) {
LOG(info) << "Created pipeline for stream \"" << stream << "\"";
return new Pipeline(
region_,
stream,
stream_arn,
config_,
executor_,
kinesis_client_,
Expand Down Expand Up @@ -292,11 +291,7 @@ void KinesisProducer::on_put_record(aws::kinesis::protobuf::Message& m) {
std::chrono::milliseconds(config_->record_max_buffered_time()));
ur->set_expiration_from_now(
std::chrono::milliseconds(config_->record_ttl()));
if (ur->stream_arn()) {
pipelines_[ur->stream_arn().get()].put(ur);
} else {
pipelines_[ur->stream()].put(ur);
}
pipelines_[ur->stream()].put(ur);
}

void KinesisProducer::on_flush(const aws::kinesis::protobuf::Flush& flush_msg) {
Expand Down
12 changes: 3 additions & 9 deletions aws/kinesis/core/kinesis_producer.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,8 @@ class KinesisProducer : boost::noncopyable {
cw_creds_provider_(std::move(cw_creds_provider)),
executor_(std::move(executor)),
ipc_manager_(std::move(ipc_manager)),
pipelines_([this](auto& stream_or_arn) {
std::regex kinesisStreamArnRegex("^arn:aws.*:kinesis:.*:\\d{12}:stream/\\S+$");
std::smatch match;
if (std::regex_search(stream_or_arn, match, kinesisStreamArnRegex)) {
return this->create_pipeline(match[1].str(), stream_or_arn);
} else {
return this->create_pipeline(stream_or_arn, boost::none);
}
pipelines_([this](auto& stream) {
return this->create_pipeline(stream);
}),
shutdown_(false) {
create_kinesis_client(ca_path);
Expand Down Expand Up @@ -86,7 +80,7 @@ class KinesisProducer : boost::noncopyable {

void create_sts_client(const std::string& ca_path);

Pipeline* create_pipeline(const std::string& stream, const boost::optional<std::string>& stream_arn);
Pipeline* create_pipeline(const std::string& stream);

void drain_messages();

Expand Down
9 changes: 2 additions & 7 deletions aws/kinesis/core/pipeline.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ class Pipeline : boost::noncopyable {
Pipeline(
std::string region,
std::string stream,
boost::optional<std::string> stream_arn,
std::shared_ptr<Configuration> config,
std::shared_ptr<aws::utils::Executor> executor,
std::shared_ptr<Aws::Kinesis::KinesisClient> kinesis_client,
Expand All @@ -59,7 +58,7 @@ class Pipeline : boost::noncopyable {
Retrier::UserRecordCallback finish_user_record_cb)
: stream_(std::move(stream)),
region_(std::move(region)),
stream_arn_(std::move(init_stream_arn(sts_client, region_, stream_, stream_arn_))),
stream_arn_(std::move(init_stream_arn(sts_client, region_, stream_))),
config_(std::move(config)),
stats_logger_(stream_, config_->record_max_buffered_time()),
executor_(std::move(executor)),
Expand Down Expand Up @@ -206,11 +205,7 @@ class Pipeline : boost::noncopyable {
// Retrieve the account ID and partition from the STS service.
static std::string init_stream_arn(const std::shared_ptr<Aws::STS::STSClient>& sts_client,
const std::string &region,
const std::string &stream_name,
const boost::optional<std::string> &stream_arn_) {
if (!stream_arn_) {
return stream_arn_.get();
}
const std::string &stream_name) {
Aws::STS::Model::GetCallerIdentityRequest request;
auto outcome = sts_client->GetCallerIdentity(request);
if (outcome.IsSuccess()) {
Expand Down
4 changes: 0 additions & 4 deletions aws/kinesis/core/user_record.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,6 @@ UserRecord::UserRecord(aws::kinesis::protobuf::Message& m)
source_id_ = m.id();
auto put_record = m.put_record();
stream_ = std::move(put_record.stream_name());
has_stream_arn_ = put_record.has_stream_arn();
if (has_stream_arn_) {
stream_arn_ = std::move(put_record.stream_arn());
}
partition_key_ = std::move(put_record.partition_key());
data_ = std::move(put_record.data());
has_explicit_hash_key_ = put_record.has_explicit_hash_key();
Expand Down
10 changes: 0 additions & 10 deletions aws/kinesis/core/user_record.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,14 +84,6 @@ class UserRecord : public aws::utils::TimeSensitive {
return ss.str();
}

boost::optional<std::string> stream_arn() const noexcept {
if (has_stream_arn_) {
return stream_arn_;
} else {
return boost::none;
}
}

boost::optional<std::string> explicit_hash_key() const noexcept {
if (has_explicit_hash_key_) {
return hash_key_decimal_str();
Expand All @@ -107,14 +99,12 @@ class UserRecord : public aws::utils::TimeSensitive {
private:
uint64_t source_id_;
std::string stream_;
std::string stream_arn_;
std::string partition_key_;
uint128_t hash_key_;
std::string data_;
std::vector<Attempt> attempts_;
boost::optional<uint64_t> predicted_shard_;
bool has_explicit_hash_key_;
bool has_stream_arn_;
bool finished_;
};

Expand Down
122 changes: 37 additions & 85 deletions aws/kinesis/protobuf/messages.pb.cc

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 1caa983

Please sign in to comment.