feat(sink): support async for mongodb dynamodb (#17645)

xxhZs authored Sep 27, 2024
1 parent 1ddf30e commit d877481

Showing 3 changed files with 299 additions and 245 deletions (only the diff for src/connector/src/sink/dynamodb.rs was loaded below).
264 changes: 166 additions & 98 deletions src/connector/src/sink/dynamodb.rs
@@ -18,11 +18,9 @@ use anyhow::{anyhow, Context};
 use aws_sdk_dynamodb as dynamodb;
 use aws_sdk_dynamodb::client::Client;
 use aws_smithy_types::Blob;
-use dynamodb::types::{
-    AttributeValue, DeleteRequest, PutRequest, ReturnConsumedCapacity, ReturnItemCollectionMetrics,
-    TableStatus, WriteRequest,
-};
-use maplit::hashmap;
+use dynamodb::types::{AttributeValue, TableStatus, WriteRequest};
+use futures::prelude::future::TryFutureExt;
+use futures::prelude::TryFuture;
 use risingwave_common::array::{Op, RowRef, StreamChunk};
 use risingwave_common::catalog::Schema;
 use risingwave_common::row::Row as _;
@@ -31,6 +29,7 @@ use risingwave_common::util::iter_util::ZipEqDebug;
 use serde_derive::Deserialize;
 use serde_with::{serde_as, DisplayFromStr};
 use with_options::WithOptions;
+use write_chunk_future::{DynamoDbPayloadWriter, WriteChunkFuture};
 
 use super::log_store::DeliveryFutureManagerAddFuture;
 use super::writer::{
@@ -50,10 +49,33 @@ pub struct DynamoDbConfig {
 
     #[serde(rename = "dynamodb.max_batch_rows", default = "default_max_batch_rows")]
     #[serde_as(as = "DisplayFromStr")]
+    #[deprecated]
     pub max_batch_rows: usize,
 
     #[serde(flatten)]
     pub aws_auth_props: AwsAuthProps,
+
+    #[serde(
+        rename = "dynamodb.max_batch_item_nums",
+        default = "default_max_batch_item_nums"
+    )]
+    #[serde_as(as = "DisplayFromStr")]
+    pub max_batch_item_nums: usize,
+
+    #[serde(
+        rename = "dynamodb.max_future_send_nums",
+        default = "default_max_future_send_nums"
+    )]
+    #[serde_as(as = "DisplayFromStr")]
+    pub max_future_send_nums: usize,
 }
 
+fn default_max_batch_item_nums() -> usize {
+    25
+}
+
+fn default_max_future_send_nums() -> usize {
+    256
+}
+
 fn default_max_batch_rows() -> usize {
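
Both new options go through the same serde path as the existing ones: values arrive as strings (for example from a CREATE SINK ... WITH clause) and `DisplayFromStr` parses them. Below is a minimal sketch of that pattern, assuming serde (with the derive feature), serde_with, and serde_json as dependencies; `BatchOptions` and the input map are illustrative, with the field attributes copied from the commit:

```rust
use serde::Deserialize;
use serde_with::{serde_as, DisplayFromStr};

#[serde_as]
#[derive(Debug, Deserialize)]
struct BatchOptions {
    // Same attributes as the commit's `max_batch_item_nums` field: the value
    // is deserialized from a string via FromStr, with a default of 25.
    #[serde(
        rename = "dynamodb.max_batch_item_nums",
        default = "default_max_batch_item_nums"
    )]
    #[serde_as(as = "DisplayFromStr")]
    max_batch_item_nums: usize,
}

fn default_max_batch_item_nums() -> usize {
    25
}

fn main() {
    // WITH options are strings; DisplayFromStr turns "30" into 30usize.
    let opts: BatchOptions =
        serde_json::from_value(serde_json::json!({ "dynamodb.max_batch_item_nums": "30" }))
            .unwrap();
    assert_eq!(opts.max_batch_item_nums, 30);

    // A missing key falls back to the default of 25.
    let defaults: BatchOptions = serde_json::from_value(serde_json::json!({})).unwrap();
    assert_eq!(defaults.max_batch_item_nums, 25);
}
```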
@@ -141,7 +163,7 @@ impl Sink for DynamoDbSink {
         Ok(
             DynamoDbSinkWriter::new(self.config.clone(), self.schema.clone())
                 .await?
-                .into_log_sinker(usize::MAX),
+                .into_log_sinker(self.config.max_future_send_nums),
         )
     }
 }
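
Previously the writer was wrapped with `into_log_sinker(usize::MAX)`, i.e. an effectively unbounded number of pending deliveries; it now passes `dynamodb.max_future_send_nums` (default 256) so the log sinker caps how many batch-write futures are in flight. A rough sketch of that bounding idea, assuming tokio and futures as dependencies; this is not RisingWave's `DeliveryFutureManager`, and `submit_bounded` is a hypothetical helper:

```rust
use futures::stream::{FuturesOrdered, StreamExt};

// Admit a new delivery future only when fewer than `max_inflight` are
// pending, awaiting the oldest ones first (the effect of the bounded sinker).
async fn submit_bounded<F>(inflight: &mut FuturesOrdered<F>, fut: F, max_inflight: usize)
where
    F: std::future::Future<Output = Result<(), String>>,
{
    while inflight.len() >= max_inflight {
        if let Some(oldest) = inflight.next().await {
            oldest.expect("delivery failed");
        }
    }
    inflight.push_back(fut);
}

#[tokio::main]
async fn main() {
    let mut inflight = FuturesOrdered::new();
    for _ in 0..10 {
        // At most 4 futures are ever pending at once.
        submit_bounded(&mut inflight, futures::future::ok::<(), String>(()), 4).await;
    }
    // Drain whatever is still in flight.
    while let Some(res) = inflight.next().await {
        res.expect("delivery failed");
    }
}
```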
@@ -183,77 +205,7 @@ impl DynamoDbRequest {
     }
 }
 
-struct DynamoDbPayloadWriter {
-    request_items: Vec<DynamoDbRequest>,
-    client: Client,
-    table: String,
-    dynamodb_keys: Vec<String>,
-}
-
-impl DynamoDbPayloadWriter {
-    fn write_one_insert(&mut self, item: HashMap<String, AttributeValue>) {
-        let put_req = PutRequest::builder().set_item(Some(item)).build().unwrap();
-        let req = WriteRequest::builder().put_request(put_req).build();
-        self.write_one_req(req);
-    }
-
-    fn write_one_delete(&mut self, key: HashMap<String, AttributeValue>) {
-        let key = key
-            .into_iter()
-            .filter(|(k, _)| self.dynamodb_keys.contains(k))
-            .collect();
-        let del_req = DeleteRequest::builder().set_key(Some(key)).build().unwrap();
-        let req = WriteRequest::builder().delete_request(del_req).build();
-        self.write_one_req(req);
-    }
-
-    fn write_one_req(&mut self, req: WriteRequest) {
-        let r_req = DynamoDbRequest {
-            inner: req,
-            key_items: self.dynamodb_keys.clone(),
-        };
-        if let Some(v) = r_req.extract_pk_values() {
-            self.request_items.retain(|item| {
-                !item
-                    .extract_pk_values()
-                    .unwrap_or_default()
-                    .iter()
-                    .all(|x| v.contains(x))
-            });
-        }
-        self.request_items.push(r_req);
-    }
-
-    async fn write_chunk(&mut self) -> Result<()> {
-        if !self.request_items.is_empty() {
-            let table = self.table.clone();
-            let req_items = std::mem::take(&mut self.request_items)
-                .into_iter()
-                .map(|r| r.inner)
-                .collect();
-            let reqs = hashmap! {
-                table => req_items,
-            };
-            self.client
-                .batch_write_item()
-                .set_request_items(Some(reqs))
-                .return_consumed_capacity(ReturnConsumedCapacity::None)
-                .return_item_collection_metrics(ReturnItemCollectionMetrics::None)
-                .send()
-                .await
-                .map_err(|e| {
-                    SinkError::DynamoDb(
-                        anyhow!(e).context("failed to delete item from DynamoDB sink"),
-                    )
-                })?;
-        }
-
-        Ok(())
-    }
-}
-
 pub struct DynamoDbSinkWriter {
-    max_batch_rows: usize,
     payload_writer: DynamoDbPayloadWriter,
     formatter: DynamoDbFormatter,
 }
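
One detail worth keeping in mind for `write_one_req` (which this commit moves into the `write_chunk_future` module below without changing its logic): DynamoDB's BatchWriteItem rejects a request containing two operations on the same key, so the writer evicts any pending request whose primary-key values match the incoming one, and the last write wins within a chunk. A simplified, self-contained sketch of that invariant, with plain strings standing in for the `AttributeValue` key maps and `push_latest` as a hypothetical helper:

```rust
// Keep at most one pending operation per key, preferring the newest,
// mirroring the `retain` in `write_one_req`.
fn push_latest(pending: &mut Vec<(String, &'static str)>, key: String, op: &'static str) {
    pending.retain(|(k, _)| *k != key);
    pending.push((key, op));
}

fn main() {
    let mut pending = Vec::new();
    push_latest(&mut pending, "user#1".into(), "put");
    push_latest(&mut pending, "user#2".into(), "put");
    push_latest(&mut pending, "user#1".into(), "delete");
    // Only the latest operation on user#1 survives.
    assert_eq!(
        pending,
        vec![
            ("user#2".to_string(), "put"),
            ("user#1".to_string(), "delete"),
        ]
    );
}
```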
@@ -282,56 +234,52 @@ impl DynamoDbSinkWriter {
             .collect();
 
         let payload_writer = DynamoDbPayloadWriter {
-            request_items: Vec::new(),
             client,
-            table: config.table,
+            table: config.table.clone(),
             dynamodb_keys,
+            max_batch_item_nums: config.max_batch_item_nums,
         };
 
         Ok(Self {
-            max_batch_rows: config.max_batch_rows,
             payload_writer,
             formatter: DynamoDbFormatter { schema },
         })
     }
 
-    async fn write_chunk_inner(&mut self, chunk: StreamChunk) -> Result<()> {
+    fn write_chunk_inner(&mut self, chunk: StreamChunk) -> Result<WriteChunkFuture> {
+        let mut request_items = Vec::new();
         for (op, row) in chunk.rows() {
             let items = self.formatter.format_row(row)?;
             match op {
                 Op::Insert | Op::UpdateInsert => {
-                    self.payload_writer.write_one_insert(items);
+                    self.payload_writer
+                        .write_one_insert(items, &mut request_items);
                 }
                 Op::Delete => {
-                    self.payload_writer.write_one_delete(items);
+                    self.payload_writer
+                        .write_one_delete(items, &mut request_items);
                 }
                 Op::UpdateDelete => {}
             }
         }
-        if self.payload_writer.request_items.len() >= self.max_batch_rows {
-            self.payload_writer.write_chunk().await?;
-        }
-        Ok(())
-    }
-
-    async fn flush(&mut self) -> Result<()> {
-        self.payload_writer.write_chunk().await
+        Ok(self.payload_writer.write_chunk(request_items))
     }
 }
 
+pub type DynamoDbSinkDeliveryFuture = impl TryFuture<Ok = (), Error = SinkError> + Unpin + 'static;
+
 impl AsyncTruncateSinkWriter for DynamoDbSinkWriter {
+    type DeliveryFuture = DynamoDbSinkDeliveryFuture;
+
     async fn write_chunk<'a>(
         &'a mut self,
         chunk: StreamChunk,
-        _add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>,
+        mut add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>,
     ) -> Result<()> {
-        self.write_chunk_inner(chunk).await
-    }
-
-    async fn barrier(&mut self, is_checkpoint: bool) -> Result<()> {
-        if is_checkpoint {
-            self.flush().await?;
-        }
+        let futures = self.write_chunk_inner(chunk)?;
+        add_future
+            .add_future_may_await(futures.map_ok(|_: Vec<()>| ()))
+            .await?;
         Ok(())
     }
 }
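
`DynamoDbSinkDeliveryFuture` relies on `type_alias_impl_trait` to give a name to the opaque future built from `try_join_all(...).map_ok(...)`, so it can serve as the writer's associated `DeliveryFuture` type. A nightly-only sketch of the same trick, with `String` standing in for `SinkError`; the defining-use rules for this unstable feature have shifted between toolchains, so treat it as illustrative:

```rust
#![feature(type_alias_impl_trait)] // unstable; nightly toolchain only

use futures::future::{self, ready};
use futures::{TryFuture, TryFutureExt};

// Name the otherwise-unnameable combinator type so it can be stored or used
// as an associated type, as the sink does with DynamoDbSinkDeliveryFuture.
pub type DeliveryFuture = impl TryFuture<Ok = (), Error = String> + Unpin + 'static;

fn make_delivery_future() -> DeliveryFuture {
    // try_join_all resolves to Vec<()>; map_ok collapses it to (), matching
    // the alias, the same shape as `futures.map_ok(|_: Vec<()>| ())` above.
    future::try_join_all(vec![ready(Ok::<(), String>(()))]).map_ok(|_: Vec<()>| ())
}

fn main() {
    let _fut = make_delivery_future();
}
```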
@@ -400,3 +348,123 @@ fn map_data(scalar_ref: Option<ScalarRefImpl<'_>>, data_type: &DataType) -> Resu
     };
     Ok(attr)
 }
+
+mod write_chunk_future {
+    use core::result;
+    use std::collections::HashMap;
+
+    use anyhow::anyhow;
+    use aws_sdk_dynamodb as dynamodb;
+    use aws_sdk_dynamodb::client::Client;
+    use aws_smithy_runtime_api::client::orchestrator::HttpResponse;
+    use dynamodb::error::SdkError;
+    use dynamodb::operation::batch_write_item::{BatchWriteItemError, BatchWriteItemOutput};
+    use dynamodb::types::{
+        AttributeValue, DeleteRequest, PutRequest, ReturnConsumedCapacity,
+        ReturnItemCollectionMetrics, WriteRequest,
+    };
+    use futures::future::{Map, TryJoinAll};
+    use futures::prelude::future::{try_join_all, FutureExt};
+    use futures::prelude::Future;
+    use itertools::Itertools;
+    use maplit::hashmap;
+
+    use super::{DynamoDbRequest, Result, SinkError};
+
+    pub type WriteChunkFuture = TryJoinAll<
+        Map<
+            impl Future<
+                Output = result::Result<
+                    BatchWriteItemOutput,
+                    SdkError<BatchWriteItemError, HttpResponse>,
+                >,
+            >,
+            impl FnOnce(
+                result::Result<BatchWriteItemOutput, SdkError<BatchWriteItemError, HttpResponse>>,
+            ) -> Result<()>,
+        >,
+    >;
+    pub struct DynamoDbPayloadWriter {
+        pub client: Client,
+        pub table: String,
+        pub dynamodb_keys: Vec<String>,
+        pub max_batch_item_nums: usize,
+    }
+
+    impl DynamoDbPayloadWriter {
+        pub fn write_one_insert(
+            &mut self,
+            item: HashMap<String, AttributeValue>,
+            request_items: &mut Vec<DynamoDbRequest>,
+        ) {
+            let put_req = PutRequest::builder().set_item(Some(item)).build().unwrap();
+            let req = WriteRequest::builder().put_request(put_req).build();
+            self.write_one_req(req, request_items);
+        }
+
+        pub fn write_one_delete(
+            &mut self,
+            key: HashMap<String, AttributeValue>,
+            request_items: &mut Vec<DynamoDbRequest>,
+        ) {
+            let key = key
+                .into_iter()
+                .filter(|(k, _)| self.dynamodb_keys.contains(k))
+                .collect();
+            let del_req = DeleteRequest::builder().set_key(Some(key)).build().unwrap();
+            let req = WriteRequest::builder().delete_request(del_req).build();
+            self.write_one_req(req, request_items);
+        }
+
+        pub fn write_one_req(
+            &mut self,
+            req: WriteRequest,
+            request_items: &mut Vec<DynamoDbRequest>,
+        ) {
+            let r_req = DynamoDbRequest {
+                inner: req,
+                key_items: self.dynamodb_keys.clone(),
+            };
+            if let Some(v) = r_req.extract_pk_values() {
+                request_items.retain(|item| {
+                    !item
+                        .extract_pk_values()
+                        .unwrap_or_default()
+                        .iter()
+                        .all(|x| v.contains(x))
+                });
+            }
+            request_items.push(r_req);
+        }
+
+        pub fn write_chunk(&mut self, request_items: Vec<DynamoDbRequest>) -> WriteChunkFuture {
+            let table = self.table.clone();
+            let chunks = request_items
+                .into_iter()
+                .map(|r| r.inner)
+                .chunks(self.max_batch_item_nums);
+            let futures = chunks.into_iter().map(|chunk| {
+                let req_items = chunk.collect();
+                let reqs = hashmap! {
+                    table.clone() => req_items,
+                };
+                self.client
+                    .batch_write_item()
+                    .set_request_items(Some(reqs))
+                    .return_consumed_capacity(ReturnConsumedCapacity::None)
+                    .return_item_collection_metrics(ReturnItemCollectionMetrics::None)
+                    .send()
+                    .map(|result| {
+                        result
+                            .map_err(|e| {
+                                SinkError::DynamoDb(
+                                    anyhow!(e).context("failed to delete item from DynamoDB sink"),
+                                )
+                            })
+                            .map(|_| ())
+                    })
+            });
+            try_join_all(futures)
+        }
+    }
+}
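
The 25 default for `dynamodb.max_batch_item_nums` matches BatchWriteItem's cap of 25 items per request. Below is a standalone sketch of the fan-out strategy used by `write_chunk` above: split pending writes into batches, send all batches concurrently, and fail fast if any batch errors. `send_batch` is a hypothetical stand-in for the real `client.batch_write_item()` call, and the sketch assumes tokio, futures, and itertools as dependencies:

```rust
use futures::future::try_join_all;
use itertools::Itertools;

// Hypothetical stand-in for `client.batch_write_item().send()`.
async fn send_batch(batch: Vec<u32>) -> Result<(), String> {
    println!("sending batch of {} items", batch.len());
    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), String> {
    let requests: Vec<u32> = (0..60).collect();

    // Split into batches of at most 25, as `chunks(self.max_batch_item_nums)`
    // does in the commit.
    let chunks = requests.into_iter().chunks(25);
    let futures: Vec<_> = chunks
        .into_iter()
        .map(|chunk| send_batch(chunk.collect()))
        .collect();

    // Like the returned TryJoinAll: all batches are in flight at once, and
    // the first error aborts the join.
    try_join_all(futures).await?;
    Ok(())
}
```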