-
Notifications
You must be signed in to change notification settings - Fork 590
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix(sink): handle Kinesis PutRecords partial success and throttle #17983
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
basically LGTM, match the desc on the manual
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the fix
// PutRecords doesn’t guarantee the ordering of records. If you need to read records in the same | ||
// order they are written to the stream, use PutRecord instead of PutRecords, and write to the same shard. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shall we acknowledge this limitation in doc as there is currently no user option to opt for ordering rather than batching?
} | ||
if total_payload_size + *size < MAX_TOTAL_RECORD_PAYLOAD_SIZE { | ||
total_payload_size += *size; | ||
records.push(record.clone()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The need of clone
here feels like a API design flaw in the AWS auto-generated SDK. I don't think we can do anything but just point it out.
Impact:
Every single message written out is cloned at least once.
Root cause:
AWS SDK takes the ownership away but does not give it back for failure retry.
src/connector/src/sink/kinesis.rs
Outdated
#[expect(rw::format_error)] | ||
return Err(SinkError::Kinesis(anyhow!( | ||
"failed to send records. sent {} out of {}, last err: {:?}", | ||
start_idx, | ||
total_count, | ||
e.as_report() | ||
))); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When wrapping an error and returning to caller (rather than handling/consuming the error as warn!
below), use context
:
return Err(
SinkError::Kinesis(
anyhow!(e).context(format!("failed to send records. sent {start_idx} out of {total_count}, last err:"))
)
);
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
Checklist
./risedev check
(or alias,./risedev c
)Documentation
Release note
Changes on the doc of kinesis sink:
In kinesis sink, we use PutRecords to send multiple records in a batch to achieve higher throughput. Due to the limitation of kinesis, records might be out of order when using such API. Nevertheless, in the current kinesis sink implementation, we ensure at-least-once delivery semantic, and eventual consistency.