From 582e4b60384f9936ccd202b2117b70febea7fbea Mon Sep 17 00:00:00 2001 From: Abhishek Yadav Akkem Date: Wed, 13 Dec 2023 21:34:30 +0000 Subject: [PATCH] Revert "Add streamARN parameter to support CAA" This reverts commit 7cf965645b62e44691879929d2974418b4b3d1e5. --- aws/kinesis/core/kinesis_producer.cc | 9 +- aws/kinesis/core/kinesis_producer.h | 12 +- aws/kinesis/core/pipeline.h | 9 +- aws/kinesis/core/user_record.cc | 4 - aws/kinesis/core/user_record.h | 10 - aws/kinesis/protobuf/messages.pb.cc | 122 +++------- aws/kinesis/protobuf/messages.pb.h | 93 ------- aws/kinesis/protobuf/messages.proto | 1 - .../kinesis/producer/IKinesisProducer.java | 6 +- .../kinesis/producer/KinesisProducer.java | 144 +---------- .../services/kinesis/producer/UserRecord.java | 27 +-- .../kinesis/producer/protobuf/Config.java | 80 +++--- .../kinesis/producer/protobuf/Messages.java | 227 ++---------------- .../kinesis/producer/KinesisProducerTest.java | 7 +- 14 files changed, 117 insertions(+), 634 deletions(-) diff --git a/aws/kinesis/core/kinesis_producer.cc b/aws/kinesis/core/kinesis_producer.cc index 75447000..3ac7e3e6 100644 --- a/aws/kinesis/core/kinesis_producer.cc +++ b/aws/kinesis/core/kinesis_producer.cc @@ -222,12 +222,11 @@ void KinesisProducer::create_sts_client(const std::string& ca_path) { cfg); } -Pipeline* KinesisProducer::create_pipeline(const std::string& stream, const boost::optional& stream_arn) { +Pipeline* KinesisProducer::create_pipeline(const std::string& stream) { LOG(info) << "Created pipeline for stream \"" << stream << "\""; return new Pipeline( region_, stream, - stream_arn, config_, executor_, kinesis_client_, @@ -292,11 +291,7 @@ void KinesisProducer::on_put_record(aws::kinesis::protobuf::Message& m) { std::chrono::milliseconds(config_->record_max_buffered_time())); ur->set_expiration_from_now( std::chrono::milliseconds(config_->record_ttl())); - if (ur->stream_arn()) { - pipelines_[ur->stream_arn().get()].put(ur); - } else { - pipelines_[ur->stream()].put(ur); - } + pipelines_[ur->stream()].put(ur); } void KinesisProducer::on_flush(const aws::kinesis::protobuf::Flush& flush_msg) { diff --git a/aws/kinesis/core/kinesis_producer.h b/aws/kinesis/core/kinesis_producer.h index b56715f2..98bf4f5e 100644 --- a/aws/kinesis/core/kinesis_producer.h +++ b/aws/kinesis/core/kinesis_producer.h @@ -46,14 +46,8 @@ class KinesisProducer : boost::noncopyable { cw_creds_provider_(std::move(cw_creds_provider)), executor_(std::move(executor)), ipc_manager_(std::move(ipc_manager)), - pipelines_([this](auto& stream_or_arn) { - std::regex kinesisStreamArnRegex("^arn:aws:kinesis:[a-zA-Z0-9-]+:[0-9]+:stream/[a-zA-Z0-9_.-]+$"); - std::smatch match; - if (std::regex_search(stream_or_arn, match, kinesisStreamArnRegex)) { - return this->create_pipeline(match[1].str(), stream_or_arn); - } else { - return this->create_pipeline(stream_or_arn, boost::none); - } + pipelines_([this](auto& stream) { + return this->create_pipeline(stream); }), shutdown_(false) { create_kinesis_client(ca_path); @@ -86,7 +80,7 @@ class KinesisProducer : boost::noncopyable { void create_sts_client(const std::string& ca_path); - Pipeline* create_pipeline(const std::string& stream, const boost::optional& stream_arn); + Pipeline* create_pipeline(const std::string& stream); void drain_messages(); diff --git a/aws/kinesis/core/pipeline.h b/aws/kinesis/core/pipeline.h index fd7a0770..f4f3fe1e 100644 --- a/aws/kinesis/core/pipeline.h +++ b/aws/kinesis/core/pipeline.h @@ -50,7 +50,6 @@ class Pipeline : boost::noncopyable { Pipeline( std::string region, std::string stream, - boost::optional stream_arn, std::shared_ptr config, std::shared_ptr executor, std::shared_ptr kinesis_client, @@ -59,7 +58,7 @@ class Pipeline : boost::noncopyable { Retrier::UserRecordCallback finish_user_record_cb) : stream_(std::move(stream)), region_(std::move(region)), - stream_arn_(std::move(init_stream_arn(sts_client, region_, stream_, stream_arn_))), + stream_arn_(std::move(init_stream_arn(sts_client, region_, stream_))), config_(std::move(config)), stats_logger_(stream_, config_->record_max_buffered_time()), executor_(std::move(executor)), @@ -206,11 +205,7 @@ class Pipeline : boost::noncopyable { // Retrieve the account ID and partition from the STS service. static std::string init_stream_arn(const std::shared_ptr& sts_client, const std::string ®ion, - const std::string &stream_name, - const boost::optional &stream_arn_) { - if (!stream_arn_) { - return stream_arn_.get(); - } + const std::string &stream_name) { Aws::STS::Model::GetCallerIdentityRequest request; auto outcome = sts_client->GetCallerIdentity(request); if (outcome.IsSuccess()) { diff --git a/aws/kinesis/core/user_record.cc b/aws/kinesis/core/user_record.cc index f72c5456..ce596414 100644 --- a/aws/kinesis/core/user_record.cc +++ b/aws/kinesis/core/user_record.cc @@ -31,10 +31,6 @@ UserRecord::UserRecord(aws::kinesis::protobuf::Message& m) source_id_ = m.id(); auto put_record = m.put_record(); stream_ = std::move(put_record.stream_name()); - has_stream_arn_ = put_record.has_stream_arn(); - if (has_stream_arn_) { - stream_arn_ = std::move(put_record.stream_arn()); - } partition_key_ = std::move(put_record.partition_key()); data_ = std::move(put_record.data()); has_explicit_hash_key_ = put_record.has_explicit_hash_key(); diff --git a/aws/kinesis/core/user_record.h b/aws/kinesis/core/user_record.h index 9b4d8e93..f304bc92 100644 --- a/aws/kinesis/core/user_record.h +++ b/aws/kinesis/core/user_record.h @@ -84,14 +84,6 @@ class UserRecord : public aws::utils::TimeSensitive { return ss.str(); } - boost::optional stream_arn() const noexcept { - if (has_stream_arn_) { - return stream_arn_; - } else { - return boost::none; - } - } - boost::optional explicit_hash_key() const noexcept { if (has_explicit_hash_key_) { return hash_key_decimal_str(); @@ -107,14 +99,12 @@ class UserRecord : public aws::utils::TimeSensitive { private: uint64_t source_id_; std::string stream_; - std::string stream_arn_; std::string partition_key_; uint128_t hash_key_; std::string data_; std::vector attempts_; boost::optional predicted_shard_; bool has_explicit_hash_key_; - bool has_stream_arn_; bool finished_; }; diff --git a/aws/kinesis/protobuf/messages.pb.cc b/aws/kinesis/protobuf/messages.pb.cc index 4389c024..856d7c63 100644 --- a/aws/kinesis/protobuf/messages.pb.cc +++ b/aws/kinesis/protobuf/messages.pb.cc @@ -396,12 +396,10 @@ const ::PROTOBUF_NAMESPACE_ID::uint32 TableStruct_messages_2eproto::offsets[] PR PROTOBUF_FIELD_OFFSET(::aws::kinesis::protobuf::PutRecord, partition_key_), PROTOBUF_FIELD_OFFSET(::aws::kinesis::protobuf::PutRecord, explicit_hash_key_), PROTOBUF_FIELD_OFFSET(::aws::kinesis::protobuf::PutRecord, data_), - PROTOBUF_FIELD_OFFSET(::aws::kinesis::protobuf::PutRecord, stream_arn_), 0, 1, 2, 3, - 4, PROTOBUF_FIELD_OFFSET(::aws::kinesis::protobuf::Flush, _has_bits_), PROTOBUF_FIELD_OFFSET(::aws::kinesis::protobuf::Flush, _internal_metadata_), ~0u, // no _extensions_ @@ -516,17 +514,17 @@ static const ::PROTOBUF_NAMESPACE_ID::internal::MigrationSchema schemas[] PROTOB { 9, 18, sizeof(::aws::kinesis::protobuf::Record)}, { 22, 30, sizeof(::aws::kinesis::protobuf::AggregatedRecord)}, { 33, 48, sizeof(::aws::kinesis::protobuf::Message)}, - { 57, 67, sizeof(::aws::kinesis::protobuf::PutRecord)}, - { 72, 78, sizeof(::aws::kinesis::protobuf::Flush)}, - { 79, 89, sizeof(::aws::kinesis::protobuf::Attempt)}, - { 94, 103, sizeof(::aws::kinesis::protobuf::PutRecordResult)}, - { 107, 115, sizeof(::aws::kinesis::protobuf::Credentials)}, - { 118, 125, sizeof(::aws::kinesis::protobuf::SetCredentials)}, - { 127, 134, sizeof(::aws::kinesis::protobuf::Dimension)}, - { 136, 146, sizeof(::aws::kinesis::protobuf::Stats)}, - { 151, 160, sizeof(::aws::kinesis::protobuf::Metric)}, - { 164, 171, sizeof(::aws::kinesis::protobuf::MetricsRequest)}, - { 173, 179, sizeof(::aws::kinesis::protobuf::MetricsResponse)}, + { 57, 66, sizeof(::aws::kinesis::protobuf::PutRecord)}, + { 70, 76, sizeof(::aws::kinesis::protobuf::Flush)}, + { 77, 87, sizeof(::aws::kinesis::protobuf::Attempt)}, + { 92, 101, sizeof(::aws::kinesis::protobuf::PutRecordResult)}, + { 105, 113, sizeof(::aws::kinesis::protobuf::Credentials)}, + { 116, 123, sizeof(::aws::kinesis::protobuf::SetCredentials)}, + { 125, 132, sizeof(::aws::kinesis::protobuf::Dimension)}, + { 134, 144, sizeof(::aws::kinesis::protobuf::Stats)}, + { 149, 158, sizeof(::aws::kinesis::protobuf::Metric)}, + { 162, 169, sizeof(::aws::kinesis::protobuf::MetricsRequest)}, + { 171, 177, sizeof(::aws::kinesis::protobuf::MetricsResponse)}, }; static ::PROTOBUF_NAMESPACE_ID::Message const * const file_default_instances[] = { @@ -568,31 +566,31 @@ const char descriptor_table_protodef_messages_2eproto[] PROTOBUF_SECTION_VARIABL "H\000\022A\n\020metrics_response\030\010 \001(\0132%.aws.kines" "is.protobuf.MetricsResponseH\000\022\?\n\017set_cre" "dentials\030\t \001(\0132$.aws.kinesis.protobuf.Se" - "tCredentialsH\000B\020\n\016actual_message\"t\n\tPutR" + "tCredentialsH\000B\020\n\016actual_message\"`\n\tPutR" "ecord\022\023\n\013stream_name\030\001 \002(\t\022\025\n\rpartition_" "key\030\002 \002(\t\022\031\n\021explicit_hash_key\030\003 \001(\t\022\014\n\004" - "data\030\004 \002(\014\022\022\n\nstream_arn\030\005 \001(\t\"\034\n\005Flush\022" - "\023\n\013stream_name\030\001 \001(\t\"f\n\007Attempt\022\r\n\005delay" - "\030\001 \002(\r\022\020\n\010duration\030\002 \002(\r\022\017\n\007success\030\003 \002(" - "\010\022\022\n\nerror_code\030\004 \001(\t\022\025\n\rerror_message\030\005" - " \001(\t\"~\n\017PutRecordResult\022/\n\010attempts\030\001 \003(" - "\0132\035.aws.kinesis.protobuf.Attempt\022\017\n\007succ" - "ess\030\002 \002(\010\022\020\n\010shard_id\030\003 \001(\t\022\027\n\017sequence_" - "number\030\004 \001(\t\">\n\013Credentials\022\014\n\004akid\030\001 \002(" - "\t\022\022\n\nsecret_key\030\002 \002(\t\022\r\n\005token\030\003 \001(\t\"]\n\016" - "SetCredentials\022\023\n\013for_metrics\030\001 \001(\010\0226\n\013c" - "redentials\030\002 \002(\0132!.aws.kinesis.protobuf." - "Credentials\"\'\n\tDimension\022\013\n\003key\030\001 \002(\t\022\r\n" - "\005value\030\002 \002(\t\"K\n\005Stats\022\r\n\005count\030\001 \002(\001\022\013\n\003" - "sum\030\002 \002(\001\022\014\n\004mean\030\003 \002(\001\022\013\n\003min\030\004 \002(\001\022\013\n\003" - "max\030\005 \002(\001\"\210\001\n\006Metric\022\014\n\004name\030\001 \002(\t\0223\n\ndi" - "mensions\030\002 \003(\0132\037.aws.kinesis.protobuf.Di" - "mension\022*\n\005stats\030\003 \002(\0132\033.aws.kinesis.pro" - "tobuf.Stats\022\017\n\007seconds\030\004 \002(\004\"/\n\016MetricsR" - "equest\022\014\n\004name\030\001 \001(\t\022\017\n\007seconds\030\002 \001(\004\"@\n" - "\017MetricsResponse\022-\n\007metrics\030\001 \003(\0132\034.aws." - "kinesis.protobuf.MetricB2\n0com.amazonaws" - ".services.kinesis.producer.protobuf" + "data\030\004 \002(\014\"\034\n\005Flush\022\023\n\013stream_name\030\001 \001(\t" + "\"f\n\007Attempt\022\r\n\005delay\030\001 \002(\r\022\020\n\010duration\030\002" + " \002(\r\022\017\n\007success\030\003 \002(\010\022\022\n\nerror_code\030\004 \001(" + "\t\022\025\n\rerror_message\030\005 \001(\t\"~\n\017PutRecordRes" + "ult\022/\n\010attempts\030\001 \003(\0132\035.aws.kinesis.prot" + "obuf.Attempt\022\017\n\007success\030\002 \002(\010\022\020\n\010shard_i" + "d\030\003 \001(\t\022\027\n\017sequence_number\030\004 \001(\t\">\n\013Cred" + "entials\022\014\n\004akid\030\001 \002(\t\022\022\n\nsecret_key\030\002 \002(" + "\t\022\r\n\005token\030\003 \001(\t\"]\n\016SetCredentials\022\023\n\013fo" + "r_metrics\030\001 \001(\010\0226\n\013credentials\030\002 \002(\0132!.a" + "ws.kinesis.protobuf.Credentials\"\'\n\tDimen" + "sion\022\013\n\003key\030\001 \002(\t\022\r\n\005value\030\002 \002(\t\"K\n\005Stat" + "s\022\r\n\005count\030\001 \002(\001\022\013\n\003sum\030\002 \002(\001\022\014\n\004mean\030\003 " + "\002(\001\022\013\n\003min\030\004 \002(\001\022\013\n\003max\030\005 \002(\001\"\210\001\n\006Metric" + "\022\014\n\004name\030\001 \002(\t\0223\n\ndimensions\030\002 \003(\0132\037.aws" + ".kinesis.protobuf.Dimension\022*\n\005stats\030\003 \002" + "(\0132\033.aws.kinesis.protobuf.Stats\022\017\n\007secon" + "ds\030\004 \002(\004\"/\n\016MetricsRequest\022\014\n\004name\030\001 \001(\t" + "\022\017\n\007seconds\030\002 \001(\004\"@\n\017MetricsResponse\022-\n\007" + "metrics\030\001 \003(\0132\034.aws.kinesis.protobuf.Met" + "ricB2\n0com.amazonaws.services.kinesis.pr" + "oducer.protobuf" ; static const ::PROTOBUF_NAMESPACE_ID::internal::DescriptorTable*const descriptor_table_messages_2eproto_deps[1] = { &::descriptor_table_config_2eproto, @@ -617,7 +615,7 @@ static ::PROTOBUF_NAMESPACE_ID::internal::SCCInfoBase*const descriptor_table_mes static ::PROTOBUF_NAMESPACE_ID::internal::once_flag descriptor_table_messages_2eproto_once; static bool descriptor_table_messages_2eproto_initialized = false; const ::PROTOBUF_NAMESPACE_ID::internal::DescriptorTable descriptor_table_messages_2eproto = { - &descriptor_table_messages_2eproto_initialized, descriptor_table_protodef_messages_2eproto, "messages.proto", 1795, + &descriptor_table_messages_2eproto_initialized, descriptor_table_protodef_messages_2eproto, "messages.proto", 1775, &descriptor_table_messages_2eproto_once, descriptor_table_messages_2eproto_sccs, descriptor_table_messages_2eproto_deps, 15, 1, schemas, file_default_instances, TableStruct_messages_2eproto::offsets, file_level_metadata_messages_2eproto, 15, file_level_enum_descriptors_messages_2eproto, file_level_service_descriptors_messages_2eproto, @@ -2220,9 +2218,6 @@ class PutRecord::_Internal { static void set_has_data(HasBits* has_bits) { (*has_bits)[0] |= 8u; } - static void set_has_stream_arn(HasBits* has_bits) { - (*has_bits)[0] |= 16u; - } }; PutRecord::PutRecord() @@ -2251,10 +2246,6 @@ PutRecord::PutRecord(const PutRecord& from) if (from._internal_has_data()) { data_.AssignWithDefault(&::PROTOBUF_NAMESPACE_ID::internal::GetEmptyStringAlreadyInited(), from.data_); } - stream_arn_.UnsafeSetDefault(&::PROTOBUF_NAMESPACE_ID::internal::GetEmptyStringAlreadyInited()); - if (from._internal_has_stream_arn()) { - stream_arn_.AssignWithDefault(&::PROTOBUF_NAMESPACE_ID::internal::GetEmptyStringAlreadyInited(), from.stream_arn_); - } // @@protoc_insertion_point(copy_constructor:aws.kinesis.protobuf.PutRecord) } @@ -2264,7 +2255,6 @@ void PutRecord::SharedCtor() { partition_key_.UnsafeSetDefault(&::PROTOBUF_NAMESPACE_ID::internal::GetEmptyStringAlreadyInited()); explicit_hash_key_.UnsafeSetDefault(&::PROTOBUF_NAMESPACE_ID::internal::GetEmptyStringAlreadyInited()); data_.UnsafeSetDefault(&::PROTOBUF_NAMESPACE_ID::internal::GetEmptyStringAlreadyInited()); - stream_arn_.UnsafeSetDefault(&::PROTOBUF_NAMESPACE_ID::internal::GetEmptyStringAlreadyInited()); } PutRecord::~PutRecord() { @@ -2277,7 +2267,6 @@ void PutRecord::SharedDtor() { partition_key_.DestroyNoArena(&::PROTOBUF_NAMESPACE_ID::internal::GetEmptyStringAlreadyInited()); explicit_hash_key_.DestroyNoArena(&::PROTOBUF_NAMESPACE_ID::internal::GetEmptyStringAlreadyInited()); data_.DestroyNoArena(&::PROTOBUF_NAMESPACE_ID::internal::GetEmptyStringAlreadyInited()); - stream_arn_.DestroyNoArena(&::PROTOBUF_NAMESPACE_ID::internal::GetEmptyStringAlreadyInited()); } void PutRecord::SetCachedSize(int size) const { @@ -2296,7 +2285,7 @@ void PutRecord::Clear() { (void) cached_has_bits; cached_has_bits = _has_bits_[0]; - if (cached_has_bits & 0x0000001fu) { + if (cached_has_bits & 0x0000000fu) { if (cached_has_bits & 0x00000001u) { stream_name_.ClearNonDefaultToEmptyNoArena(); } @@ -2309,9 +2298,6 @@ void PutRecord::Clear() { if (cached_has_bits & 0x00000008u) { data_.ClearNonDefaultToEmptyNoArena(); } - if (cached_has_bits & 0x00000010u) { - stream_arn_.ClearNonDefaultToEmptyNoArena(); - } } _has_bits_.Clear(); _internal_metadata_.Clear(); @@ -2366,17 +2352,6 @@ const char* PutRecord::_InternalParse(const char* ptr, ::PROTOBUF_NAMESPACE_ID:: CHK_(ptr); } else goto handle_unusual; continue; - // optional string stream_arn = 5; - case 5: - if (PROTOBUF_PREDICT_TRUE(static_cast<::PROTOBUF_NAMESPACE_ID::uint8>(tag) == 42)) { - auto str = _internal_mutable_stream_arn(); - ptr = ::PROTOBUF_NAMESPACE_ID::internal::InlineGreedyStringParser(str, ptr, ctx); - #ifndef NDEBUG - ::PROTOBUF_NAMESPACE_ID::internal::VerifyUTF8(str, "aws.kinesis.protobuf.PutRecord.stream_arn"); - #endif // !NDEBUG - CHK_(ptr); - } else goto handle_unusual; - continue; default: { handle_unusual: if ((tag & 7) == 4 || tag == 0) { @@ -2441,16 +2416,6 @@ ::PROTOBUF_NAMESPACE_ID::uint8* PutRecord::_InternalSerialize( 4, this->_internal_data(), target); } - // optional string stream_arn = 5; - if (cached_has_bits & 0x00000010u) { - ::PROTOBUF_NAMESPACE_ID::internal::WireFormat::VerifyUTF8StringNamedField( - this->_internal_stream_arn().data(), static_cast(this->_internal_stream_arn().length()), - ::PROTOBUF_NAMESPACE_ID::internal::WireFormat::SERIALIZE, - "aws.kinesis.protobuf.PutRecord.stream_arn"); - target = stream->WriteStringMaybeAliased( - 5, this->_internal_stream_arn(), target); - } - if (PROTOBUF_PREDICT_FALSE(_internal_metadata_.have_unknown_fields())) { target = ::PROTOBUF_NAMESPACE_ID::internal::WireFormat::InternalSerializeUnknownFieldsToArray( _internal_metadata_.unknown_fields(), target, stream); @@ -2521,13 +2486,6 @@ size_t PutRecord::ByteSizeLong() const { this->_internal_explicit_hash_key()); } - // optional string stream_arn = 5; - if (cached_has_bits & 0x00000010u) { - total_size += 1 + - ::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::StringSize( - this->_internal_stream_arn()); - } - if (PROTOBUF_PREDICT_FALSE(_internal_metadata_.have_unknown_fields())) { return ::PROTOBUF_NAMESPACE_ID::internal::ComputeUnknownFieldsSize( _internal_metadata_, total_size, &_cached_size_); @@ -2560,7 +2518,7 @@ void PutRecord::MergeFrom(const PutRecord& from) { (void) cached_has_bits; cached_has_bits = from._has_bits_[0]; - if (cached_has_bits & 0x0000001fu) { + if (cached_has_bits & 0x0000000fu) { if (cached_has_bits & 0x00000001u) { _has_bits_[0] |= 0x00000001u; stream_name_.AssignWithDefault(&::PROTOBUF_NAMESPACE_ID::internal::GetEmptyStringAlreadyInited(), from.stream_name_); @@ -2577,10 +2535,6 @@ void PutRecord::MergeFrom(const PutRecord& from) { _has_bits_[0] |= 0x00000008u; data_.AssignWithDefault(&::PROTOBUF_NAMESPACE_ID::internal::GetEmptyStringAlreadyInited(), from.data_); } - if (cached_has_bits & 0x00000010u) { - _has_bits_[0] |= 0x00000010u; - stream_arn_.AssignWithDefault(&::PROTOBUF_NAMESPACE_ID::internal::GetEmptyStringAlreadyInited(), from.stream_arn_); - } } } @@ -2615,8 +2569,6 @@ void PutRecord::InternalSwap(PutRecord* other) { GetArenaNoVirtual()); data_.Swap(&other->data_, &::PROTOBUF_NAMESPACE_ID::internal::GetEmptyStringAlreadyInited(), GetArenaNoVirtual()); - stream_arn_.Swap(&other->stream_arn_, &::PROTOBUF_NAMESPACE_ID::internal::GetEmptyStringAlreadyInited(), - GetArenaNoVirtual()); } ::PROTOBUF_NAMESPACE_ID::Metadata PutRecord::GetMetadata() const { diff --git a/aws/kinesis/protobuf/messages.pb.h b/aws/kinesis/protobuf/messages.pb.h index 6833c829..e1450507 100644 --- a/aws/kinesis/protobuf/messages.pb.h +++ b/aws/kinesis/protobuf/messages.pb.h @@ -1114,7 +1114,6 @@ class PutRecord : kPartitionKeyFieldNumber = 2, kExplicitHashKeyFieldNumber = 3, kDataFieldNumber = 4, - kStreamArnFieldNumber = 5, }; // required string stream_name = 1; bool has_stream_name() const; @@ -1196,26 +1195,6 @@ class PutRecord : std::string* _internal_mutable_data(); public: - // optional string stream_arn = 5; - bool has_stream_arn() const; - private: - bool _internal_has_stream_arn() const; - public: - void clear_stream_arn(); - const std::string& stream_arn() const; - void set_stream_arn(const std::string& value); - void set_stream_arn(std::string&& value); - void set_stream_arn(const char* value); - void set_stream_arn(const char* value, size_t size); - std::string* mutable_stream_arn(); - std::string* release_stream_arn(); - void set_allocated_stream_arn(std::string* stream_arn); - private: - const std::string& _internal_stream_arn() const; - void _internal_set_stream_arn(const std::string& value); - std::string* _internal_mutable_stream_arn(); - public: - // @@protoc_insertion_point(class_scope:aws.kinesis.protobuf.PutRecord) private: class _Internal; @@ -1230,7 +1209,6 @@ class PutRecord : ::PROTOBUF_NAMESPACE_ID::internal::ArenaStringPtr partition_key_; ::PROTOBUF_NAMESPACE_ID::internal::ArenaStringPtr explicit_hash_key_; ::PROTOBUF_NAMESPACE_ID::internal::ArenaStringPtr data_; - ::PROTOBUF_NAMESPACE_ID::internal::ArenaStringPtr stream_arn_; friend struct ::TableStruct_messages_2eproto; }; // ------------------------------------------------------------------- @@ -4251,77 +4229,6 @@ inline void PutRecord::set_allocated_data(std::string* data) { // @@protoc_insertion_point(field_set_allocated:aws.kinesis.protobuf.PutRecord.data) } -// optional string stream_arn = 5; -inline bool PutRecord::_internal_has_stream_arn() const { - bool value = (_has_bits_[0] & 0x00000010u) != 0; - return value; -} -inline bool PutRecord::has_stream_arn() const { - return _internal_has_stream_arn(); -} -inline void PutRecord::clear_stream_arn() { - stream_arn_.ClearToEmptyNoArena(&::PROTOBUF_NAMESPACE_ID::internal::GetEmptyStringAlreadyInited()); - _has_bits_[0] &= ~0x00000010u; -} -inline const std::string& PutRecord::stream_arn() const { - // @@protoc_insertion_point(field_get:aws.kinesis.protobuf.PutRecord.stream_arn) - return _internal_stream_arn(); -} -inline void PutRecord::set_stream_arn(const std::string& value) { - _internal_set_stream_arn(value); - // @@protoc_insertion_point(field_set:aws.kinesis.protobuf.PutRecord.stream_arn) -} -inline std::string* PutRecord::mutable_stream_arn() { - // @@protoc_insertion_point(field_mutable:aws.kinesis.protobuf.PutRecord.stream_arn) - return _internal_mutable_stream_arn(); -} -inline const std::string& PutRecord::_internal_stream_arn() const { - return stream_arn_.GetNoArena(); -} -inline void PutRecord::_internal_set_stream_arn(const std::string& value) { - _has_bits_[0] |= 0x00000010u; - stream_arn_.SetNoArena(&::PROTOBUF_NAMESPACE_ID::internal::GetEmptyStringAlreadyInited(), value); -} -inline void PutRecord::set_stream_arn(std::string&& value) { - _has_bits_[0] |= 0x00000010u; - stream_arn_.SetNoArena( - &::PROTOBUF_NAMESPACE_ID::internal::GetEmptyStringAlreadyInited(), ::std::move(value)); - // @@protoc_insertion_point(field_set_rvalue:aws.kinesis.protobuf.PutRecord.stream_arn) -} -inline void PutRecord::set_stream_arn(const char* value) { - GOOGLE_DCHECK(value != nullptr); - _has_bits_[0] |= 0x00000010u; - stream_arn_.SetNoArena(&::PROTOBUF_NAMESPACE_ID::internal::GetEmptyStringAlreadyInited(), ::std::string(value)); - // @@protoc_insertion_point(field_set_char:aws.kinesis.protobuf.PutRecord.stream_arn) -} -inline void PutRecord::set_stream_arn(const char* value, size_t size) { - _has_bits_[0] |= 0x00000010u; - stream_arn_.SetNoArena(&::PROTOBUF_NAMESPACE_ID::internal::GetEmptyStringAlreadyInited(), - ::std::string(reinterpret_cast(value), size)); - // @@protoc_insertion_point(field_set_pointer:aws.kinesis.protobuf.PutRecord.stream_arn) -} -inline std::string* PutRecord::_internal_mutable_stream_arn() { - _has_bits_[0] |= 0x00000010u; - return stream_arn_.MutableNoArena(&::PROTOBUF_NAMESPACE_ID::internal::GetEmptyStringAlreadyInited()); -} -inline std::string* PutRecord::release_stream_arn() { - // @@protoc_insertion_point(field_release:aws.kinesis.protobuf.PutRecord.stream_arn) - if (!_internal_has_stream_arn()) { - return nullptr; - } - _has_bits_[0] &= ~0x00000010u; - return stream_arn_.ReleaseNonDefaultNoArena(&::PROTOBUF_NAMESPACE_ID::internal::GetEmptyStringAlreadyInited()); -} -inline void PutRecord::set_allocated_stream_arn(std::string* stream_arn) { - if (stream_arn != nullptr) { - _has_bits_[0] |= 0x00000010u; - } else { - _has_bits_[0] &= ~0x00000010u; - } - stream_arn_.SetAllocatedNoArena(&::PROTOBUF_NAMESPACE_ID::internal::GetEmptyStringAlreadyInited(), stream_arn); - // @@protoc_insertion_point(field_set_allocated:aws.kinesis.protobuf.PutRecord.stream_arn) -} - // ------------------------------------------------------------------- // Flush diff --git a/aws/kinesis/protobuf/messages.proto b/aws/kinesis/protobuf/messages.proto index 78781ed0..9af3de16 100644 --- a/aws/kinesis/protobuf/messages.proto +++ b/aws/kinesis/protobuf/messages.proto @@ -42,7 +42,6 @@ message PutRecord { required string partition_key = 2; optional string explicit_hash_key = 3; required bytes data = 4; - optional string stream_arn = 5; } message Flush { diff --git a/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/IKinesisProducer.java b/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/IKinesisProducer.java index 67732a2b..085e6774 100644 --- a/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/IKinesisProducer.java +++ b/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/IKinesisProducer.java @@ -14,11 +14,7 @@ public interface IKinesisProducer { ListenableFuture addUserRecord(String stream, String partitionKey, String explicitHashKey, ByteBuffer data); - ListenableFuture addUserRecord(String stream, String partitionKey, ByteBuffer data, String streamARN); - - ListenableFuture addUserRecord(String stream, String partitionKey, String explicitHashKey, ByteBuffer data, String streamARN); - - ListenableFuture addUserRecord(String stream, String partitionKey, String explicitHashKey, ByteBuffer data, String streamARN, Schema schema); + ListenableFuture addUserRecord(String stream, String partitionKey, String explicitHashKey, ByteBuffer data, Schema schema); int getOutstandingRecordsCount(); diff --git a/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/KinesisProducer.java b/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/KinesisProducer.java index 8da111bc..aa88cc5d 100644 --- a/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/KinesisProducer.java +++ b/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/KinesisProducer.java @@ -408,7 +408,7 @@ protected KinesisProducer(File inPipe, File outPipe) { */ @Override public ListenableFuture addUserRecord(String stream, String partitionKey, ByteBuffer data) { - return addUserRecord(stream, partitionKey, null, data, null); + return addUserRecord(stream, partitionKey, null, data); } /** @@ -465,7 +465,7 @@ public ListenableFuture addUserRecord(String stream, String pa */ @Override public ListenableFuture addUserRecord(UserRecord userRecord) { - return addUserRecord(userRecord.getStreamName(), userRecord.getPartitionKey(), userRecord.getExplicitHashKey(), userRecord.getData(), userRecord.getStreamARN(), userRecord.getSchema()); + return addUserRecord(userRecord.getStreamName(), userRecord.getPartitionKey(), userRecord.getExplicitHashKey(), userRecord.getData(), userRecord.getSchema()); } /** @@ -532,144 +532,11 @@ public ListenableFuture addUserRecord(UserRecord userRecord) { */ @Override public ListenableFuture addUserRecord(String stream, String partitionKey, String explicitHashKey, ByteBuffer data) { - return addUserRecord(stream, partitionKey, explicitHashKey, data, null, null); + return addUserRecord(stream, partitionKey, explicitHashKey, data, null); } - /** - * Put a record asynchronously. A {@link ListenableFuture} is returned that - * can be used to retrieve the result, either by polling or by registering a - * callback. - * - *

- * The return value can be disregarded if you do not wish to process the - * result. Under the covers, the KPL will automatically reattempt puts in - * case of transient errors (including throttling). A failed result is - * generally returned only if an irrecoverable error is detected (e.g. - * trying to put to a stream that doesn't exist), or if the record expires. - * - *

- * Thread safe. - * - *

- * To add a listener to the future: - *

- * - * ListenableFuture<PutRecordResult> f = myKinesisProducer.addUserRecord(...); - * com.google.common.util.concurrent.Futures.addCallback(f, callback, executor); - * - *

- * where callback is an instance of - * {@link com.google.common.util.concurrent.FutureCallback} and - * executor is an instance of - * {@link java.util.concurrent.Executor}. - *

- * Important: - *

- * If long-running tasks are performed in the callbacks, it is recommended - * that a custom executor be provided when registering callbacks to ensure - * that there are enough threads to achieve the desired level of - * parallelism. By default, the KPL will use an internal thread pool to - * execute callbacks, but this pool may not have a sufficient number of - * threads if a large number is desired. - *

- * Another option would be to hand the result off to a different component - * for processing and keep the callback routine fast. - * - * @param stream - * Stream to put to. - * @param partitionKey - * Partition key. Length must be at least one, and at most 256 - * (inclusive). - * @param data - * Binary data of the record. Maximum size 1MiB. - * @return A future for the result of the put. - * @param streamARN - * ARN of the stream, e.g., arn:aws:kinesis:us-east-2:123456789012:stream/mystream - * @throws IllegalArgumentException - * if input does not meet stated constraints - * @throws DaemonException - * if the child process is dead - * @see ListenableFuture - * @see UserRecordResult - * @see KinesisProducerConfiguration#setRecordTtl(long) - * @see UserRecordFailedException - */ - @Override - public ListenableFuture addUserRecord(String stream, String partitionKey, ByteBuffer data, String streamARN) { - return addUserRecord(stream, partitionKey, null, data, streamARN, null); - } - - /** - * Put a record asynchronously. A {@link ListenableFuture} is returned that - * can be used to retrieve the result, either by polling or by registering a - * callback. - * - *

- * The return value can be disregarded if you do not wish to process the - * result. Under the covers, the KPL will automatically reattempt puts in - * case of transient errors (including throttling). A failed result is - * generally returned only if an irrecoverable error is detected (e.g. - * trying to put to a stream that doesn't exist), or if the record expires. - * - *

- * Thread safe. - * - *

- * To add a listener to the future: - *

- * - * ListenableFuture<PutRecordResult> f = myKinesisProducer.addUserRecord(...); - * com.google.common.util.concurrent.Futures.addCallback(f, callback, executor); - * - *

- * where callback is an instance of - * {@link com.google.common.util.concurrent.FutureCallback} and - * executor is an instance of - * {@link java.util.concurrent.Executor}. - *

- * Important: - *

- * If long-running tasks are performed in the callbacks, it is recommended - * that a custom executor be provided when registering callbacks to ensure - * that there are enough threads to achieve the desired level of - * parallelism. By default, the KPL will use an internal thread pool to - * execute callbacks, but this pool may not have a sufficient number of - * threads if a large number is desired. - *

- * Another option would be to hand the result off to a different component - * for processing and keep the callback routine fast. - * - * @param stream - * Stream to put to. - * @param partitionKey - * Partition key. Length must be at least one, and at most 256 - * (inclusive). - * @param explicitHashKey - * The hash value used to explicitly determine the shard the data - * record is assigned to by overriding the partition key hash. - * Must be a valid string representation of a positive integer - * with value between 0 and 2^128 - 1 (inclusive). - * @param data - * Binary data of the record. Maximum size 1MiB. - * @return A future for the result of the put. - * @param streamARN - * ARN of the stream, e.g., arn:aws:kinesis:us-east-2:123456789012:stream/mystream - * @throws IllegalArgumentException - * if input does not meet stated constraints - * @throws DaemonException - * if the child process is dead - * @see ListenableFuture - * @see UserRecordResult - * @see KinesisProducerConfiguration#setRecordTtl(long) - * @see UserRecordFailedException - */ @Override - public ListenableFuture addUserRecord(String stream, String partitionKey, String explicitHashKey, ByteBuffer data, String streamARN) { - return addUserRecord(stream, partitionKey, explicitHashKey, data, streamARN, null); - } - - @Override - public ListenableFuture addUserRecord(String stream, String partitionKey, String explicitHashKey, ByteBuffer data, String streamARN, Schema schema) { + public ListenableFuture addUserRecord(String stream, String partitionKey, String explicitHashKey, ByteBuffer data, Schema schema) { if (stream == null) { throw new IllegalArgumentException("Stream name cannot be null"); } @@ -750,9 +617,6 @@ public ListenableFuture addUserRecord(String stream, String pa if (b != null) { pr.setExplicitHashKey(b.toString(10)); } - if(streamARN != null) { - pr.setStreamArn(streamARN); - } Message m = Message.newBuilder() .setId(id) diff --git a/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/UserRecord.java b/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/UserRecord.java index 2e0cad67..642ed349 100644 --- a/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/UserRecord.java +++ b/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/UserRecord.java @@ -48,12 +48,6 @@ public UserRecord(String streamName, String partitionKey, ByteBuffer data) { this.partitionKey = partitionKey; this.data = data; } - public UserRecord(String streamName, String partitionKey, ByteBuffer data, String streamARN) { - this.streamName = streamName; - this.partitionKey = partitionKey; - this.streamARN = streamARN; - this.data = data; - } public UserRecord(String streamName, String partitionKey, String explicitHashKey, ByteBuffer data) { this.streamName = streamName; @@ -61,19 +55,11 @@ public UserRecord(String streamName, String partitionKey, String explicitHashKey this.explicitHashKey = explicitHashKey; this.data = data; } - public UserRecord(String streamName, String partitionKey, String explicitHashKey, ByteBuffer data, String streamARN) { - this.streamName = streamName; - this.partitionKey = partitionKey; - this.explicitHashKey = explicitHashKey; - this.streamARN = streamARN; - this.data = data; - } - public UserRecord(String streamName, String partitionKey, String explicitHashKey, ByteBuffer data, String streamARN, Schema schema) { + public UserRecord(String streamName, String partitionKey, String explicitHashKey, ByteBuffer data, Schema schema) { this.streamName = streamName; this.partitionKey = partitionKey; this.explicitHashKey = explicitHashKey; - this.streamARN= streamARN; this.data = data; this.schema = schema; } @@ -104,17 +90,6 @@ public UserRecord withPartitionKey(String partitionKey) { return this; } - public String getStreamARN() { return streamARN; } - - public void setStreamARN(String streamARN) { - this.streamARN = streamARN; - } - - public UserRecord withStreamARN(String streamARN) { - this.streamARN = streamARN; - return this; - } - public ByteBuffer getData() { return data; } diff --git a/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/protobuf/Config.java b/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/protobuf/Config.java index fdf76281..c64651a4 100644 --- a/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/protobuf/Config.java +++ b/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/protobuf/Config.java @@ -1868,7 +1868,7 @@ public java.util.Listrepeated .aws.kinesis.protobuf.AdditionalDimension additional_metric_dims = 128; */ - public java.util.List + public java.util.List getAdditionalMetricDimsOrBuilderList() { return additionalMetricDims_; } @@ -1961,7 +1961,7 @@ public java.lang.String getCloudwatchEndpoint() { if (ref instanceof java.lang.String) { return (java.lang.String) ref; } else { - com.google.protobuf.ByteString bs = + com.google.protobuf.ByteString bs = (com.google.protobuf.ByteString) ref; java.lang.String s = bs.toStringUtf8(); if (bs.isValidUtf8()) { @@ -1978,7 +1978,7 @@ public java.lang.String getCloudwatchEndpoint() { getCloudwatchEndpointBytes() { java.lang.Object ref = cloudwatchEndpoint_; if (ref instanceof java.lang.String) { - com.google.protobuf.ByteString b = + com.google.protobuf.ByteString b = com.google.protobuf.ByteString.copyFromUtf8( (java.lang.String) ref); cloudwatchEndpoint_ = b; @@ -2108,7 +2108,7 @@ public java.lang.String getKinesisEndpoint() { if (ref instanceof java.lang.String) { return (java.lang.String) ref; } else { - com.google.protobuf.ByteString bs = + com.google.protobuf.ByteString bs = (com.google.protobuf.ByteString) ref; java.lang.String s = bs.toStringUtf8(); if (bs.isValidUtf8()) { @@ -2125,7 +2125,7 @@ public java.lang.String getKinesisEndpoint() { getKinesisEndpointBytes() { java.lang.Object ref = kinesisEndpoint_; if (ref instanceof java.lang.String) { - com.google.protobuf.ByteString b = + com.google.protobuf.ByteString b = com.google.protobuf.ByteString.copyFromUtf8( (java.lang.String) ref); kinesisEndpoint_ = b; @@ -2170,7 +2170,7 @@ public java.lang.String getLogLevel() { if (ref instanceof java.lang.String) { return (java.lang.String) ref; } else { - com.google.protobuf.ByteString bs = + com.google.protobuf.ByteString bs = (com.google.protobuf.ByteString) ref; java.lang.String s = bs.toStringUtf8(); if (bs.isValidUtf8()) { @@ -2187,7 +2187,7 @@ public java.lang.String getLogLevel() { getLogLevelBytes() { java.lang.Object ref = logLevel_; if (ref instanceof java.lang.String) { - com.google.protobuf.ByteString b = + com.google.protobuf.ByteString b = com.google.protobuf.ByteString.copyFromUtf8( (java.lang.String) ref); logLevel_ = b; @@ -2232,7 +2232,7 @@ public java.lang.String getMetricsGranularity() { if (ref instanceof java.lang.String) { return (java.lang.String) ref; } else { - com.google.protobuf.ByteString bs = + com.google.protobuf.ByteString bs = (com.google.protobuf.ByteString) ref; java.lang.String s = bs.toStringUtf8(); if (bs.isValidUtf8()) { @@ -2249,7 +2249,7 @@ public java.lang.String getMetricsGranularity() { getMetricsGranularityBytes() { java.lang.Object ref = metricsGranularity_; if (ref instanceof java.lang.String) { - com.google.protobuf.ByteString b = + com.google.protobuf.ByteString b = com.google.protobuf.ByteString.copyFromUtf8( (java.lang.String) ref); metricsGranularity_ = b; @@ -2277,7 +2277,7 @@ public java.lang.String getMetricsLevel() { if (ref instanceof java.lang.String) { return (java.lang.String) ref; } else { - com.google.protobuf.ByteString bs = + com.google.protobuf.ByteString bs = (com.google.protobuf.ByteString) ref; java.lang.String s = bs.toStringUtf8(); if (bs.isValidUtf8()) { @@ -2294,7 +2294,7 @@ public java.lang.String getMetricsLevel() { getMetricsLevelBytes() { java.lang.Object ref = metricsLevel_; if (ref instanceof java.lang.String) { - com.google.protobuf.ByteString b = + com.google.protobuf.ByteString b = com.google.protobuf.ByteString.copyFromUtf8( (java.lang.String) ref); metricsLevel_ = b; @@ -2322,7 +2322,7 @@ public java.lang.String getMetricsNamespace() { if (ref instanceof java.lang.String) { return (java.lang.String) ref; } else { - com.google.protobuf.ByteString bs = + com.google.protobuf.ByteString bs = (com.google.protobuf.ByteString) ref; java.lang.String s = bs.toStringUtf8(); if (bs.isValidUtf8()) { @@ -2339,7 +2339,7 @@ public java.lang.String getMetricsNamespace() { getMetricsNamespaceBytes() { java.lang.Object ref = metricsNamespace_; if (ref instanceof java.lang.String) { - com.google.protobuf.ByteString b = + com.google.protobuf.ByteString b = com.google.protobuf.ByteString.copyFromUtf8( (java.lang.String) ref); metricsNamespace_ = b; @@ -2452,7 +2452,7 @@ public java.lang.String getRegion() { if (ref instanceof java.lang.String) { return (java.lang.String) ref; } else { - com.google.protobuf.ByteString bs = + com.google.protobuf.ByteString bs = (com.google.protobuf.ByteString) ref; java.lang.String s = bs.toStringUtf8(); if (bs.isValidUtf8()) { @@ -2469,7 +2469,7 @@ public java.lang.String getRegion() { getRegionBytes() { java.lang.Object ref = region_; if (ref instanceof java.lang.String) { - com.google.protobuf.ByteString b = + com.google.protobuf.ByteString b = com.google.protobuf.ByteString.copyFromUtf8( (java.lang.String) ref); region_ = b; @@ -2531,7 +2531,7 @@ public java.lang.String getProxyHost() { if (ref instanceof java.lang.String) { return (java.lang.String) ref; } else { - com.google.protobuf.ByteString bs = + com.google.protobuf.ByteString bs = (com.google.protobuf.ByteString) ref; java.lang.String s = bs.toStringUtf8(); if (bs.isValidUtf8()) { @@ -2548,7 +2548,7 @@ public java.lang.String getProxyHost() { getProxyHostBytes() { java.lang.Object ref = proxyHost_; if (ref instanceof java.lang.String) { - com.google.protobuf.ByteString b = + com.google.protobuf.ByteString b = com.google.protobuf.ByteString.copyFromUtf8( (java.lang.String) ref); proxyHost_ = b; @@ -2593,7 +2593,7 @@ public java.lang.String getProxyUserName() { if (ref instanceof java.lang.String) { return (java.lang.String) ref; } else { - com.google.protobuf.ByteString bs = + com.google.protobuf.ByteString bs = (com.google.protobuf.ByteString) ref; java.lang.String s = bs.toStringUtf8(); if (bs.isValidUtf8()) { @@ -2610,7 +2610,7 @@ public java.lang.String getProxyUserName() { getProxyUserNameBytes() { java.lang.Object ref = proxyUserName_; if (ref instanceof java.lang.String) { - com.google.protobuf.ByteString b = + com.google.protobuf.ByteString b = com.google.protobuf.ByteString.copyFromUtf8( (java.lang.String) ref); proxyUserName_ = b; @@ -2638,7 +2638,7 @@ public java.lang.String getProxyPassword() { if (ref instanceof java.lang.String) { return (java.lang.String) ref; } else { - com.google.protobuf.ByteString bs = + com.google.protobuf.ByteString bs = (com.google.protobuf.ByteString) ref; java.lang.String s = bs.toStringUtf8(); if (bs.isValidUtf8()) { @@ -2655,7 +2655,7 @@ public java.lang.String getProxyPassword() { getProxyPasswordBytes() { java.lang.Object ref = proxyPassword_; if (ref instanceof java.lang.String) { - com.google.protobuf.ByteString b = + com.google.protobuf.ByteString b = com.google.protobuf.ByteString.copyFromUtf8( (java.lang.String) ref); proxyPassword_ = b; @@ -2683,7 +2683,7 @@ public java.lang.String getStsEndpoint() { if (ref instanceof java.lang.String) { return (java.lang.String) ref; } else { - com.google.protobuf.ByteString bs = + com.google.protobuf.ByteString bs = (com.google.protobuf.ByteString) ref; java.lang.String s = bs.toStringUtf8(); if (bs.isValidUtf8()) { @@ -2700,7 +2700,7 @@ public java.lang.String getStsEndpoint() { getStsEndpointBytes() { java.lang.Object ref = stsEndpoint_; if (ref instanceof java.lang.String) { - com.google.protobuf.ByteString b = + com.google.protobuf.ByteString b = com.google.protobuf.ByteString.copyFromUtf8( (java.lang.String) ref); stsEndpoint_ = b; @@ -3812,7 +3812,7 @@ public Builder mergeFrom(com.amazonaws.services.kinesis.producer.protobuf.Config additionalMetricDimsBuilder_ = null; additionalMetricDims_ = other.additionalMetricDims_; bitField0_ = (bitField0_ & ~0x00000001); - additionalMetricDimsBuilder_ = + additionalMetricDimsBuilder_ = com.google.protobuf.GeneratedMessageV3.alwaysUseFieldBuilders ? getAdditionalMetricDimsFieldBuilder() : null; } else { @@ -4172,7 +4172,7 @@ public com.amazonaws.services.kinesis.producer.protobuf.Config.AdditionalDimensi /** * repeated .aws.kinesis.protobuf.AdditionalDimension additional_metric_dims = 128; */ - public java.util.List + public java.util.List getAdditionalMetricDimsOrBuilderList() { if (additionalMetricDimsBuilder_ != null) { return additionalMetricDimsBuilder_.getMessageOrBuilderList(); @@ -4198,12 +4198,12 @@ public com.amazonaws.services.kinesis.producer.protobuf.Config.AdditionalDimensi /** * repeated .aws.kinesis.protobuf.AdditionalDimension additional_metric_dims = 128; */ - public java.util.List + public java.util.List getAdditionalMetricDimsBuilderList() { return getAdditionalMetricDimsFieldBuilder().getBuilderList(); } private com.google.protobuf.RepeatedFieldBuilderV3< - com.amazonaws.services.kinesis.producer.protobuf.Config.AdditionalDimension, com.amazonaws.services.kinesis.producer.protobuf.Config.AdditionalDimension.Builder, com.amazonaws.services.kinesis.producer.protobuf.Config.AdditionalDimensionOrBuilder> + com.amazonaws.services.kinesis.producer.protobuf.Config.AdditionalDimension, com.amazonaws.services.kinesis.producer.protobuf.Config.AdditionalDimension.Builder, com.amazonaws.services.kinesis.producer.protobuf.Config.AdditionalDimensionOrBuilder> getAdditionalMetricDimsFieldBuilder() { if (additionalMetricDimsBuilder_ == null) { additionalMetricDimsBuilder_ = new com.google.protobuf.RepeatedFieldBuilderV3< @@ -4362,7 +4362,7 @@ public java.lang.String getCloudwatchEndpoint() { getCloudwatchEndpointBytes() { java.lang.Object ref = cloudwatchEndpoint_; if (ref instanceof String) { - com.google.protobuf.ByteString b = + com.google.protobuf.ByteString b = com.google.protobuf.ByteString.copyFromUtf8( (java.lang.String) ref); cloudwatchEndpoint_ = b; @@ -4668,7 +4668,7 @@ public java.lang.String getKinesisEndpoint() { getKinesisEndpointBytes() { java.lang.Object ref = kinesisEndpoint_; if (ref instanceof String) { - com.google.protobuf.ByteString b = + com.google.protobuf.ByteString b = com.google.protobuf.ByteString.copyFromUtf8( (java.lang.String) ref); kinesisEndpoint_ = b; @@ -4789,7 +4789,7 @@ public java.lang.String getLogLevel() { getLogLevelBytes() { java.lang.Object ref = logLevel_; if (ref instanceof String) { - com.google.protobuf.ByteString b = + com.google.protobuf.ByteString b = com.google.protobuf.ByteString.copyFromUtf8( (java.lang.String) ref); logLevel_ = b; @@ -4910,7 +4910,7 @@ public java.lang.String getMetricsGranularity() { getMetricsGranularityBytes() { java.lang.Object ref = metricsGranularity_; if (ref instanceof String) { - com.google.protobuf.ByteString b = + com.google.protobuf.ByteString b = com.google.protobuf.ByteString.copyFromUtf8( (java.lang.String) ref); metricsGranularity_ = b; @@ -4994,7 +4994,7 @@ public java.lang.String getMetricsLevel() { getMetricsLevelBytes() { java.lang.Object ref = metricsLevel_; if (ref instanceof String) { - com.google.protobuf.ByteString b = + com.google.protobuf.ByteString b = com.google.protobuf.ByteString.copyFromUtf8( (java.lang.String) ref); metricsLevel_ = b; @@ -5078,7 +5078,7 @@ public java.lang.String getMetricsNamespace() { getMetricsNamespaceBytes() { java.lang.Object ref = metricsNamespace_; if (ref instanceof String) { - com.google.protobuf.ByteString b = + com.google.protobuf.ByteString b = com.google.protobuf.ByteString.copyFromUtf8( (java.lang.String) ref); metricsNamespace_ = b; @@ -5347,7 +5347,7 @@ public java.lang.String getRegion() { getRegionBytes() { java.lang.Object ref = region_; if (ref instanceof String) { - com.google.protobuf.ByteString b = + com.google.protobuf.ByteString b = com.google.protobuf.ByteString.copyFromUtf8( (java.lang.String) ref); region_ = b; @@ -5505,7 +5505,7 @@ public java.lang.String getProxyHost() { getProxyHostBytes() { java.lang.Object ref = proxyHost_; if (ref instanceof String) { - com.google.protobuf.ByteString b = + com.google.protobuf.ByteString b = com.google.protobuf.ByteString.copyFromUtf8( (java.lang.String) ref); proxyHost_ = b; @@ -5626,7 +5626,7 @@ public java.lang.String getProxyUserName() { getProxyUserNameBytes() { java.lang.Object ref = proxyUserName_; if (ref instanceof String) { - com.google.protobuf.ByteString b = + com.google.protobuf.ByteString b = com.google.protobuf.ByteString.copyFromUtf8( (java.lang.String) ref); proxyUserName_ = b; @@ -5710,7 +5710,7 @@ public java.lang.String getProxyPassword() { getProxyPasswordBytes() { java.lang.Object ref = proxyPassword_; if (ref instanceof String) { - com.google.protobuf.ByteString b = + com.google.protobuf.ByteString b = com.google.protobuf.ByteString.copyFromUtf8( (java.lang.String) ref); proxyPassword_ = b; @@ -5794,7 +5794,7 @@ public java.lang.String getStsEndpoint() { getStsEndpointBytes() { java.lang.Object ref = stsEndpoint_; if (ref instanceof String) { - com.google.protobuf.ByteString b = + com.google.protobuf.ByteString b = com.google.protobuf.ByteString.copyFromUtf8( (java.lang.String) ref); stsEndpoint_ = b; @@ -6014,12 +6014,12 @@ public com.amazonaws.services.kinesis.producer.protobuf.Config.Configuration get private static final com.google.protobuf.Descriptors.Descriptor internal_static_aws_kinesis_protobuf_AdditionalDimension_descriptor; - private static final + private static final com.google.protobuf.GeneratedMessageV3.FieldAccessorTable internal_static_aws_kinesis_protobuf_AdditionalDimension_fieldAccessorTable; private static final com.google.protobuf.Descriptors.Descriptor internal_static_aws_kinesis_protobuf_Configuration_descriptor; - private static final + private static final com.google.protobuf.GeneratedMessageV3.FieldAccessorTable internal_static_aws_kinesis_protobuf_Configuration_fieldAccessorTable; diff --git a/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/protobuf/Messages.java b/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/protobuf/Messages.java index 60b0aa5e..a50e361d 100644 --- a/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/protobuf/Messages.java +++ b/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/protobuf/Messages.java @@ -5516,23 +5516,6 @@ public interface PutRecordOrBuilder extends * @return The data. */ com.google.protobuf.ByteString getData(); - - /** - * optional string stream_arn = 5; - * @return Whether the streamArn field is set. - */ - boolean hasStreamArn(); - /** - * optional string stream_arn = 5; - * @return The streamArn. - */ - java.lang.String getStreamArn(); - /** - * optional string stream_arn = 5; - * @return The bytes for streamArn. - */ - com.google.protobuf.ByteString - getStreamArnBytes(); } /** * Protobuf type {@code aws.kinesis.protobuf.PutRecord} @@ -5551,7 +5534,6 @@ private PutRecord() { partitionKey_ = ""; explicitHashKey_ = ""; data_ = com.google.protobuf.ByteString.EMPTY; - streamArn_ = ""; } @java.lang.Override @@ -5608,12 +5590,6 @@ private PutRecord( data_ = input.readBytes(); break; } - case 42: { - com.google.protobuf.ByteString bs = input.readBytes(); - bitField0_ |= 0x00000010; - streamArn_ = bs; - break; - } default: { if (!parseUnknownField( input, unknownFields, extensionRegistry, tag)) { @@ -5799,51 +5775,6 @@ public com.google.protobuf.ByteString getData() { return data_; } - public static final int STREAM_ARN_FIELD_NUMBER = 5; - private volatile java.lang.Object streamArn_; - /** - * optional string stream_arn = 5; - * @return Whether the streamArn field is set. - */ - public boolean hasStreamArn() { - return ((bitField0_ & 0x00000010) != 0); - } - /** - * optional string stream_arn = 5; - * @return The streamArn. - */ - public java.lang.String getStreamArn() { - java.lang.Object ref = streamArn_; - if (ref instanceof java.lang.String) { - return (java.lang.String) ref; - } else { - com.google.protobuf.ByteString bs = - (com.google.protobuf.ByteString) ref; - java.lang.String s = bs.toStringUtf8(); - if (bs.isValidUtf8()) { - streamArn_ = s; - } - return s; - } - } - /** - * optional string stream_arn = 5; - * @return The bytes for streamArn. - */ - public com.google.protobuf.ByteString - getStreamArnBytes() { - java.lang.Object ref = streamArn_; - if (ref instanceof java.lang.String) { - com.google.protobuf.ByteString b = - com.google.protobuf.ByteString.copyFromUtf8( - (java.lang.String) ref); - streamArn_ = b; - return b; - } else { - return (com.google.protobuf.ByteString) ref; - } - } - private byte memoizedIsInitialized = -1; @java.lang.Override public final boolean isInitialized() { @@ -5882,9 +5813,6 @@ public void writeTo(com.google.protobuf.CodedOutputStream output) if (((bitField0_ & 0x00000008) != 0)) { output.writeBytes(4, data_); } - if (((bitField0_ & 0x00000010) != 0)) { - com.google.protobuf.GeneratedMessageV3.writeString(output, 5, streamArn_); - } unknownFields.writeTo(output); } @@ -5907,9 +5835,6 @@ public int getSerializedSize() { size += com.google.protobuf.CodedOutputStream .computeBytesSize(4, data_); } - if (((bitField0_ & 0x00000010) != 0)) { - size += com.google.protobuf.GeneratedMessageV3.computeStringSize(5, streamArn_); - } size += unknownFields.getSerializedSize(); memoizedSize = size; return size; @@ -5945,11 +5870,6 @@ public boolean equals(final java.lang.Object obj) { if (!getData() .equals(other.getData())) return false; } - if (hasStreamArn() != other.hasStreamArn()) return false; - if (hasStreamArn()) { - if (!getStreamArn() - .equals(other.getStreamArn())) return false; - } if (!unknownFields.equals(other.unknownFields)) return false; return true; } @@ -5977,10 +5897,6 @@ public int hashCode() { hash = (37 * hash) + DATA_FIELD_NUMBER; hash = (53 * hash) + getData().hashCode(); } - if (hasStreamArn()) { - hash = (37 * hash) + STREAM_ARN_FIELD_NUMBER; - hash = (53 * hash) + getStreamArn().hashCode(); - } hash = (29 * hash) + unknownFields.hashCode(); memoizedHashCode = hash; return hash; @@ -6122,8 +6038,6 @@ public Builder clear() { bitField0_ = (bitField0_ & ~0x00000004); data_ = com.google.protobuf.ByteString.EMPTY; bitField0_ = (bitField0_ & ~0x00000008); - streamArn_ = ""; - bitField0_ = (bitField0_ & ~0x00000010); return this; } @@ -6168,10 +6082,6 @@ public com.amazonaws.services.kinesis.producer.protobuf.Messages.PutRecord build to_bitField0_ |= 0x00000008; } result.data_ = data_; - if (((from_bitField0_ & 0x00000010) != 0)) { - to_bitField0_ |= 0x00000010; - } - result.streamArn_ = streamArn_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -6239,11 +6149,6 @@ public Builder mergeFrom(com.amazonaws.services.kinesis.producer.protobuf.Messag if (other.hasData()) { setData(other.getData()); } - if (other.hasStreamArn()) { - bitField0_ |= 0x00000010; - streamArn_ = other.streamArn_; - onChanged(); - } this.mergeUnknownFields(other.unknownFields); onChanged(); return this; @@ -6574,90 +6479,6 @@ public Builder clearData() { onChanged(); return this; } - - private java.lang.Object streamArn_ = ""; - /** - * optional string stream_arn = 5; - * @return Whether the streamArn field is set. - */ - public boolean hasStreamArn() { - return ((bitField0_ & 0x00000010) != 0); - } - /** - * optional string stream_arn = 5; - * @return The streamArn. - */ - public java.lang.String getStreamArn() { - java.lang.Object ref = streamArn_; - if (!(ref instanceof java.lang.String)) { - com.google.protobuf.ByteString bs = - (com.google.protobuf.ByteString) ref; - java.lang.String s = bs.toStringUtf8(); - if (bs.isValidUtf8()) { - streamArn_ = s; - } - return s; - } else { - return (java.lang.String) ref; - } - } - /** - * optional string stream_arn = 5; - * @return The bytes for streamArn. - */ - public com.google.protobuf.ByteString - getStreamArnBytes() { - java.lang.Object ref = streamArn_; - if (ref instanceof String) { - com.google.protobuf.ByteString b = - com.google.protobuf.ByteString.copyFromUtf8( - (java.lang.String) ref); - streamArn_ = b; - return b; - } else { - return (com.google.protobuf.ByteString) ref; - } - } - /** - * optional string stream_arn = 5; - * @param value The streamArn to set. - * @return This builder for chaining. - */ - public Builder setStreamArn( - java.lang.String value) { - if (value == null) { - throw new NullPointerException(); - } - bitField0_ |= 0x00000010; - streamArn_ = value; - onChanged(); - return this; - } - /** - * optional string stream_arn = 5; - * @return This builder for chaining. - */ - public Builder clearStreamArn() { - bitField0_ = (bitField0_ & ~0x00000010); - streamArn_ = getDefaultInstance().getStreamArn(); - onChanged(); - return this; - } - /** - * optional string stream_arn = 5; - * @param value The bytes for streamArn to set. - * @return This builder for chaining. - */ - public Builder setStreamArnBytes( - com.google.protobuf.ByteString value) { - if (value == null) { - throw new NullPointerException(); - } - bitField0_ |= 0x00000010; - streamArn_ = value; - onChanged(); - return this; - } @java.lang.Override public final Builder setUnknownFields( final com.google.protobuf.UnknownFieldSet unknownFields) { @@ -16032,31 +15853,31 @@ public com.amazonaws.services.kinesis.producer.protobuf.Messages.MetricsResponse "H\000\022A\n\020metrics_response\030\010 \001(\0132%.aws.kines" + "is.protobuf.MetricsResponseH\000\022?\n\017set_cre" + "dentials\030\t \001(\0132$.aws.kinesis.protobuf.Se" + - "tCredentialsH\000B\020\n\016actual_message\"t\n\tPutR" + + "tCredentialsH\000B\020\n\016actual_message\"`\n\tPutR" + "ecord\022\023\n\013stream_name\030\001 \002(\t\022\025\n\rpartition_" + "key\030\002 \002(\t\022\031\n\021explicit_hash_key\030\003 \001(\t\022\014\n\004" + - "data\030\004 \002(\014\022\022\n\nstream_arn\030\005 \001(\t\"\034\n\005Flush\022" + - "\023\n\013stream_name\030\001 \001(\t\"f\n\007Attempt\022\r\n\005delay" + - "\030\001 \002(\r\022\020\n\010duration\030\002 \002(\r\022\017\n\007success\030\003 \002(" + - "\010\022\022\n\nerror_code\030\004 \001(\t\022\025\n\rerror_message\030\005" + - " \001(\t\"~\n\017PutRecordResult\022/\n\010attempts\030\001 \003(" + - "\0132\035.aws.kinesis.protobuf.Attempt\022\017\n\007succ" + - "ess\030\002 \002(\010\022\020\n\010shard_id\030\003 \001(\t\022\027\n\017sequence_" + - "number\030\004 \001(\t\">\n\013Credentials\022\014\n\004akid\030\001 \002(" + - "\t\022\022\n\nsecret_key\030\002 \002(\t\022\r\n\005token\030\003 \001(\t\"]\n\016" + - "SetCredentials\022\023\n\013for_metrics\030\001 \001(\010\0226\n\013c" + - "redentials\030\002 \002(\0132!.aws.kinesis.protobuf." + - "Credentials\"\'\n\tDimension\022\013\n\003key\030\001 \002(\t\022\r\n" + - "\005value\030\002 \002(\t\"K\n\005Stats\022\r\n\005count\030\001 \002(\001\022\013\n\003" + - "sum\030\002 \002(\001\022\014\n\004mean\030\003 \002(\001\022\013\n\003min\030\004 \002(\001\022\013\n\003" + - "max\030\005 \002(\001\"\210\001\n\006Metric\022\014\n\004name\030\001 \002(\t\0223\n\ndi" + - "mensions\030\002 \003(\0132\037.aws.kinesis.protobuf.Di" + - "mension\022*\n\005stats\030\003 \002(\0132\033.aws.kinesis.pro" + - "tobuf.Stats\022\017\n\007seconds\030\004 \002(\004\"/\n\016MetricsR" + - "equest\022\014\n\004name\030\001 \001(\t\022\017\n\007seconds\030\002 \001(\004\"@\n" + - "\017MetricsResponse\022-\n\007metrics\030\001 \003(\0132\034.aws." + - "kinesis.protobuf.MetricB2\n0com.amazonaws" + - ".services.kinesis.producer.protobuf" + "data\030\004 \002(\014\"\034\n\005Flush\022\023\n\013stream_name\030\001 \001(\t" + + "\"f\n\007Attempt\022\r\n\005delay\030\001 \002(\r\022\020\n\010duration\030\002" + + " \002(\r\022\017\n\007success\030\003 \002(\010\022\022\n\nerror_code\030\004 \001(" + + "\t\022\025\n\rerror_message\030\005 \001(\t\"~\n\017PutRecordRes" + + "ult\022/\n\010attempts\030\001 \003(\0132\035.aws.kinesis.prot" + + "obuf.Attempt\022\017\n\007success\030\002 \002(\010\022\020\n\010shard_i" + + "d\030\003 \001(\t\022\027\n\017sequence_number\030\004 \001(\t\">\n\013Cred" + + "entials\022\014\n\004akid\030\001 \002(\t\022\022\n\nsecret_key\030\002 \002(" + + "\t\022\r\n\005token\030\003 \001(\t\"]\n\016SetCredentials\022\023\n\013fo" + + "r_metrics\030\001 \001(\010\0226\n\013credentials\030\002 \002(\0132!.a" + + "ws.kinesis.protobuf.Credentials\"\'\n\tDimen" + + "sion\022\013\n\003key\030\001 \002(\t\022\r\n\005value\030\002 \002(\t\"K\n\005Stat" + + "s\022\r\n\005count\030\001 \002(\001\022\013\n\003sum\030\002 \002(\001\022\014\n\004mean\030\003 " + + "\002(\001\022\013\n\003min\030\004 \002(\001\022\013\n\003max\030\005 \002(\001\"\210\001\n\006Metric" + + "\022\014\n\004name\030\001 \002(\t\0223\n\ndimensions\030\002 \003(\0132\037.aws" + + ".kinesis.protobuf.Dimension\022*\n\005stats\030\003 \002" + + "(\0132\033.aws.kinesis.protobuf.Stats\022\017\n\007secon" + + "ds\030\004 \002(\004\"/\n\016MetricsRequest\022\014\n\004name\030\001 \001(\t" + + "\022\017\n\007seconds\030\002 \001(\004\"@\n\017MetricsResponse\022-\n\007" + + "metrics\030\001 \003(\0132\034.aws.kinesis.protobuf.Met" + + "ricB2\n0com.amazonaws.services.kinesis.pr" + + "oducer.protobuf" }; descriptor = com.google.protobuf.Descriptors.FileDescriptor .internalBuildGeneratedFileFrom(descriptorData, @@ -16092,7 +15913,7 @@ public com.amazonaws.services.kinesis.producer.protobuf.Messages.MetricsResponse internal_static_aws_kinesis_protobuf_PutRecord_fieldAccessorTable = new com.google.protobuf.GeneratedMessageV3.FieldAccessorTable( internal_static_aws_kinesis_protobuf_PutRecord_descriptor, - new java.lang.String[] { "StreamName", "PartitionKey", "ExplicitHashKey", "Data", "StreamArn", }); + new java.lang.String[] { "StreamName", "PartitionKey", "ExplicitHashKey", "Data", }); internal_static_aws_kinesis_protobuf_Flush_descriptor = getDescriptor().getMessageTypes().get(5); internal_static_aws_kinesis_protobuf_Flush_fieldAccessorTable = new diff --git a/java/amazon-kinesis-producer/src/test/java/com/amazonaws/services/kinesis/producer/KinesisProducerTest.java b/java/amazon-kinesis-producer/src/test/java/com/amazonaws/services/kinesis/producer/KinesisProducerTest.java index b446bdca..8f4bd8e3 100644 --- a/java/amazon-kinesis-producer/src/test/java/com/amazonaws/services/kinesis/producer/KinesisProducerTest.java +++ b/java/amazon-kinesis-producer/src/test/java/com/amazonaws/services/kinesis/producer/KinesisProducerTest.java @@ -138,7 +138,7 @@ public void differentCredsForRecordsAndMetrics() throws InterruptedException, Ex final long start = System.nanoTime(); while (System.nanoTime() - start < 500 * 1000000) { - kp.addUserRecord("streamName", "partitionKey", ByteBuffer.wrap(new byte[0]), "streamARN"); + kp.addUserRecord("streamName", "partitionKey", ByteBuffer.wrap(new byte[0])); kp.flush(); Thread.sleep(10); } @@ -187,7 +187,7 @@ public void rotatingCredentials() throws InterruptedException, ExecutionExceptio final long start = System.nanoTime(); while (System.nanoTime() - start < 500 * 1000000) { - kp.addUserRecord("streamName", "partitionKey", ByteBuffer.wrap(new byte[0]), "streamARN"); + kp.addUserRecord("streamName", "partitionKey", ByteBuffer.wrap(new byte[0])); kp.flush(); Thread.sleep(10); } @@ -253,10 +253,9 @@ public void schemaIntegration_OnInvalidSchema_ThrowsException() { String partitionKey = "partitionKey"; String hashKey = null; ByteBuffer data = ByteBuffer.wrap(new byte[] { 01, 23, 54 }); - String streamARN = "streamARN"; String schemaDefinition = null; Schema schema = new Schema(schemaDefinition, DataFormat.AVRO.toString(), "testSchema"); - UserRecord userRecord = new UserRecord(stream, partitionKey, hashKey, data, streamARN, schema); + UserRecord userRecord = new UserRecord(stream, partitionKey, hashKey, data, schema); try { kinesisProducer.addUserRecord(userRecord);