Skip to content

Commit

Permalink
Fixing bug for addUserRecord
Browse files Browse the repository at this point in the history
  • Loading branch information
aakkem committed Jan 3, 2024
1 parent 30d5565 commit a809977
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ public interface IKinesisProducer {

ListenableFuture<UserRecordResult> addUserRecord(String stream, String partitionKey, String explicitHashKey, ByteBuffer data, String streamARN);

ListenableFuture<UserRecordResult> addUserRecord(String stream, String partitionKey, String explicitHashKey, ByteBuffer data, Schema schema);

ListenableFuture<UserRecordResult> addUserRecord(String stream, String partitionKey, String explicitHashKey, ByteBuffer data, String streamARN, Schema schema);

int getOutstandingRecordsCount();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,7 @@ protected KinesisProducer(File inPipe, File outPipe) {
*/
@Override
public ListenableFuture<UserRecordResult> addUserRecord(String stream, String partitionKey, ByteBuffer data) {
return addUserRecord(stream, partitionKey, null, data, null);
return addUserRecord(stream, partitionKey, null, data, null, null);
}

/**
Expand Down Expand Up @@ -668,6 +668,72 @@ public ListenableFuture<UserRecordResult> addUserRecord(String stream, String pa
return addUserRecord(stream, partitionKey, explicitHashKey, data, streamARN, null);
}

/**
* Put a record asynchronously. A {@link ListenableFuture} is returned that
* can be used to retrieve the result, either by polling or by registering a
* callback.
*
* <p>
* The return value can be disregarded if you do not wish to process the
* result. Under the covers, the KPL will automatically reattempt puts in
* case of transient errors (including throttling). A failed result is
* generally returned only if an irrecoverable error is detected (e.g.
* trying to put to a stream that doesn't exist), or if the record expires.
*
* <p>
* <b>Thread safe.</b>
*
* <p>
* To add a listener to the future:
* <p>
* <code>
* ListenableFuture&lt;PutRecordResult&gt; f = myKinesisProducer.addUserRecord(...);
* com.google.common.util.concurrent.Futures.addCallback(f, callback, executor);
* </code>
* <p>
* where <code>callback</code> is an instance of
* {@link com.google.common.util.concurrent.FutureCallback} and
* <code>executor</code> is an instance of
* {@link java.util.concurrent.Executor}.
* <p>
* <b>Important:</b>
* <p>
* If long-running tasks are performed in the callbacks, it is recommended
* that a custom executor be provided when registering callbacks to ensure
* that there are enough threads to achieve the desired level of
* parallelism. By default, the KPL will use an internal thread pool to
* execute callbacks, but this pool may not have a sufficient number of
* threads if a large number is desired.
* <p>
* Another option would be to hand the result off to a different component
* for processing and keep the callback routine fast.
*
* @param stream
* Stream to put to.
* @param partitionKey
* Partition key. Length must be at least one, and at most 256
* (inclusive).
* @param explicitHashKey
* The hash value used to explicitly determine the shard the data
* record is assigned to by overriding the partition key hash.
* Must be a valid string representation of a positive integer
* with value between 0 and <code>2^128 - 1</code> (inclusive).
* @param data
* Binary data of the record. Maximum size 1MiB.
* @return A future for the result of the put.
* @throws IllegalArgumentException
* if input does not meet stated constraints
* @throws DaemonException
* if the child process is dead
* @see ListenableFuture
* @see UserRecordResult
* @see KinesisProducerConfiguration#setRecordTtl(long)
* @see UserRecordFailedException
*/
@Override
public ListenableFuture<UserRecordResult> addUserRecord(String stream, String partitionKey, String explicitHashKey, ByteBuffer data, Schema schema) {
return addUserRecord(stream, partitionKey, explicitHashKey, data, null, schema);
}
@Override
public ListenableFuture<UserRecordResult> addUserRecord(String stream, String partitionKey, String explicitHashKey, ByteBuffer data, String streamARN, Schema schema) {
if (stream == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,13 @@ public UserRecord(String streamName, String partitionKey, String explicitHashKey
this.data = data;
}

public UserRecord(String streamName, String partitionKey, String explicitHashKey, ByteBuffer data, Schema schema) {
this.streamName = streamName;
this.partitionKey = partitionKey;
this.explicitHashKey = explicitHashKey;
this.data = data;
this.schema = schema;
}
public UserRecord(String streamName, String partitionKey, String explicitHashKey, ByteBuffer data, String streamARN, Schema schema) {
this.streamName = streamName;
this.partitionKey = partitionKey;
Expand Down

0 comments on commit a809977

Please sign in to comment.