From 9063db589a263c72d5f9f756be624a07d49da833 Mon Sep 17 00:00:00 2001 From: Abhishek Yadav Akkem Date: Fri, 10 Nov 2023 15:55:31 -0800 Subject: [PATCH] Add StreamARN parameter to support CAA --- aws/kinesis/core/kinesis_producer.cc | 9 +- aws/kinesis/core/kinesis_producer.h | 12 +- aws/kinesis/core/pipeline.h | 9 +- aws/kinesis/core/user_record.cc | 4 + aws/kinesis/core/user_record.h | 10 + aws/kinesis/protobuf/messages.pb.cc | 122 +++++++--- aws/kinesis/protobuf/messages.pb.h | 93 +++++++ aws/kinesis/protobuf/messages.proto | 1 + .../kinesis/producer/IKinesisProducer.java | 6 +- .../kinesis/producer/KinesisProducer.java | 144 ++++++++++- .../services/kinesis/producer/UserRecord.java | 27 ++- .../kinesis/producer/protobuf/Config.java | 80 +++--- .../kinesis/producer/protobuf/Messages.java | 227 ++++++++++++++++-- .../kinesis/producer/KinesisProducerTest.java | 7 +- 14 files changed, 634 insertions(+), 117 deletions(-) diff --git a/aws/kinesis/core/kinesis_producer.cc b/aws/kinesis/core/kinesis_producer.cc index 3ac7e3e6..75447000 100644 --- a/aws/kinesis/core/kinesis_producer.cc +++ b/aws/kinesis/core/kinesis_producer.cc @@ -222,11 +222,12 @@ void KinesisProducer::create_sts_client(const std::string& ca_path) { cfg); } -Pipeline* KinesisProducer::create_pipeline(const std::string& stream) { +Pipeline* KinesisProducer::create_pipeline(const std::string& stream, const boost::optional& stream_arn) { LOG(info) << "Created pipeline for stream \"" << stream << "\""; return new Pipeline( region_, stream, + stream_arn, config_, executor_, kinesis_client_, @@ -291,7 +292,11 @@ void KinesisProducer::on_put_record(aws::kinesis::protobuf::Message& m) { std::chrono::milliseconds(config_->record_max_buffered_time())); ur->set_expiration_from_now( std::chrono::milliseconds(config_->record_ttl())); - pipelines_[ur->stream()].put(ur); + if (ur->stream_arn()) { + pipelines_[ur->stream_arn().get()].put(ur); + } else { + pipelines_[ur->stream()].put(ur); + } } void KinesisProducer::on_flush(const aws::kinesis::protobuf::Flush& flush_msg) { diff --git a/aws/kinesis/core/kinesis_producer.h b/aws/kinesis/core/kinesis_producer.h index 98bf4f5e..267ab4fa 100644 --- a/aws/kinesis/core/kinesis_producer.h +++ b/aws/kinesis/core/kinesis_producer.h @@ -46,8 +46,14 @@ class KinesisProducer : boost::noncopyable { cw_creds_provider_(std::move(cw_creds_provider)), executor_(std::move(executor)), ipc_manager_(std::move(ipc_manager)), - pipelines_([this](auto& stream) { - return this->create_pipeline(stream); + pipelines_([this](auto& stream_or_arn) { + std::regex kinesisStreamArnRegex("^arn:aws.*:kinesis:.*:\\d{12}:stream/\\S+$"); + std::smatch match; + if (std::regex_search(stream_or_arn, match, kinesisStreamArnRegex)) { + return this->create_pipeline(match[1].str(), stream_or_arn); + } else { + return this->create_pipeline(stream_or_arn, boost::none); + } }), shutdown_(false) { create_kinesis_client(ca_path); @@ -80,7 +86,7 @@ class KinesisProducer : boost::noncopyable { void create_sts_client(const std::string& ca_path); - Pipeline* create_pipeline(const std::string& stream); + Pipeline* create_pipeline(const std::string& stream, const boost::optional& stream_arn); void drain_messages(); diff --git a/aws/kinesis/core/pipeline.h b/aws/kinesis/core/pipeline.h index f4f3fe1e..fd7a0770 100644 --- a/aws/kinesis/core/pipeline.h +++ b/aws/kinesis/core/pipeline.h @@ -50,6 +50,7 @@ class Pipeline : boost::noncopyable { Pipeline( std::string region, std::string stream, + boost::optional stream_arn, std::shared_ptr config, std::shared_ptr executor, std::shared_ptr kinesis_client, @@ -58,7 +59,7 @@ class Pipeline : boost::noncopyable { Retrier::UserRecordCallback finish_user_record_cb) : stream_(std::move(stream)), region_(std::move(region)), - stream_arn_(std::move(init_stream_arn(sts_client, region_, stream_))), + stream_arn_(std::move(init_stream_arn(sts_client, region_, stream_, stream_arn_))), config_(std::move(config)), stats_logger_(stream_, config_->record_max_buffered_time()), executor_(std::move(executor)), @@ -205,7 +206,11 @@ class Pipeline : boost::noncopyable { // Retrieve the account ID and partition from the STS service. static std::string init_stream_arn(const std::shared_ptr& sts_client, const std::string ®ion, - const std::string &stream_name) { + const std::string &stream_name, + const boost::optional &stream_arn_) { + if (!stream_arn_) { + return stream_arn_.get(); + } Aws::STS::Model::GetCallerIdentityRequest request; auto outcome = sts_client->GetCallerIdentity(request); if (outcome.IsSuccess()) { diff --git a/aws/kinesis/core/user_record.cc b/aws/kinesis/core/user_record.cc index ce596414..f72c5456 100644 --- a/aws/kinesis/core/user_record.cc +++ b/aws/kinesis/core/user_record.cc @@ -31,6 +31,10 @@ UserRecord::UserRecord(aws::kinesis::protobuf::Message& m) source_id_ = m.id(); auto put_record = m.put_record(); stream_ = std::move(put_record.stream_name()); + has_stream_arn_ = put_record.has_stream_arn(); + if (has_stream_arn_) { + stream_arn_ = std::move(put_record.stream_arn()); + } partition_key_ = std::move(put_record.partition_key()); data_ = std::move(put_record.data()); has_explicit_hash_key_ = put_record.has_explicit_hash_key(); diff --git a/aws/kinesis/core/user_record.h b/aws/kinesis/core/user_record.h index f304bc92..9b4d8e93 100644 --- a/aws/kinesis/core/user_record.h +++ b/aws/kinesis/core/user_record.h @@ -84,6 +84,14 @@ class UserRecord : public aws::utils::TimeSensitive { return ss.str(); } + boost::optional stream_arn() const noexcept { + if (has_stream_arn_) { + return stream_arn_; + } else { + return boost::none; + } + } + boost::optional explicit_hash_key() const noexcept { if (has_explicit_hash_key_) { return hash_key_decimal_str(); @@ -99,12 +107,14 @@ class UserRecord : public aws::utils::TimeSensitive { private: uint64_t source_id_; std::string stream_; + std::string stream_arn_; std::string partition_key_; uint128_t hash_key_; std::string data_; std::vector attempts_; boost::optional predicted_shard_; bool has_explicit_hash_key_; + bool has_stream_arn_; bool finished_; }; diff --git a/aws/kinesis/protobuf/messages.pb.cc b/aws/kinesis/protobuf/messages.pb.cc index 856d7c63..4389c024 100644 --- a/aws/kinesis/protobuf/messages.pb.cc +++ b/aws/kinesis/protobuf/messages.pb.cc @@ -396,10 +396,12 @@ const ::PROTOBUF_NAMESPACE_ID::uint32 TableStruct_messages_2eproto::offsets[] PR PROTOBUF_FIELD_OFFSET(::aws::kinesis::protobuf::PutRecord, partition_key_), PROTOBUF_FIELD_OFFSET(::aws::kinesis::protobuf::PutRecord, explicit_hash_key_), PROTOBUF_FIELD_OFFSET(::aws::kinesis::protobuf::PutRecord, data_), + PROTOBUF_FIELD_OFFSET(::aws::kinesis::protobuf::PutRecord, stream_arn_), 0, 1, 2, 3, + 4, PROTOBUF_FIELD_OFFSET(::aws::kinesis::protobuf::Flush, _has_bits_), PROTOBUF_FIELD_OFFSET(::aws::kinesis::protobuf::Flush, _internal_metadata_), ~0u, // no _extensions_ @@ -514,17 +516,17 @@ static const ::PROTOBUF_NAMESPACE_ID::internal::MigrationSchema schemas[] PROTOB { 9, 18, sizeof(::aws::kinesis::protobuf::Record)}, { 22, 30, sizeof(::aws::kinesis::protobuf::AggregatedRecord)}, { 33, 48, sizeof(::aws::kinesis::protobuf::Message)}, - { 57, 66, sizeof(::aws::kinesis::protobuf::PutRecord)}, - { 70, 76, sizeof(::aws::kinesis::protobuf::Flush)}, - { 77, 87, sizeof(::aws::kinesis::protobuf::Attempt)}, - { 92, 101, sizeof(::aws::kinesis::protobuf::PutRecordResult)}, - { 105, 113, sizeof(::aws::kinesis::protobuf::Credentials)}, - { 116, 123, sizeof(::aws::kinesis::protobuf::SetCredentials)}, - { 125, 132, sizeof(::aws::kinesis::protobuf::Dimension)}, - { 134, 144, sizeof(::aws::kinesis::protobuf::Stats)}, - { 149, 158, sizeof(::aws::kinesis::protobuf::Metric)}, - { 162, 169, sizeof(::aws::kinesis::protobuf::MetricsRequest)}, - { 171, 177, sizeof(::aws::kinesis::protobuf::MetricsResponse)}, + { 57, 67, sizeof(::aws::kinesis::protobuf::PutRecord)}, + { 72, 78, sizeof(::aws::kinesis::protobuf::Flush)}, + { 79, 89, sizeof(::aws::kinesis::protobuf::Attempt)}, + { 94, 103, sizeof(::aws::kinesis::protobuf::PutRecordResult)}, + { 107, 115, sizeof(::aws::kinesis::protobuf::Credentials)}, + { 118, 125, sizeof(::aws::kinesis::protobuf::SetCredentials)}, + { 127, 134, sizeof(::aws::kinesis::protobuf::Dimension)}, + { 136, 146, sizeof(::aws::kinesis::protobuf::Stats)}, + { 151, 160, sizeof(::aws::kinesis::protobuf::Metric)}, + { 164, 171, sizeof(::aws::kinesis::protobuf::MetricsRequest)}, + { 173, 179, sizeof(::aws::kinesis::protobuf::MetricsResponse)}, }; static ::PROTOBUF_NAMESPACE_ID::Message const * const file_default_instances[] = { @@ -566,31 +568,31 @@ const char descriptor_table_protodef_messages_2eproto[] PROTOBUF_SECTION_VARIABL "H\000\022A\n\020metrics_response\030\010 \001(\0132%.aws.kines" "is.protobuf.MetricsResponseH\000\022\?\n\017set_cre" "dentials\030\t \001(\0132$.aws.kinesis.protobuf.Se" - "tCredentialsH\000B\020\n\016actual_message\"`\n\tPutR" + "tCredentialsH\000B\020\n\016actual_message\"t\n\tPutR" "ecord\022\023\n\013stream_name\030\001 \002(\t\022\025\n\rpartition_" "key\030\002 \002(\t\022\031\n\021explicit_hash_key\030\003 \001(\t\022\014\n\004" - "data\030\004 \002(\014\"\034\n\005Flush\022\023\n\013stream_name\030\001 \001(\t" - "\"f\n\007Attempt\022\r\n\005delay\030\001 \002(\r\022\020\n\010duration\030\002" - " \002(\r\022\017\n\007success\030\003 \002(\010\022\022\n\nerror_code\030\004 \001(" - "\t\022\025\n\rerror_message\030\005 \001(\t\"~\n\017PutRecordRes" - "ult\022/\n\010attempts\030\001 \003(\0132\035.aws.kinesis.prot" - "obuf.Attempt\022\017\n\007success\030\002 \002(\010\022\020\n\010shard_i" - "d\030\003 \001(\t\022\027\n\017sequence_number\030\004 \001(\t\">\n\013Cred" - "entials\022\014\n\004akid\030\001 \002(\t\022\022\n\nsecret_key\030\002 \002(" - "\t\022\r\n\005token\030\003 \001(\t\"]\n\016SetCredentials\022\023\n\013fo" - "r_metrics\030\001 \001(\010\0226\n\013credentials\030\002 \002(\0132!.a" - "ws.kinesis.protobuf.Credentials\"\'\n\tDimen" - "sion\022\013\n\003key\030\001 \002(\t\022\r\n\005value\030\002 \002(\t\"K\n\005Stat" - "s\022\r\n\005count\030\001 \002(\001\022\013\n\003sum\030\002 \002(\001\022\014\n\004mean\030\003 " - "\002(\001\022\013\n\003min\030\004 \002(\001\022\013\n\003max\030\005 \002(\001\"\210\001\n\006Metric" - "\022\014\n\004name\030\001 \002(\t\0223\n\ndimensions\030\002 \003(\0132\037.aws" - ".kinesis.protobuf.Dimension\022*\n\005stats\030\003 \002" - "(\0132\033.aws.kinesis.protobuf.Stats\022\017\n\007secon" - "ds\030\004 \002(\004\"/\n\016MetricsRequest\022\014\n\004name\030\001 \001(\t" - "\022\017\n\007seconds\030\002 \001(\004\"@\n\017MetricsResponse\022-\n\007" - "metrics\030\001 \003(\0132\034.aws.kinesis.protobuf.Met" - "ricB2\n0com.amazonaws.services.kinesis.pr" - "oducer.protobuf" + "data\030\004 \002(\014\022\022\n\nstream_arn\030\005 \001(\t\"\034\n\005Flush\022" + "\023\n\013stream_name\030\001 \001(\t\"f\n\007Attempt\022\r\n\005delay" + "\030\001 \002(\r\022\020\n\010duration\030\002 \002(\r\022\017\n\007success\030\003 \002(" + "\010\022\022\n\nerror_code\030\004 \001(\t\022\025\n\rerror_message\030\005" + " \001(\t\"~\n\017PutRecordResult\022/\n\010attempts\030\001 \003(" + "\0132\035.aws.kinesis.protobuf.Attempt\022\017\n\007succ" + "ess\030\002 \002(\010\022\020\n\010shard_id\030\003 \001(\t\022\027\n\017sequence_" + "number\030\004 \001(\t\">\n\013Credentials\022\014\n\004akid\030\001 \002(" + "\t\022\022\n\nsecret_key\030\002 \002(\t\022\r\n\005token\030\003 \001(\t\"]\n\016" + "SetCredentials\022\023\n\013for_metrics\030\001 \001(\010\0226\n\013c" + "redentials\030\002 \002(\0132!.aws.kinesis.protobuf." + "Credentials\"\'\n\tDimension\022\013\n\003key\030\001 \002(\t\022\r\n" + "\005value\030\002 \002(\t\"K\n\005Stats\022\r\n\005count\030\001 \002(\001\022\013\n\003" + "sum\030\002 \002(\001\022\014\n\004mean\030\003 \002(\001\022\013\n\003min\030\004 \002(\001\022\013\n\003" + "max\030\005 \002(\001\"\210\001\n\006Metric\022\014\n\004name\030\001 \002(\t\0223\n\ndi" + "mensions\030\002 \003(\0132\037.aws.kinesis.protobuf.Di" + "mension\022*\n\005stats\030\003 \002(\0132\033.aws.kinesis.pro" + "tobuf.Stats\022\017\n\007seconds\030\004 \002(\004\"/\n\016MetricsR" + "equest\022\014\n\004name\030\001 \001(\t\022\017\n\007seconds\030\002 \001(\004\"@\n" + "\017MetricsResponse\022-\n\007metrics\030\001 \003(\0132\034.aws." + "kinesis.protobuf.MetricB2\n0com.amazonaws" + ".services.kinesis.producer.protobuf" ; static const ::PROTOBUF_NAMESPACE_ID::internal::DescriptorTable*const descriptor_table_messages_2eproto_deps[1] = { &::descriptor_table_config_2eproto, @@ -615,7 +617,7 @@ static ::PROTOBUF_NAMESPACE_ID::internal::SCCInfoBase*const descriptor_table_mes static ::PROTOBUF_NAMESPACE_ID::internal::once_flag descriptor_table_messages_2eproto_once; static bool descriptor_table_messages_2eproto_initialized = false; const ::PROTOBUF_NAMESPACE_ID::internal::DescriptorTable descriptor_table_messages_2eproto = { - &descriptor_table_messages_2eproto_initialized, descriptor_table_protodef_messages_2eproto, "messages.proto", 1775, + &descriptor_table_messages_2eproto_initialized, descriptor_table_protodef_messages_2eproto, "messages.proto", 1795, &descriptor_table_messages_2eproto_once, descriptor_table_messages_2eproto_sccs, descriptor_table_messages_2eproto_deps, 15, 1, schemas, file_default_instances, TableStruct_messages_2eproto::offsets, file_level_metadata_messages_2eproto, 15, file_level_enum_descriptors_messages_2eproto, file_level_service_descriptors_messages_2eproto, @@ -2218,6 +2220,9 @@ class PutRecord::_Internal { static void set_has_data(HasBits* has_bits) { (*has_bits)[0] |= 8u; } + static void set_has_stream_arn(HasBits* has_bits) { + (*has_bits)[0] |= 16u; + } }; PutRecord::PutRecord() @@ -2246,6 +2251,10 @@ PutRecord::PutRecord(const PutRecord& from) if (from._internal_has_data()) { data_.AssignWithDefault(&::PROTOBUF_NAMESPACE_ID::internal::GetEmptyStringAlreadyInited(), from.data_); } + stream_arn_.UnsafeSetDefault(&::PROTOBUF_NAMESPACE_ID::internal::GetEmptyStringAlreadyInited()); + if (from._internal_has_stream_arn()) { + stream_arn_.AssignWithDefault(&::PROTOBUF_NAMESPACE_ID::internal::GetEmptyStringAlreadyInited(), from.stream_arn_); + } // @@protoc_insertion_point(copy_constructor:aws.kinesis.protobuf.PutRecord) } @@ -2255,6 +2264,7 @@ void PutRecord::SharedCtor() { partition_key_.UnsafeSetDefault(&::PROTOBUF_NAMESPACE_ID::internal::GetEmptyStringAlreadyInited()); explicit_hash_key_.UnsafeSetDefault(&::PROTOBUF_NAMESPACE_ID::internal::GetEmptyStringAlreadyInited()); data_.UnsafeSetDefault(&::PROTOBUF_NAMESPACE_ID::internal::GetEmptyStringAlreadyInited()); + stream_arn_.UnsafeSetDefault(&::PROTOBUF_NAMESPACE_ID::internal::GetEmptyStringAlreadyInited()); } PutRecord::~PutRecord() { @@ -2267,6 +2277,7 @@ void PutRecord::SharedDtor() { partition_key_.DestroyNoArena(&::PROTOBUF_NAMESPACE_ID::internal::GetEmptyStringAlreadyInited()); explicit_hash_key_.DestroyNoArena(&::PROTOBUF_NAMESPACE_ID::internal::GetEmptyStringAlreadyInited()); data_.DestroyNoArena(&::PROTOBUF_NAMESPACE_ID::internal::GetEmptyStringAlreadyInited()); + stream_arn_.DestroyNoArena(&::PROTOBUF_NAMESPACE_ID::internal::GetEmptyStringAlreadyInited()); } void PutRecord::SetCachedSize(int size) const { @@ -2285,7 +2296,7 @@ void PutRecord::Clear() { (void) cached_has_bits; cached_has_bits = _has_bits_[0]; - if (cached_has_bits & 0x0000000fu) { + if (cached_has_bits & 0x0000001fu) { if (cached_has_bits & 0x00000001u) { stream_name_.ClearNonDefaultToEmptyNoArena(); } @@ -2298,6 +2309,9 @@ void PutRecord::Clear() { if (cached_has_bits & 0x00000008u) { data_.ClearNonDefaultToEmptyNoArena(); } + if (cached_has_bits & 0x00000010u) { + stream_arn_.ClearNonDefaultToEmptyNoArena(); + } } _has_bits_.Clear(); _internal_metadata_.Clear(); @@ -2352,6 +2366,17 @@ const char* PutRecord::_InternalParse(const char* ptr, ::PROTOBUF_NAMESPACE_ID:: CHK_(ptr); } else goto handle_unusual; continue; + // optional string stream_arn = 5; + case 5: + if (PROTOBUF_PREDICT_TRUE(static_cast<::PROTOBUF_NAMESPACE_ID::uint8>(tag) == 42)) { + auto str = _internal_mutable_stream_arn(); + ptr = ::PROTOBUF_NAMESPACE_ID::internal::InlineGreedyStringParser(str, ptr, ctx); + #ifndef NDEBUG + ::PROTOBUF_NAMESPACE_ID::internal::VerifyUTF8(str, "aws.kinesis.protobuf.PutRecord.stream_arn"); + #endif // !NDEBUG + CHK_(ptr); + } else goto handle_unusual; + continue; default: { handle_unusual: if ((tag & 7) == 4 || tag == 0) { @@ -2416,6 +2441,16 @@ ::PROTOBUF_NAMESPACE_ID::uint8* PutRecord::_InternalSerialize( 4, this->_internal_data(), target); } + // optional string stream_arn = 5; + if (cached_has_bits & 0x00000010u) { + ::PROTOBUF_NAMESPACE_ID::internal::WireFormat::VerifyUTF8StringNamedField( + this->_internal_stream_arn().data(), static_cast(this->_internal_stream_arn().length()), + ::PROTOBUF_NAMESPACE_ID::internal::WireFormat::SERIALIZE, + "aws.kinesis.protobuf.PutRecord.stream_arn"); + target = stream->WriteStringMaybeAliased( + 5, this->_internal_stream_arn(), target); + } + if (PROTOBUF_PREDICT_FALSE(_internal_metadata_.have_unknown_fields())) { target = ::PROTOBUF_NAMESPACE_ID::internal::WireFormat::InternalSerializeUnknownFieldsToArray( _internal_metadata_.unknown_fields(), target, stream); @@ -2486,6 +2521,13 @@ size_t PutRecord::ByteSizeLong() const { this->_internal_explicit_hash_key()); } + // optional string stream_arn = 5; + if (cached_has_bits & 0x00000010u) { + total_size += 1 + + ::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::StringSize( + this->_internal_stream_arn()); + } + if (PROTOBUF_PREDICT_FALSE(_internal_metadata_.have_unknown_fields())) { return ::PROTOBUF_NAMESPACE_ID::internal::ComputeUnknownFieldsSize( _internal_metadata_, total_size, &_cached_size_); @@ -2518,7 +2560,7 @@ void PutRecord::MergeFrom(const PutRecord& from) { (void) cached_has_bits; cached_has_bits = from._has_bits_[0]; - if (cached_has_bits & 0x0000000fu) { + if (cached_has_bits & 0x0000001fu) { if (cached_has_bits & 0x00000001u) { _has_bits_[0] |= 0x00000001u; stream_name_.AssignWithDefault(&::PROTOBUF_NAMESPACE_ID::internal::GetEmptyStringAlreadyInited(), from.stream_name_); @@ -2535,6 +2577,10 @@ void PutRecord::MergeFrom(const PutRecord& from) { _has_bits_[0] |= 0x00000008u; data_.AssignWithDefault(&::PROTOBUF_NAMESPACE_ID::internal::GetEmptyStringAlreadyInited(), from.data_); } + if (cached_has_bits & 0x00000010u) { + _has_bits_[0] |= 0x00000010u; + stream_arn_.AssignWithDefault(&::PROTOBUF_NAMESPACE_ID::internal::GetEmptyStringAlreadyInited(), from.stream_arn_); + } } } @@ -2569,6 +2615,8 @@ void PutRecord::InternalSwap(PutRecord* other) { GetArenaNoVirtual()); data_.Swap(&other->data_, &::PROTOBUF_NAMESPACE_ID::internal::GetEmptyStringAlreadyInited(), GetArenaNoVirtual()); + stream_arn_.Swap(&other->stream_arn_, &::PROTOBUF_NAMESPACE_ID::internal::GetEmptyStringAlreadyInited(), + GetArenaNoVirtual()); } ::PROTOBUF_NAMESPACE_ID::Metadata PutRecord::GetMetadata() const { diff --git a/aws/kinesis/protobuf/messages.pb.h b/aws/kinesis/protobuf/messages.pb.h index e1450507..6833c829 100644 --- a/aws/kinesis/protobuf/messages.pb.h +++ b/aws/kinesis/protobuf/messages.pb.h @@ -1114,6 +1114,7 @@ class PutRecord : kPartitionKeyFieldNumber = 2, kExplicitHashKeyFieldNumber = 3, kDataFieldNumber = 4, + kStreamArnFieldNumber = 5, }; // required string stream_name = 1; bool has_stream_name() const; @@ -1195,6 +1196,26 @@ class PutRecord : std::string* _internal_mutable_data(); public: + // optional string stream_arn = 5; + bool has_stream_arn() const; + private: + bool _internal_has_stream_arn() const; + public: + void clear_stream_arn(); + const std::string& stream_arn() const; + void set_stream_arn(const std::string& value); + void set_stream_arn(std::string&& value); + void set_stream_arn(const char* value); + void set_stream_arn(const char* value, size_t size); + std::string* mutable_stream_arn(); + std::string* release_stream_arn(); + void set_allocated_stream_arn(std::string* stream_arn); + private: + const std::string& _internal_stream_arn() const; + void _internal_set_stream_arn(const std::string& value); + std::string* _internal_mutable_stream_arn(); + public: + // @@protoc_insertion_point(class_scope:aws.kinesis.protobuf.PutRecord) private: class _Internal; @@ -1209,6 +1230,7 @@ class PutRecord : ::PROTOBUF_NAMESPACE_ID::internal::ArenaStringPtr partition_key_; ::PROTOBUF_NAMESPACE_ID::internal::ArenaStringPtr explicit_hash_key_; ::PROTOBUF_NAMESPACE_ID::internal::ArenaStringPtr data_; + ::PROTOBUF_NAMESPACE_ID::internal::ArenaStringPtr stream_arn_; friend struct ::TableStruct_messages_2eproto; }; // ------------------------------------------------------------------- @@ -4229,6 +4251,77 @@ inline void PutRecord::set_allocated_data(std::string* data) { // @@protoc_insertion_point(field_set_allocated:aws.kinesis.protobuf.PutRecord.data) } +// optional string stream_arn = 5; +inline bool PutRecord::_internal_has_stream_arn() const { + bool value = (_has_bits_[0] & 0x00000010u) != 0; + return value; +} +inline bool PutRecord::has_stream_arn() const { + return _internal_has_stream_arn(); +} +inline void PutRecord::clear_stream_arn() { + stream_arn_.ClearToEmptyNoArena(&::PROTOBUF_NAMESPACE_ID::internal::GetEmptyStringAlreadyInited()); + _has_bits_[0] &= ~0x00000010u; +} +inline const std::string& PutRecord::stream_arn() const { + // @@protoc_insertion_point(field_get:aws.kinesis.protobuf.PutRecord.stream_arn) + return _internal_stream_arn(); +} +inline void PutRecord::set_stream_arn(const std::string& value) { + _internal_set_stream_arn(value); + // @@protoc_insertion_point(field_set:aws.kinesis.protobuf.PutRecord.stream_arn) +} +inline std::string* PutRecord::mutable_stream_arn() { + // @@protoc_insertion_point(field_mutable:aws.kinesis.protobuf.PutRecord.stream_arn) + return _internal_mutable_stream_arn(); +} +inline const std::string& PutRecord::_internal_stream_arn() const { + return stream_arn_.GetNoArena(); +} +inline void PutRecord::_internal_set_stream_arn(const std::string& value) { + _has_bits_[0] |= 0x00000010u; + stream_arn_.SetNoArena(&::PROTOBUF_NAMESPACE_ID::internal::GetEmptyStringAlreadyInited(), value); +} +inline void PutRecord::set_stream_arn(std::string&& value) { + _has_bits_[0] |= 0x00000010u; + stream_arn_.SetNoArena( + &::PROTOBUF_NAMESPACE_ID::internal::GetEmptyStringAlreadyInited(), ::std::move(value)); + // @@protoc_insertion_point(field_set_rvalue:aws.kinesis.protobuf.PutRecord.stream_arn) +} +inline void PutRecord::set_stream_arn(const char* value) { + GOOGLE_DCHECK(value != nullptr); + _has_bits_[0] |= 0x00000010u; + stream_arn_.SetNoArena(&::PROTOBUF_NAMESPACE_ID::internal::GetEmptyStringAlreadyInited(), ::std::string(value)); + // @@protoc_insertion_point(field_set_char:aws.kinesis.protobuf.PutRecord.stream_arn) +} +inline void PutRecord::set_stream_arn(const char* value, size_t size) { + _has_bits_[0] |= 0x00000010u; + stream_arn_.SetNoArena(&::PROTOBUF_NAMESPACE_ID::internal::GetEmptyStringAlreadyInited(), + ::std::string(reinterpret_cast(value), size)); + // @@protoc_insertion_point(field_set_pointer:aws.kinesis.protobuf.PutRecord.stream_arn) +} +inline std::string* PutRecord::_internal_mutable_stream_arn() { + _has_bits_[0] |= 0x00000010u; + return stream_arn_.MutableNoArena(&::PROTOBUF_NAMESPACE_ID::internal::GetEmptyStringAlreadyInited()); +} +inline std::string* PutRecord::release_stream_arn() { + // @@protoc_insertion_point(field_release:aws.kinesis.protobuf.PutRecord.stream_arn) + if (!_internal_has_stream_arn()) { + return nullptr; + } + _has_bits_[0] &= ~0x00000010u; + return stream_arn_.ReleaseNonDefaultNoArena(&::PROTOBUF_NAMESPACE_ID::internal::GetEmptyStringAlreadyInited()); +} +inline void PutRecord::set_allocated_stream_arn(std::string* stream_arn) { + if (stream_arn != nullptr) { + _has_bits_[0] |= 0x00000010u; + } else { + _has_bits_[0] &= ~0x00000010u; + } + stream_arn_.SetAllocatedNoArena(&::PROTOBUF_NAMESPACE_ID::internal::GetEmptyStringAlreadyInited(), stream_arn); + // @@protoc_insertion_point(field_set_allocated:aws.kinesis.protobuf.PutRecord.stream_arn) +} + // ------------------------------------------------------------------- // Flush diff --git a/aws/kinesis/protobuf/messages.proto b/aws/kinesis/protobuf/messages.proto index 9af3de16..78781ed0 100644 --- a/aws/kinesis/protobuf/messages.proto +++ b/aws/kinesis/protobuf/messages.proto @@ -42,6 +42,7 @@ message PutRecord { required string partition_key = 2; optional string explicit_hash_key = 3; required bytes data = 4; + optional string stream_arn = 5; } message Flush { diff --git a/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/IKinesisProducer.java b/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/IKinesisProducer.java index 085e6774..67732a2b 100644 --- a/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/IKinesisProducer.java +++ b/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/IKinesisProducer.java @@ -14,7 +14,11 @@ public interface IKinesisProducer { ListenableFuture addUserRecord(String stream, String partitionKey, String explicitHashKey, ByteBuffer data); - ListenableFuture addUserRecord(String stream, String partitionKey, String explicitHashKey, ByteBuffer data, Schema schema); + ListenableFuture addUserRecord(String stream, String partitionKey, ByteBuffer data, String streamARN); + + ListenableFuture addUserRecord(String stream, String partitionKey, String explicitHashKey, ByteBuffer data, String streamARN); + + ListenableFuture addUserRecord(String stream, String partitionKey, String explicitHashKey, ByteBuffer data, String streamARN, Schema schema); int getOutstandingRecordsCount(); diff --git a/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/KinesisProducer.java b/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/KinesisProducer.java index aa88cc5d..8da111bc 100644 --- a/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/KinesisProducer.java +++ b/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/KinesisProducer.java @@ -408,7 +408,7 @@ protected KinesisProducer(File inPipe, File outPipe) { */ @Override public ListenableFuture addUserRecord(String stream, String partitionKey, ByteBuffer data) { - return addUserRecord(stream, partitionKey, null, data); + return addUserRecord(stream, partitionKey, null, data, null); } /** @@ -465,7 +465,7 @@ public ListenableFuture addUserRecord(String stream, String pa */ @Override public ListenableFuture addUserRecord(UserRecord userRecord) { - return addUserRecord(userRecord.getStreamName(), userRecord.getPartitionKey(), userRecord.getExplicitHashKey(), userRecord.getData(), userRecord.getSchema()); + return addUserRecord(userRecord.getStreamName(), userRecord.getPartitionKey(), userRecord.getExplicitHashKey(), userRecord.getData(), userRecord.getStreamARN(), userRecord.getSchema()); } /** @@ -532,11 +532,144 @@ public ListenableFuture addUserRecord(UserRecord userRecord) { */ @Override public ListenableFuture addUserRecord(String stream, String partitionKey, String explicitHashKey, ByteBuffer data) { - return addUserRecord(stream, partitionKey, explicitHashKey, data, null); + return addUserRecord(stream, partitionKey, explicitHashKey, data, null, null); } + /** + * Put a record asynchronously. A {@link ListenableFuture} is returned that + * can be used to retrieve the result, either by polling or by registering a + * callback. + * + *

+ * The return value can be disregarded if you do not wish to process the + * result. Under the covers, the KPL will automatically reattempt puts in + * case of transient errors (including throttling). A failed result is + * generally returned only if an irrecoverable error is detected (e.g. + * trying to put to a stream that doesn't exist), or if the record expires. + * + *

+ * Thread safe. + * + *

+ * To add a listener to the future: + *

+ * + * ListenableFuture<PutRecordResult> f = myKinesisProducer.addUserRecord(...); + * com.google.common.util.concurrent.Futures.addCallback(f, callback, executor); + * + *

+ * where callback is an instance of + * {@link com.google.common.util.concurrent.FutureCallback} and + * executor is an instance of + * {@link java.util.concurrent.Executor}. + *

+ * Important: + *

+ * If long-running tasks are performed in the callbacks, it is recommended + * that a custom executor be provided when registering callbacks to ensure + * that there are enough threads to achieve the desired level of + * parallelism. By default, the KPL will use an internal thread pool to + * execute callbacks, but this pool may not have a sufficient number of + * threads if a large number is desired. + *

+ * Another option would be to hand the result off to a different component + * for processing and keep the callback routine fast. + * + * @param stream + * Stream to put to. + * @param partitionKey + * Partition key. Length must be at least one, and at most 256 + * (inclusive). + * @param data + * Binary data of the record. Maximum size 1MiB. + * @return A future for the result of the put. + * @param streamARN + * ARN of the stream, e.g., arn:aws:kinesis:us-east-2:123456789012:stream/mystream + * @throws IllegalArgumentException + * if input does not meet stated constraints + * @throws DaemonException + * if the child process is dead + * @see ListenableFuture + * @see UserRecordResult + * @see KinesisProducerConfiguration#setRecordTtl(long) + * @see UserRecordFailedException + */ + @Override + public ListenableFuture addUserRecord(String stream, String partitionKey, ByteBuffer data, String streamARN) { + return addUserRecord(stream, partitionKey, null, data, streamARN, null); + } + + /** + * Put a record asynchronously. A {@link ListenableFuture} is returned that + * can be used to retrieve the result, either by polling or by registering a + * callback. + * + *

+ * The return value can be disregarded if you do not wish to process the + * result. Under the covers, the KPL will automatically reattempt puts in + * case of transient errors (including throttling). A failed result is + * generally returned only if an irrecoverable error is detected (e.g. + * trying to put to a stream that doesn't exist), or if the record expires. + * + *

+ * Thread safe. + * + *

+ * To add a listener to the future: + *

+ * + * ListenableFuture<PutRecordResult> f = myKinesisProducer.addUserRecord(...); + * com.google.common.util.concurrent.Futures.addCallback(f, callback, executor); + * + *

+ * where callback is an instance of + * {@link com.google.common.util.concurrent.FutureCallback} and + * executor is an instance of + * {@link java.util.concurrent.Executor}. + *

+ * Important: + *

+ * If long-running tasks are performed in the callbacks, it is recommended + * that a custom executor be provided when registering callbacks to ensure + * that there are enough threads to achieve the desired level of + * parallelism. By default, the KPL will use an internal thread pool to + * execute callbacks, but this pool may not have a sufficient number of + * threads if a large number is desired. + *

+ * Another option would be to hand the result off to a different component + * for processing and keep the callback routine fast. + * + * @param stream + * Stream to put to. + * @param partitionKey + * Partition key. Length must be at least one, and at most 256 + * (inclusive). + * @param explicitHashKey + * The hash value used to explicitly determine the shard the data + * record is assigned to by overriding the partition key hash. + * Must be a valid string representation of a positive integer + * with value between 0 and 2^128 - 1 (inclusive). + * @param data + * Binary data of the record. Maximum size 1MiB. + * @return A future for the result of the put. + * @param streamARN + * ARN of the stream, e.g., arn:aws:kinesis:us-east-2:123456789012:stream/mystream + * @throws IllegalArgumentException + * if input does not meet stated constraints + * @throws DaemonException + * if the child process is dead + * @see ListenableFuture + * @see UserRecordResult + * @see KinesisProducerConfiguration#setRecordTtl(long) + * @see UserRecordFailedException + */ @Override - public ListenableFuture addUserRecord(String stream, String partitionKey, String explicitHashKey, ByteBuffer data, Schema schema) { + public ListenableFuture addUserRecord(String stream, String partitionKey, String explicitHashKey, ByteBuffer data, String streamARN) { + return addUserRecord(stream, partitionKey, explicitHashKey, data, streamARN, null); + } + + @Override + public ListenableFuture addUserRecord(String stream, String partitionKey, String explicitHashKey, ByteBuffer data, String streamARN, Schema schema) { if (stream == null) { throw new IllegalArgumentException("Stream name cannot be null"); } @@ -617,6 +750,9 @@ public ListenableFuture addUserRecord(String stream, String pa if (b != null) { pr.setExplicitHashKey(b.toString(10)); } + if(streamARN != null) { + pr.setStreamArn(streamARN); + } Message m = Message.newBuilder() .setId(id) diff --git a/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/UserRecord.java b/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/UserRecord.java index 642ed349..2e0cad67 100644 --- a/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/UserRecord.java +++ b/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/UserRecord.java @@ -48,6 +48,12 @@ public UserRecord(String streamName, String partitionKey, ByteBuffer data) { this.partitionKey = partitionKey; this.data = data; } + public UserRecord(String streamName, String partitionKey, ByteBuffer data, String streamARN) { + this.streamName = streamName; + this.partitionKey = partitionKey; + this.streamARN = streamARN; + this.data = data; + } public UserRecord(String streamName, String partitionKey, String explicitHashKey, ByteBuffer data) { this.streamName = streamName; @@ -55,11 +61,19 @@ public UserRecord(String streamName, String partitionKey, String explicitHashKey this.explicitHashKey = explicitHashKey; this.data = data; } + public UserRecord(String streamName, String partitionKey, String explicitHashKey, ByteBuffer data, String streamARN) { + this.streamName = streamName; + this.partitionKey = partitionKey; + this.explicitHashKey = explicitHashKey; + this.streamARN = streamARN; + this.data = data; + } - public UserRecord(String streamName, String partitionKey, String explicitHashKey, ByteBuffer data, Schema schema) { + public UserRecord(String streamName, String partitionKey, String explicitHashKey, ByteBuffer data, String streamARN, Schema schema) { this.streamName = streamName; this.partitionKey = partitionKey; this.explicitHashKey = explicitHashKey; + this.streamARN= streamARN; this.data = data; this.schema = schema; } @@ -90,6 +104,17 @@ public UserRecord withPartitionKey(String partitionKey) { return this; } + public String getStreamARN() { return streamARN; } + + public void setStreamARN(String streamARN) { + this.streamARN = streamARN; + } + + public UserRecord withStreamARN(String streamARN) { + this.streamARN = streamARN; + return this; + } + public ByteBuffer getData() { return data; } diff --git a/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/protobuf/Config.java b/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/protobuf/Config.java index c64651a4..fdf76281 100644 --- a/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/protobuf/Config.java +++ b/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/protobuf/Config.java @@ -1868,7 +1868,7 @@ public java.util.Listrepeated .aws.kinesis.protobuf.AdditionalDimension additional_metric_dims = 128; */ - public java.util.List + public java.util.List getAdditionalMetricDimsOrBuilderList() { return additionalMetricDims_; } @@ -1961,7 +1961,7 @@ public java.lang.String getCloudwatchEndpoint() { if (ref instanceof java.lang.String) { return (java.lang.String) ref; } else { - com.google.protobuf.ByteString bs = + com.google.protobuf.ByteString bs = (com.google.protobuf.ByteString) ref; java.lang.String s = bs.toStringUtf8(); if (bs.isValidUtf8()) { @@ -1978,7 +1978,7 @@ public java.lang.String getCloudwatchEndpoint() { getCloudwatchEndpointBytes() { java.lang.Object ref = cloudwatchEndpoint_; if (ref instanceof java.lang.String) { - com.google.protobuf.ByteString b = + com.google.protobuf.ByteString b = com.google.protobuf.ByteString.copyFromUtf8( (java.lang.String) ref); cloudwatchEndpoint_ = b; @@ -2108,7 +2108,7 @@ public java.lang.String getKinesisEndpoint() { if (ref instanceof java.lang.String) { return (java.lang.String) ref; } else { - com.google.protobuf.ByteString bs = + com.google.protobuf.ByteString bs = (com.google.protobuf.ByteString) ref; java.lang.String s = bs.toStringUtf8(); if (bs.isValidUtf8()) { @@ -2125,7 +2125,7 @@ public java.lang.String getKinesisEndpoint() { getKinesisEndpointBytes() { java.lang.Object ref = kinesisEndpoint_; if (ref instanceof java.lang.String) { - com.google.protobuf.ByteString b = + com.google.protobuf.ByteString b = com.google.protobuf.ByteString.copyFromUtf8( (java.lang.String) ref); kinesisEndpoint_ = b; @@ -2170,7 +2170,7 @@ public java.lang.String getLogLevel() { if (ref instanceof java.lang.String) { return (java.lang.String) ref; } else { - com.google.protobuf.ByteString bs = + com.google.protobuf.ByteString bs = (com.google.protobuf.ByteString) ref; java.lang.String s = bs.toStringUtf8(); if (bs.isValidUtf8()) { @@ -2187,7 +2187,7 @@ public java.lang.String getLogLevel() { getLogLevelBytes() { java.lang.Object ref = logLevel_; if (ref instanceof java.lang.String) { - com.google.protobuf.ByteString b = + com.google.protobuf.ByteString b = com.google.protobuf.ByteString.copyFromUtf8( (java.lang.String) ref); logLevel_ = b; @@ -2232,7 +2232,7 @@ public java.lang.String getMetricsGranularity() { if (ref instanceof java.lang.String) { return (java.lang.String) ref; } else { - com.google.protobuf.ByteString bs = + com.google.protobuf.ByteString bs = (com.google.protobuf.ByteString) ref; java.lang.String s = bs.toStringUtf8(); if (bs.isValidUtf8()) { @@ -2249,7 +2249,7 @@ public java.lang.String getMetricsGranularity() { getMetricsGranularityBytes() { java.lang.Object ref = metricsGranularity_; if (ref instanceof java.lang.String) { - com.google.protobuf.ByteString b = + com.google.protobuf.ByteString b = com.google.protobuf.ByteString.copyFromUtf8( (java.lang.String) ref); metricsGranularity_ = b; @@ -2277,7 +2277,7 @@ public java.lang.String getMetricsLevel() { if (ref instanceof java.lang.String) { return (java.lang.String) ref; } else { - com.google.protobuf.ByteString bs = + com.google.protobuf.ByteString bs = (com.google.protobuf.ByteString) ref; java.lang.String s = bs.toStringUtf8(); if (bs.isValidUtf8()) { @@ -2294,7 +2294,7 @@ public java.lang.String getMetricsLevel() { getMetricsLevelBytes() { java.lang.Object ref = metricsLevel_; if (ref instanceof java.lang.String) { - com.google.protobuf.ByteString b = + com.google.protobuf.ByteString b = com.google.protobuf.ByteString.copyFromUtf8( (java.lang.String) ref); metricsLevel_ = b; @@ -2322,7 +2322,7 @@ public java.lang.String getMetricsNamespace() { if (ref instanceof java.lang.String) { return (java.lang.String) ref; } else { - com.google.protobuf.ByteString bs = + com.google.protobuf.ByteString bs = (com.google.protobuf.ByteString) ref; java.lang.String s = bs.toStringUtf8(); if (bs.isValidUtf8()) { @@ -2339,7 +2339,7 @@ public java.lang.String getMetricsNamespace() { getMetricsNamespaceBytes() { java.lang.Object ref = metricsNamespace_; if (ref instanceof java.lang.String) { - com.google.protobuf.ByteString b = + com.google.protobuf.ByteString b = com.google.protobuf.ByteString.copyFromUtf8( (java.lang.String) ref); metricsNamespace_ = b; @@ -2452,7 +2452,7 @@ public java.lang.String getRegion() { if (ref instanceof java.lang.String) { return (java.lang.String) ref; } else { - com.google.protobuf.ByteString bs = + com.google.protobuf.ByteString bs = (com.google.protobuf.ByteString) ref; java.lang.String s = bs.toStringUtf8(); if (bs.isValidUtf8()) { @@ -2469,7 +2469,7 @@ public java.lang.String getRegion() { getRegionBytes() { java.lang.Object ref = region_; if (ref instanceof java.lang.String) { - com.google.protobuf.ByteString b = + com.google.protobuf.ByteString b = com.google.protobuf.ByteString.copyFromUtf8( (java.lang.String) ref); region_ = b; @@ -2531,7 +2531,7 @@ public java.lang.String getProxyHost() { if (ref instanceof java.lang.String) { return (java.lang.String) ref; } else { - com.google.protobuf.ByteString bs = + com.google.protobuf.ByteString bs = (com.google.protobuf.ByteString) ref; java.lang.String s = bs.toStringUtf8(); if (bs.isValidUtf8()) { @@ -2548,7 +2548,7 @@ public java.lang.String getProxyHost() { getProxyHostBytes() { java.lang.Object ref = proxyHost_; if (ref instanceof java.lang.String) { - com.google.protobuf.ByteString b = + com.google.protobuf.ByteString b = com.google.protobuf.ByteString.copyFromUtf8( (java.lang.String) ref); proxyHost_ = b; @@ -2593,7 +2593,7 @@ public java.lang.String getProxyUserName() { if (ref instanceof java.lang.String) { return (java.lang.String) ref; } else { - com.google.protobuf.ByteString bs = + com.google.protobuf.ByteString bs = (com.google.protobuf.ByteString) ref; java.lang.String s = bs.toStringUtf8(); if (bs.isValidUtf8()) { @@ -2610,7 +2610,7 @@ public java.lang.String getProxyUserName() { getProxyUserNameBytes() { java.lang.Object ref = proxyUserName_; if (ref instanceof java.lang.String) { - com.google.protobuf.ByteString b = + com.google.protobuf.ByteString b = com.google.protobuf.ByteString.copyFromUtf8( (java.lang.String) ref); proxyUserName_ = b; @@ -2638,7 +2638,7 @@ public java.lang.String getProxyPassword() { if (ref instanceof java.lang.String) { return (java.lang.String) ref; } else { - com.google.protobuf.ByteString bs = + com.google.protobuf.ByteString bs = (com.google.protobuf.ByteString) ref; java.lang.String s = bs.toStringUtf8(); if (bs.isValidUtf8()) { @@ -2655,7 +2655,7 @@ public java.lang.String getProxyPassword() { getProxyPasswordBytes() { java.lang.Object ref = proxyPassword_; if (ref instanceof java.lang.String) { - com.google.protobuf.ByteString b = + com.google.protobuf.ByteString b = com.google.protobuf.ByteString.copyFromUtf8( (java.lang.String) ref); proxyPassword_ = b; @@ -2683,7 +2683,7 @@ public java.lang.String getStsEndpoint() { if (ref instanceof java.lang.String) { return (java.lang.String) ref; } else { - com.google.protobuf.ByteString bs = + com.google.protobuf.ByteString bs = (com.google.protobuf.ByteString) ref; java.lang.String s = bs.toStringUtf8(); if (bs.isValidUtf8()) { @@ -2700,7 +2700,7 @@ public java.lang.String getStsEndpoint() { getStsEndpointBytes() { java.lang.Object ref = stsEndpoint_; if (ref instanceof java.lang.String) { - com.google.protobuf.ByteString b = + com.google.protobuf.ByteString b = com.google.protobuf.ByteString.copyFromUtf8( (java.lang.String) ref); stsEndpoint_ = b; @@ -3812,7 +3812,7 @@ public Builder mergeFrom(com.amazonaws.services.kinesis.producer.protobuf.Config additionalMetricDimsBuilder_ = null; additionalMetricDims_ = other.additionalMetricDims_; bitField0_ = (bitField0_ & ~0x00000001); - additionalMetricDimsBuilder_ = + additionalMetricDimsBuilder_ = com.google.protobuf.GeneratedMessageV3.alwaysUseFieldBuilders ? getAdditionalMetricDimsFieldBuilder() : null; } else { @@ -4172,7 +4172,7 @@ public com.amazonaws.services.kinesis.producer.protobuf.Config.AdditionalDimensi /** * repeated .aws.kinesis.protobuf.AdditionalDimension additional_metric_dims = 128; */ - public java.util.List + public java.util.List getAdditionalMetricDimsOrBuilderList() { if (additionalMetricDimsBuilder_ != null) { return additionalMetricDimsBuilder_.getMessageOrBuilderList(); @@ -4198,12 +4198,12 @@ public com.amazonaws.services.kinesis.producer.protobuf.Config.AdditionalDimensi /** * repeated .aws.kinesis.protobuf.AdditionalDimension additional_metric_dims = 128; */ - public java.util.List + public java.util.List getAdditionalMetricDimsBuilderList() { return getAdditionalMetricDimsFieldBuilder().getBuilderList(); } private com.google.protobuf.RepeatedFieldBuilderV3< - com.amazonaws.services.kinesis.producer.protobuf.Config.AdditionalDimension, com.amazonaws.services.kinesis.producer.protobuf.Config.AdditionalDimension.Builder, com.amazonaws.services.kinesis.producer.protobuf.Config.AdditionalDimensionOrBuilder> + com.amazonaws.services.kinesis.producer.protobuf.Config.AdditionalDimension, com.amazonaws.services.kinesis.producer.protobuf.Config.AdditionalDimension.Builder, com.amazonaws.services.kinesis.producer.protobuf.Config.AdditionalDimensionOrBuilder> getAdditionalMetricDimsFieldBuilder() { if (additionalMetricDimsBuilder_ == null) { additionalMetricDimsBuilder_ = new com.google.protobuf.RepeatedFieldBuilderV3< @@ -4362,7 +4362,7 @@ public java.lang.String getCloudwatchEndpoint() { getCloudwatchEndpointBytes() { java.lang.Object ref = cloudwatchEndpoint_; if (ref instanceof String) { - com.google.protobuf.ByteString b = + com.google.protobuf.ByteString b = com.google.protobuf.ByteString.copyFromUtf8( (java.lang.String) ref); cloudwatchEndpoint_ = b; @@ -4668,7 +4668,7 @@ public java.lang.String getKinesisEndpoint() { getKinesisEndpointBytes() { java.lang.Object ref = kinesisEndpoint_; if (ref instanceof String) { - com.google.protobuf.ByteString b = + com.google.protobuf.ByteString b = com.google.protobuf.ByteString.copyFromUtf8( (java.lang.String) ref); kinesisEndpoint_ = b; @@ -4789,7 +4789,7 @@ public java.lang.String getLogLevel() { getLogLevelBytes() { java.lang.Object ref = logLevel_; if (ref instanceof String) { - com.google.protobuf.ByteString b = + com.google.protobuf.ByteString b = com.google.protobuf.ByteString.copyFromUtf8( (java.lang.String) ref); logLevel_ = b; @@ -4910,7 +4910,7 @@ public java.lang.String getMetricsGranularity() { getMetricsGranularityBytes() { java.lang.Object ref = metricsGranularity_; if (ref instanceof String) { - com.google.protobuf.ByteString b = + com.google.protobuf.ByteString b = com.google.protobuf.ByteString.copyFromUtf8( (java.lang.String) ref); metricsGranularity_ = b; @@ -4994,7 +4994,7 @@ public java.lang.String getMetricsLevel() { getMetricsLevelBytes() { java.lang.Object ref = metricsLevel_; if (ref instanceof String) { - com.google.protobuf.ByteString b = + com.google.protobuf.ByteString b = com.google.protobuf.ByteString.copyFromUtf8( (java.lang.String) ref); metricsLevel_ = b; @@ -5078,7 +5078,7 @@ public java.lang.String getMetricsNamespace() { getMetricsNamespaceBytes() { java.lang.Object ref = metricsNamespace_; if (ref instanceof String) { - com.google.protobuf.ByteString b = + com.google.protobuf.ByteString b = com.google.protobuf.ByteString.copyFromUtf8( (java.lang.String) ref); metricsNamespace_ = b; @@ -5347,7 +5347,7 @@ public java.lang.String getRegion() { getRegionBytes() { java.lang.Object ref = region_; if (ref instanceof String) { - com.google.protobuf.ByteString b = + com.google.protobuf.ByteString b = com.google.protobuf.ByteString.copyFromUtf8( (java.lang.String) ref); region_ = b; @@ -5505,7 +5505,7 @@ public java.lang.String getProxyHost() { getProxyHostBytes() { java.lang.Object ref = proxyHost_; if (ref instanceof String) { - com.google.protobuf.ByteString b = + com.google.protobuf.ByteString b = com.google.protobuf.ByteString.copyFromUtf8( (java.lang.String) ref); proxyHost_ = b; @@ -5626,7 +5626,7 @@ public java.lang.String getProxyUserName() { getProxyUserNameBytes() { java.lang.Object ref = proxyUserName_; if (ref instanceof String) { - com.google.protobuf.ByteString b = + com.google.protobuf.ByteString b = com.google.protobuf.ByteString.copyFromUtf8( (java.lang.String) ref); proxyUserName_ = b; @@ -5710,7 +5710,7 @@ public java.lang.String getProxyPassword() { getProxyPasswordBytes() { java.lang.Object ref = proxyPassword_; if (ref instanceof String) { - com.google.protobuf.ByteString b = + com.google.protobuf.ByteString b = com.google.protobuf.ByteString.copyFromUtf8( (java.lang.String) ref); proxyPassword_ = b; @@ -5794,7 +5794,7 @@ public java.lang.String getStsEndpoint() { getStsEndpointBytes() { java.lang.Object ref = stsEndpoint_; if (ref instanceof String) { - com.google.protobuf.ByteString b = + com.google.protobuf.ByteString b = com.google.protobuf.ByteString.copyFromUtf8( (java.lang.String) ref); stsEndpoint_ = b; @@ -6014,12 +6014,12 @@ public com.amazonaws.services.kinesis.producer.protobuf.Config.Configuration get private static final com.google.protobuf.Descriptors.Descriptor internal_static_aws_kinesis_protobuf_AdditionalDimension_descriptor; - private static final + private static final com.google.protobuf.GeneratedMessageV3.FieldAccessorTable internal_static_aws_kinesis_protobuf_AdditionalDimension_fieldAccessorTable; private static final com.google.protobuf.Descriptors.Descriptor internal_static_aws_kinesis_protobuf_Configuration_descriptor; - private static final + private static final com.google.protobuf.GeneratedMessageV3.FieldAccessorTable internal_static_aws_kinesis_protobuf_Configuration_fieldAccessorTable; diff --git a/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/protobuf/Messages.java b/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/protobuf/Messages.java index a50e361d..60b0aa5e 100644 --- a/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/protobuf/Messages.java +++ b/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/protobuf/Messages.java @@ -5516,6 +5516,23 @@ public interface PutRecordOrBuilder extends * @return The data. */ com.google.protobuf.ByteString getData(); + + /** + * optional string stream_arn = 5; + * @return Whether the streamArn field is set. + */ + boolean hasStreamArn(); + /** + * optional string stream_arn = 5; + * @return The streamArn. + */ + java.lang.String getStreamArn(); + /** + * optional string stream_arn = 5; + * @return The bytes for streamArn. + */ + com.google.protobuf.ByteString + getStreamArnBytes(); } /** * Protobuf type {@code aws.kinesis.protobuf.PutRecord} @@ -5534,6 +5551,7 @@ private PutRecord() { partitionKey_ = ""; explicitHashKey_ = ""; data_ = com.google.protobuf.ByteString.EMPTY; + streamArn_ = ""; } @java.lang.Override @@ -5590,6 +5608,12 @@ private PutRecord( data_ = input.readBytes(); break; } + case 42: { + com.google.protobuf.ByteString bs = input.readBytes(); + bitField0_ |= 0x00000010; + streamArn_ = bs; + break; + } default: { if (!parseUnknownField( input, unknownFields, extensionRegistry, tag)) { @@ -5775,6 +5799,51 @@ public com.google.protobuf.ByteString getData() { return data_; } + public static final int STREAM_ARN_FIELD_NUMBER = 5; + private volatile java.lang.Object streamArn_; + /** + * optional string stream_arn = 5; + * @return Whether the streamArn field is set. + */ + public boolean hasStreamArn() { + return ((bitField0_ & 0x00000010) != 0); + } + /** + * optional string stream_arn = 5; + * @return The streamArn. + */ + public java.lang.String getStreamArn() { + java.lang.Object ref = streamArn_; + if (ref instanceof java.lang.String) { + return (java.lang.String) ref; + } else { + com.google.protobuf.ByteString bs = + (com.google.protobuf.ByteString) ref; + java.lang.String s = bs.toStringUtf8(); + if (bs.isValidUtf8()) { + streamArn_ = s; + } + return s; + } + } + /** + * optional string stream_arn = 5; + * @return The bytes for streamArn. + */ + public com.google.protobuf.ByteString + getStreamArnBytes() { + java.lang.Object ref = streamArn_; + if (ref instanceof java.lang.String) { + com.google.protobuf.ByteString b = + com.google.protobuf.ByteString.copyFromUtf8( + (java.lang.String) ref); + streamArn_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + } + private byte memoizedIsInitialized = -1; @java.lang.Override public final boolean isInitialized() { @@ -5813,6 +5882,9 @@ public void writeTo(com.google.protobuf.CodedOutputStream output) if (((bitField0_ & 0x00000008) != 0)) { output.writeBytes(4, data_); } + if (((bitField0_ & 0x00000010) != 0)) { + com.google.protobuf.GeneratedMessageV3.writeString(output, 5, streamArn_); + } unknownFields.writeTo(output); } @@ -5835,6 +5907,9 @@ public int getSerializedSize() { size += com.google.protobuf.CodedOutputStream .computeBytesSize(4, data_); } + if (((bitField0_ & 0x00000010) != 0)) { + size += com.google.protobuf.GeneratedMessageV3.computeStringSize(5, streamArn_); + } size += unknownFields.getSerializedSize(); memoizedSize = size; return size; @@ -5870,6 +5945,11 @@ public boolean equals(final java.lang.Object obj) { if (!getData() .equals(other.getData())) return false; } + if (hasStreamArn() != other.hasStreamArn()) return false; + if (hasStreamArn()) { + if (!getStreamArn() + .equals(other.getStreamArn())) return false; + } if (!unknownFields.equals(other.unknownFields)) return false; return true; } @@ -5897,6 +5977,10 @@ public int hashCode() { hash = (37 * hash) + DATA_FIELD_NUMBER; hash = (53 * hash) + getData().hashCode(); } + if (hasStreamArn()) { + hash = (37 * hash) + STREAM_ARN_FIELD_NUMBER; + hash = (53 * hash) + getStreamArn().hashCode(); + } hash = (29 * hash) + unknownFields.hashCode(); memoizedHashCode = hash; return hash; @@ -6038,6 +6122,8 @@ public Builder clear() { bitField0_ = (bitField0_ & ~0x00000004); data_ = com.google.protobuf.ByteString.EMPTY; bitField0_ = (bitField0_ & ~0x00000008); + streamArn_ = ""; + bitField0_ = (bitField0_ & ~0x00000010); return this; } @@ -6082,6 +6168,10 @@ public com.amazonaws.services.kinesis.producer.protobuf.Messages.PutRecord build to_bitField0_ |= 0x00000008; } result.data_ = data_; + if (((from_bitField0_ & 0x00000010) != 0)) { + to_bitField0_ |= 0x00000010; + } + result.streamArn_ = streamArn_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -6149,6 +6239,11 @@ public Builder mergeFrom(com.amazonaws.services.kinesis.producer.protobuf.Messag if (other.hasData()) { setData(other.getData()); } + if (other.hasStreamArn()) { + bitField0_ |= 0x00000010; + streamArn_ = other.streamArn_; + onChanged(); + } this.mergeUnknownFields(other.unknownFields); onChanged(); return this; @@ -6479,6 +6574,90 @@ public Builder clearData() { onChanged(); return this; } + + private java.lang.Object streamArn_ = ""; + /** + * optional string stream_arn = 5; + * @return Whether the streamArn field is set. + */ + public boolean hasStreamArn() { + return ((bitField0_ & 0x00000010) != 0); + } + /** + * optional string stream_arn = 5; + * @return The streamArn. + */ + public java.lang.String getStreamArn() { + java.lang.Object ref = streamArn_; + if (!(ref instanceof java.lang.String)) { + com.google.protobuf.ByteString bs = + (com.google.protobuf.ByteString) ref; + java.lang.String s = bs.toStringUtf8(); + if (bs.isValidUtf8()) { + streamArn_ = s; + } + return s; + } else { + return (java.lang.String) ref; + } + } + /** + * optional string stream_arn = 5; + * @return The bytes for streamArn. + */ + public com.google.protobuf.ByteString + getStreamArnBytes() { + java.lang.Object ref = streamArn_; + if (ref instanceof String) { + com.google.protobuf.ByteString b = + com.google.protobuf.ByteString.copyFromUtf8( + (java.lang.String) ref); + streamArn_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + } + /** + * optional string stream_arn = 5; + * @param value The streamArn to set. + * @return This builder for chaining. + */ + public Builder setStreamArn( + java.lang.String value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000010; + streamArn_ = value; + onChanged(); + return this; + } + /** + * optional string stream_arn = 5; + * @return This builder for chaining. + */ + public Builder clearStreamArn() { + bitField0_ = (bitField0_ & ~0x00000010); + streamArn_ = getDefaultInstance().getStreamArn(); + onChanged(); + return this; + } + /** + * optional string stream_arn = 5; + * @param value The bytes for streamArn to set. + * @return This builder for chaining. + */ + public Builder setStreamArnBytes( + com.google.protobuf.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000010; + streamArn_ = value; + onChanged(); + return this; + } @java.lang.Override public final Builder setUnknownFields( final com.google.protobuf.UnknownFieldSet unknownFields) { @@ -15853,31 +16032,31 @@ public com.amazonaws.services.kinesis.producer.protobuf.Messages.MetricsResponse "H\000\022A\n\020metrics_response\030\010 \001(\0132%.aws.kines" + "is.protobuf.MetricsResponseH\000\022?\n\017set_cre" + "dentials\030\t \001(\0132$.aws.kinesis.protobuf.Se" + - "tCredentialsH\000B\020\n\016actual_message\"`\n\tPutR" + + "tCredentialsH\000B\020\n\016actual_message\"t\n\tPutR" + "ecord\022\023\n\013stream_name\030\001 \002(\t\022\025\n\rpartition_" + "key\030\002 \002(\t\022\031\n\021explicit_hash_key\030\003 \001(\t\022\014\n\004" + - "data\030\004 \002(\014\"\034\n\005Flush\022\023\n\013stream_name\030\001 \001(\t" + - "\"f\n\007Attempt\022\r\n\005delay\030\001 \002(\r\022\020\n\010duration\030\002" + - " \002(\r\022\017\n\007success\030\003 \002(\010\022\022\n\nerror_code\030\004 \001(" + - "\t\022\025\n\rerror_message\030\005 \001(\t\"~\n\017PutRecordRes" + - "ult\022/\n\010attempts\030\001 \003(\0132\035.aws.kinesis.prot" + - "obuf.Attempt\022\017\n\007success\030\002 \002(\010\022\020\n\010shard_i" + - "d\030\003 \001(\t\022\027\n\017sequence_number\030\004 \001(\t\">\n\013Cred" + - "entials\022\014\n\004akid\030\001 \002(\t\022\022\n\nsecret_key\030\002 \002(" + - "\t\022\r\n\005token\030\003 \001(\t\"]\n\016SetCredentials\022\023\n\013fo" + - "r_metrics\030\001 \001(\010\0226\n\013credentials\030\002 \002(\0132!.a" + - "ws.kinesis.protobuf.Credentials\"\'\n\tDimen" + - "sion\022\013\n\003key\030\001 \002(\t\022\r\n\005value\030\002 \002(\t\"K\n\005Stat" + - "s\022\r\n\005count\030\001 \002(\001\022\013\n\003sum\030\002 \002(\001\022\014\n\004mean\030\003 " + - "\002(\001\022\013\n\003min\030\004 \002(\001\022\013\n\003max\030\005 \002(\001\"\210\001\n\006Metric" + - "\022\014\n\004name\030\001 \002(\t\0223\n\ndimensions\030\002 \003(\0132\037.aws" + - ".kinesis.protobuf.Dimension\022*\n\005stats\030\003 \002" + - "(\0132\033.aws.kinesis.protobuf.Stats\022\017\n\007secon" + - "ds\030\004 \002(\004\"/\n\016MetricsRequest\022\014\n\004name\030\001 \001(\t" + - "\022\017\n\007seconds\030\002 \001(\004\"@\n\017MetricsResponse\022-\n\007" + - "metrics\030\001 \003(\0132\034.aws.kinesis.protobuf.Met" + - "ricB2\n0com.amazonaws.services.kinesis.pr" + - "oducer.protobuf" + "data\030\004 \002(\014\022\022\n\nstream_arn\030\005 \001(\t\"\034\n\005Flush\022" + + "\023\n\013stream_name\030\001 \001(\t\"f\n\007Attempt\022\r\n\005delay" + + "\030\001 \002(\r\022\020\n\010duration\030\002 \002(\r\022\017\n\007success\030\003 \002(" + + "\010\022\022\n\nerror_code\030\004 \001(\t\022\025\n\rerror_message\030\005" + + " \001(\t\"~\n\017PutRecordResult\022/\n\010attempts\030\001 \003(" + + "\0132\035.aws.kinesis.protobuf.Attempt\022\017\n\007succ" + + "ess\030\002 \002(\010\022\020\n\010shard_id\030\003 \001(\t\022\027\n\017sequence_" + + "number\030\004 \001(\t\">\n\013Credentials\022\014\n\004akid\030\001 \002(" + + "\t\022\022\n\nsecret_key\030\002 \002(\t\022\r\n\005token\030\003 \001(\t\"]\n\016" + + "SetCredentials\022\023\n\013for_metrics\030\001 \001(\010\0226\n\013c" + + "redentials\030\002 \002(\0132!.aws.kinesis.protobuf." + + "Credentials\"\'\n\tDimension\022\013\n\003key\030\001 \002(\t\022\r\n" + + "\005value\030\002 \002(\t\"K\n\005Stats\022\r\n\005count\030\001 \002(\001\022\013\n\003" + + "sum\030\002 \002(\001\022\014\n\004mean\030\003 \002(\001\022\013\n\003min\030\004 \002(\001\022\013\n\003" + + "max\030\005 \002(\001\"\210\001\n\006Metric\022\014\n\004name\030\001 \002(\t\0223\n\ndi" + + "mensions\030\002 \003(\0132\037.aws.kinesis.protobuf.Di" + + "mension\022*\n\005stats\030\003 \002(\0132\033.aws.kinesis.pro" + + "tobuf.Stats\022\017\n\007seconds\030\004 \002(\004\"/\n\016MetricsR" + + "equest\022\014\n\004name\030\001 \001(\t\022\017\n\007seconds\030\002 \001(\004\"@\n" + + "\017MetricsResponse\022-\n\007metrics\030\001 \003(\0132\034.aws." + + "kinesis.protobuf.MetricB2\n0com.amazonaws" + + ".services.kinesis.producer.protobuf" }; descriptor = com.google.protobuf.Descriptors.FileDescriptor .internalBuildGeneratedFileFrom(descriptorData, @@ -15913,7 +16092,7 @@ public com.amazonaws.services.kinesis.producer.protobuf.Messages.MetricsResponse internal_static_aws_kinesis_protobuf_PutRecord_fieldAccessorTable = new com.google.protobuf.GeneratedMessageV3.FieldAccessorTable( internal_static_aws_kinesis_protobuf_PutRecord_descriptor, - new java.lang.String[] { "StreamName", "PartitionKey", "ExplicitHashKey", "Data", }); + new java.lang.String[] { "StreamName", "PartitionKey", "ExplicitHashKey", "Data", "StreamArn", }); internal_static_aws_kinesis_protobuf_Flush_descriptor = getDescriptor().getMessageTypes().get(5); internal_static_aws_kinesis_protobuf_Flush_fieldAccessorTable = new diff --git a/java/amazon-kinesis-producer/src/test/java/com/amazonaws/services/kinesis/producer/KinesisProducerTest.java b/java/amazon-kinesis-producer/src/test/java/com/amazonaws/services/kinesis/producer/KinesisProducerTest.java index 8f4bd8e3..b446bdca 100644 --- a/java/amazon-kinesis-producer/src/test/java/com/amazonaws/services/kinesis/producer/KinesisProducerTest.java +++ b/java/amazon-kinesis-producer/src/test/java/com/amazonaws/services/kinesis/producer/KinesisProducerTest.java @@ -138,7 +138,7 @@ public void differentCredsForRecordsAndMetrics() throws InterruptedException, Ex final long start = System.nanoTime(); while (System.nanoTime() - start < 500 * 1000000) { - kp.addUserRecord("streamName", "partitionKey", ByteBuffer.wrap(new byte[0])); + kp.addUserRecord("streamName", "partitionKey", ByteBuffer.wrap(new byte[0]), "streamARN"); kp.flush(); Thread.sleep(10); } @@ -187,7 +187,7 @@ public void rotatingCredentials() throws InterruptedException, ExecutionExceptio final long start = System.nanoTime(); while (System.nanoTime() - start < 500 * 1000000) { - kp.addUserRecord("streamName", "partitionKey", ByteBuffer.wrap(new byte[0])); + kp.addUserRecord("streamName", "partitionKey", ByteBuffer.wrap(new byte[0]), "streamARN"); kp.flush(); Thread.sleep(10); } @@ -253,9 +253,10 @@ public void schemaIntegration_OnInvalidSchema_ThrowsException() { String partitionKey = "partitionKey"; String hashKey = null; ByteBuffer data = ByteBuffer.wrap(new byte[] { 01, 23, 54 }); + String streamARN = "streamARN"; String schemaDefinition = null; Schema schema = new Schema(schemaDefinition, DataFormat.AVRO.toString(), "testSchema"); - UserRecord userRecord = new UserRecord(stream, partitionKey, hashKey, data, schema); + UserRecord userRecord = new UserRecord(stream, partitionKey, hashKey, data, streamARN, schema); try { kinesisProducer.addUserRecord(userRecord);