feat(sink): support async for mongodb dynamodb #17645

Merged · 12 commits · Sep 27, 2024
Changes from all commits
264 changes: 166 additions & 98 deletions src/connector/src/sink/dynamodb.rs
@@ -18,11 +18,9 @@ use anyhow::{anyhow, Context};
 use aws_sdk_dynamodb as dynamodb;
 use aws_sdk_dynamodb::client::Client;
 use aws_smithy_types::Blob;
-use dynamodb::types::{
-    AttributeValue, DeleteRequest, PutRequest, ReturnConsumedCapacity, ReturnItemCollectionMetrics,
-    TableStatus, WriteRequest,
-};
-use maplit::hashmap;
+use dynamodb::types::{AttributeValue, TableStatus, WriteRequest};
+use futures::prelude::future::TryFutureExt;
+use futures::prelude::TryFuture;
 use risingwave_common::array::{Op, RowRef, StreamChunk};
 use risingwave_common::catalog::Schema;
 use risingwave_common::row::Row as _;
@@ -31,6 +29,7 @@ use risingwave_common::util::iter_util::ZipEqDebug;
 use serde_derive::Deserialize;
 use serde_with::{serde_as, DisplayFromStr};
 use with_options::WithOptions;
+use write_chunk_future::{DynamoDbPayloadWriter, WriteChunkFuture};
 
 use super::log_store::DeliveryFutureManagerAddFuture;
 use super::writer::{
@@ -50,10 +49,33 @@ pub struct DynamoDbConfig {
 
     #[serde(rename = "dynamodb.max_batch_rows", default = "default_max_batch_rows")]
     #[serde_as(as = "DisplayFromStr")]
+    #[deprecated]
     pub max_batch_rows: usize,
 
     #[serde(flatten)]
     pub aws_auth_props: AwsAuthProps,
+
+    #[serde(
+        rename = "dynamodb.max_batch_item_nums",
+        default = "default_max_batch_item_nums"
+    )]
+    #[serde_as(as = "DisplayFromStr")]
+    pub max_batch_item_nums: usize,
+
+    #[serde(
+        rename = "dynamodb.max_future_send_nums",
+        default = "default_max_future_send_nums"
+    )]
+    #[serde_as(as = "DisplayFromStr")]
+    pub max_future_send_nums: usize,
 }
 
+fn default_max_batch_item_nums() -> usize {
+    25
+}
+
+fn default_max_future_send_nums() -> usize {
+    256
+}
+
 fn default_max_batch_rows() -> usize {
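The PR deprecates `dynamodb.max_batch_rows` in favor of two new string-typed options: `dynamodb.max_batch_item_nums` (default 25, which matches DynamoDB's limit of 25 items per `BatchWriteItem` call) and `dynamodb.max_future_send_nums` (default 256). Below is a minimal standalone sketch of the serde pattern used here, with a hypothetical `BatchOptions` struct rather than the real `DynamoDbConfig`, showing how `DisplayFromStr` parsing and the `default` functions interact:

```rust
use serde_derive::Deserialize;
use serde_with::{serde_as, DisplayFromStr};

#[serde_as]
#[derive(Deserialize)]
struct BatchOptions {
    #[serde(
        rename = "dynamodb.max_batch_item_nums",
        default = "default_max_batch_item_nums"
    )]
    #[serde_as(as = "DisplayFromStr")]
    max_batch_item_nums: usize,

    #[serde(
        rename = "dynamodb.max_future_send_nums",
        default = "default_max_future_send_nums"
    )]
    #[serde_as(as = "DisplayFromStr")]
    max_future_send_nums: usize,
}

fn default_max_batch_item_nums() -> usize {
    25
}

fn default_max_future_send_nums() -> usize {
    256
}

fn main() {
    // Option values arrive as strings, as they would from a `WITH (...)` clause.
    let opts: BatchOptions =
        serde_json::from_str(r#"{"dynamodb.max_batch_item_nums": "10"}"#).unwrap();
    assert_eq!(opts.max_batch_item_nums, 10); // parsed from the string "10"
    assert_eq!(opts.max_future_send_nums, 256); // default applied
}
```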
@@ -141,7 +163,7 @@ impl Sink for DynamoDbSink {
         Ok(
             DynamoDbSinkWriter::new(self.config.clone(), self.schema.clone())
                 .await?
-                .into_log_sinker(usize::MAX),
+                .into_log_sinker(self.config.max_future_send_nums),
         )
     }
 }
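Passing `self.config.max_future_send_nums` instead of `usize::MAX` appears to cap how many delivery futures may be in flight before the sinker awaits the oldest one. A rough sketch of that backpressure idea under those assumptions, using a hypothetical queue type (the real mechanism lives in the log sinker and `DeliveryFutureManager`, not shown in this diff):

```rust
use std::collections::VecDeque;

use futures::future::BoxFuture;

// Hypothetical stand-in for the delivery-future manager: futures queue up
// until the cap is reached, then the oldest are awaited for backpressure.
struct BoundedDeliveryQueue {
    pending: VecDeque<BoxFuture<'static, Result<(), String>>>,
    max_pending: usize, // plays the role of `max_future_send_nums`
}

impl BoundedDeliveryQueue {
    async fn add_future_may_await(
        &mut self,
        fut: BoxFuture<'static, Result<(), String>>,
    ) -> Result<(), String> {
        self.pending.push_back(fut);
        while self.pending.len() > self.max_pending {
            // Await the oldest in-flight delivery before accepting more work.
            self.pending.pop_front().unwrap().await?;
        }
        Ok(())
    }
}
```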
Expand Down Expand Up @@ -183,77 +205,7 @@ impl DynamoDbRequest {
}
}

struct DynamoDbPayloadWriter {
request_items: Vec<DynamoDbRequest>,
client: Client,
table: String,
dynamodb_keys: Vec<String>,
}

impl DynamoDbPayloadWriter {
fn write_one_insert(&mut self, item: HashMap<String, AttributeValue>) {
let put_req = PutRequest::builder().set_item(Some(item)).build().unwrap();
let req = WriteRequest::builder().put_request(put_req).build();
self.write_one_req(req);
}

fn write_one_delete(&mut self, key: HashMap<String, AttributeValue>) {
let key = key
.into_iter()
.filter(|(k, _)| self.dynamodb_keys.contains(k))
.collect();
let del_req = DeleteRequest::builder().set_key(Some(key)).build().unwrap();
let req = WriteRequest::builder().delete_request(del_req).build();
self.write_one_req(req);
}

fn write_one_req(&mut self, req: WriteRequest) {
let r_req = DynamoDbRequest {
inner: req,
key_items: self.dynamodb_keys.clone(),
};
if let Some(v) = r_req.extract_pk_values() {
self.request_items.retain(|item| {
!item
.extract_pk_values()
.unwrap_or_default()
.iter()
.all(|x| v.contains(x))
});
}
self.request_items.push(r_req);
}

async fn write_chunk(&mut self) -> Result<()> {
if !self.request_items.is_empty() {
let table = self.table.clone();
let req_items = std::mem::take(&mut self.request_items)
.into_iter()
.map(|r| r.inner)
.collect();
let reqs = hashmap! {
table => req_items,
};
self.client
.batch_write_item()
.set_request_items(Some(reqs))
.return_consumed_capacity(ReturnConsumedCapacity::None)
.return_item_collection_metrics(ReturnItemCollectionMetrics::None)
.send()
.await
.map_err(|e| {
SinkError::DynamoDb(
anyhow!(e).context("failed to delete item from DynamoDB sink"),
)
})?;
}

Ok(())
}
}

pub struct DynamoDbSinkWriter {
max_batch_rows: usize,
payload_writer: DynamoDbPayloadWriter,
formatter: DynamoDbFormatter,
}
@@ -282,56 +234,52 @@ impl DynamoDbSinkWriter {
             .collect();
 
         let payload_writer = DynamoDbPayloadWriter {
-            request_items: Vec::new(),
             client,
-            table: config.table,
+            table: config.table.clone(),
             dynamodb_keys,
+            max_batch_item_nums: config.max_batch_item_nums,
         };
 
         Ok(Self {
-            max_batch_rows: config.max_batch_rows,
             payload_writer,
             formatter: DynamoDbFormatter { schema },
         })
     }
 
-    async fn write_chunk_inner(&mut self, chunk: StreamChunk) -> Result<()> {
+    fn write_chunk_inner(&mut self, chunk: StreamChunk) -> Result<WriteChunkFuture> {
+        let mut request_items = Vec::new();
        for (op, row) in chunk.rows() {
             let items = self.formatter.format_row(row)?;
             match op {
                 Op::Insert | Op::UpdateInsert => {
-                    self.payload_writer.write_one_insert(items);
+                    self.payload_writer
+                        .write_one_insert(items, &mut request_items);
                 }
                 Op::Delete => {
-                    self.payload_writer.write_one_delete(items);
+                    self.payload_writer
+                        .write_one_delete(items, &mut request_items);
                 }
                 Op::UpdateDelete => {}
             }
         }
-        if self.payload_writer.request_items.len() >= self.max_batch_rows {
-            self.payload_writer.write_chunk().await?;
-        }
-        Ok(())
-    }
-
-    async fn flush(&mut self) -> Result<()> {
-        self.payload_writer.write_chunk().await
+        Ok(self.payload_writer.write_chunk(request_items))
     }
 }
 
+pub type DynamoDbSinkDeliveryFuture = impl TryFuture<Ok = (), Error = SinkError> + Unpin + 'static;
+
 impl AsyncTruncateSinkWriter for DynamoDbSinkWriter {
+    type DeliveryFuture = DynamoDbSinkDeliveryFuture;
+
     async fn write_chunk<'a>(
         &'a mut self,
         chunk: StreamChunk,
-        _add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>,
+        mut add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>,
     ) -> Result<()> {
-        self.write_chunk_inner(chunk).await
-    }
-
-    async fn barrier(&mut self, is_checkpoint: bool) -> Result<()> {
-        if is_checkpoint {
-            self.flush().await?;
-        }
+        let futures = self.write_chunk_inner(chunk)?;
+        add_future
+            .add_future_may_await(futures.map_ok(|_: Vec<()>| ()))
+            .await?;
         Ok(())
     }
 }
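`DynamoDbSinkDeliveryFuture` names the concrete future built by `write_chunk` (the `impl TryFuture` type alias suggests the crate enables the nightly `type_alias_impl_trait` feature), and `map_ok(|_: Vec<()>| ())` collapses the `Vec<()>` that `try_join_all` yields into the unit value the trait expects. A stable-Rust approximation of the same shape, boxing where the real code names the future type directly (names here are illustrative):

```rust
use futures::future::{ready, try_join_all, BoxFuture};
use futures::{FutureExt, TryFutureExt};

// Boxed stand-in for `DynamoDbSinkDeliveryFuture`.
type DeliveryFuture = BoxFuture<'static, Result<(), String>>;

fn make_delivery(batch_results: Vec<Result<(), String>>) -> DeliveryFuture {
    let futs = batch_results.into_iter().map(ready);
    // `try_join_all` resolves to `Vec<()>` on success; `map_ok` discards the
    // vector so the delivery future resolves to plain `()`.
    try_join_all(futs).map_ok(|_: Vec<()>| ()).boxed()
}
```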
@@ -400,3 +348,123 @@ fn map_data(scalar_ref: Option<ScalarRefImpl<'_>>, data_type: &DataType) -> Resu
     };
     Ok(attr)
 }
+
+mod write_chunk_future {
+    use core::result;
+    use std::collections::HashMap;
+
+    use anyhow::anyhow;
+    use aws_sdk_dynamodb as dynamodb;
+    use aws_sdk_dynamodb::client::Client;
+    use aws_smithy_runtime_api::client::orchestrator::HttpResponse;
+    use dynamodb::error::SdkError;
+    use dynamodb::operation::batch_write_item::{BatchWriteItemError, BatchWriteItemOutput};
+    use dynamodb::types::{
+        AttributeValue, DeleteRequest, PutRequest, ReturnConsumedCapacity,
+        ReturnItemCollectionMetrics, WriteRequest,
+    };
+    use futures::future::{Map, TryJoinAll};
+    use futures::prelude::future::{try_join_all, FutureExt};
+    use futures::prelude::Future;
+    use itertools::Itertools;
+    use maplit::hashmap;
+
+    use super::{DynamoDbRequest, Result, SinkError};
+
+    pub type WriteChunkFuture = TryJoinAll<
+        Map<
+            impl Future<
+                Output = result::Result<
+                    BatchWriteItemOutput,
+                    SdkError<BatchWriteItemError, HttpResponse>,
+                >,
+            >,
+            impl FnOnce(
+                result::Result<BatchWriteItemOutput, SdkError<BatchWriteItemError, HttpResponse>>,
+            ) -> Result<()>,
+        >,
+    >;
+    pub struct DynamoDbPayloadWriter {
+        pub client: Client,
+        pub table: String,
+        pub dynamodb_keys: Vec<String>,
+        pub max_batch_item_nums: usize,
+    }
+
+    impl DynamoDbPayloadWriter {
+        pub fn write_one_insert(
+            &mut self,
+            item: HashMap<String, AttributeValue>,
+            request_items: &mut Vec<DynamoDbRequest>,
+        ) {
+            let put_req = PutRequest::builder().set_item(Some(item)).build().unwrap();
+            let req = WriteRequest::builder().put_request(put_req).build();
+            self.write_one_req(req, request_items);
+        }
+
+        pub fn write_one_delete(
+            &mut self,
+            key: HashMap<String, AttributeValue>,
+            request_items: &mut Vec<DynamoDbRequest>,
+        ) {
+            let key = key
+                .into_iter()
+                .filter(|(k, _)| self.dynamodb_keys.contains(k))
+                .collect();
+            let del_req = DeleteRequest::builder().set_key(Some(key)).build().unwrap();
+            let req = WriteRequest::builder().delete_request(del_req).build();
+            self.write_one_req(req, request_items);
+        }
+
+        pub fn write_one_req(
+            &mut self,
+            req: WriteRequest,
+            request_items: &mut Vec<DynamoDbRequest>,
+        ) {
+            let r_req = DynamoDbRequest {
+                inner: req,
+                key_items: self.dynamodb_keys.clone(),
+            };
+            if let Some(v) = r_req.extract_pk_values() {
+                request_items.retain(|item| {
+                    !item
+                        .extract_pk_values()
+                        .unwrap_or_default()
+                        .iter()
+                        .all(|x| v.contains(x))
+                });
+            }
+            request_items.push(r_req);
+        }
+
+        pub fn write_chunk(&mut self, request_items: Vec<DynamoDbRequest>) -> WriteChunkFuture {
+            let table = self.table.clone();
+            let chunks = request_items
+                .into_iter()
+                .map(|r| r.inner)
+                .chunks(self.max_batch_item_nums);
+            let futures = chunks.into_iter().map(|chunk| {
+                let req_items = chunk.collect();
+                let reqs = hashmap! {
+                    table.clone() => req_items,
+                };
+                self.client
+                    .batch_write_item()
+                    .set_request_items(Some(reqs))
+                    .return_consumed_capacity(ReturnConsumedCapacity::None)
+                    .return_item_collection_metrics(ReturnItemCollectionMetrics::None)
+                    .send()
+                    .map(|result| {
+                        result
+                            .map_err(|e| {
+                                SinkError::DynamoDb(
+                                    anyhow!(e).context("failed to delete item from DynamoDB sink"),
+                                )
+                            })
+                            .map(|_| ())
+                    })
+            });
+            try_join_all(futures)
+        }
+    }
+}
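The new `write_chunk` splits the buffered requests into batches of at most `max_batch_item_nums` items (default 25, DynamoDB's per-call limit for `BatchWriteItem`) and joins one send future per batch, so batches fly concurrently instead of being awaited one by one. A minimal, self-contained sketch of that fan-out pattern, with a hypothetical `send_batch` standing in for the real `client.batch_write_item().send()` call:

```rust
use futures::future::try_join_all;
use itertools::Itertools;

// Hypothetical stand-in for `client.batch_write_item().send()`.
async fn send_batch(batch: Vec<u32>) -> Result<(), String> {
    println!("sending batch of {} items", batch.len());
    Ok(())
}

async fn write_all(items: Vec<u32>, max_batch_item_nums: usize) -> Result<(), String> {
    // `Itertools::chunks` lazily groups the iterator; each group becomes one
    // BatchWriteItem-sized request.
    let chunks = items.into_iter().chunks(max_batch_item_nums);
    let futures = chunks
        .into_iter()
        .map(|chunk| send_batch(chunk.collect()));
    // All batches are in flight concurrently; the first error fails the join.
    try_join_all(futures).await.map(|_| ())
}
```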