Skip to content

Commit

Permalink
refactor(config): split streaming and batch developer config (risingw…
Browse files Browse the repository at this point in the history
…avelabs#8911)

Signed-off-by: Bugen Zhao <[email protected]>
BugenZhao authored Mar 31, 2023
1 parent ff1440f commit d687d71
Showing 38 changed files with 110 additions and 146 deletions.
14 changes: 14 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion src/batch/src/executor/delete.rs
Original file line number Diff line number Diff line change
@@ -160,7 +160,7 @@ impl BoxedExecutorBuilder for DeleteExecutor {
delete_node.table_version_id,
source.context().dml_manager(),
child,
source.context.get_config().developer.batch_chunk_size,
source.context.get_config().developer.chunk_size,
source.plan_node().get_identity().clone(),
delete_node.returning,
)))
2 changes: 1 addition & 1 deletion src/batch/src/executor/expand.rs
Original file line number Diff line number Diff line change
@@ -124,7 +124,7 @@ impl BoxedExecutorBuilder for ExpandExecutor {
Ok(Box::new(Self::new(
input,
column_subsets,
source.context.get_config().developer.batch_chunk_size,
source.context.get_config().developer.chunk_size,
)))
}
}
2 changes: 1 addition & 1 deletion src/batch/src/executor/filter.rs
Original file line number Diff line number Diff line change
@@ -99,7 +99,7 @@ impl BoxedExecutorBuilder for FilterExecutor {
expr,
input,
source.plan_node().get_identity().clone(),
source.context.get_config().developer.batch_chunk_size,
source.context.get_config().developer.chunk_size,
)))
}
}
2 changes: 1 addition & 1 deletion src/batch/src/executor/group_top_n.rs
Original file line number Diff line number Diff line change
@@ -124,7 +124,7 @@ impl BoxedExecutorBuilder for GroupTopNExecutorBuilder {
group_key_types,
with_ties: top_n_node.get_with_ties(),
identity: source.plan_node().get_identity().clone(),
chunk_size: source.context.get_config().developer.batch_chunk_size,
chunk_size: source.context.get_config().developer.chunk_size,
};

Ok(builder.dispatch())
2 changes: 1 addition & 1 deletion src/batch/src/executor/hash_agg.rs
Original file line number Diff line number Diff line change
@@ -134,7 +134,7 @@ impl BoxedExecutorBuilder for HashAggExecutorBuilder {
child,
source.task_id.clone(),
identity,
source.context.get_config().developer.batch_chunk_size,
source.context.get_config().developer.chunk_size,
)
}
}
2 changes: 1 addition & 1 deletion src/batch/src/executor/insert.rs
Original file line number Diff line number Diff line change
@@ -194,7 +194,7 @@ impl BoxedExecutorBuilder for InsertExecutor {
insert_node.table_version_id,
source.context().dml_manager(),
child,
source.context.get_config().developer.batch_chunk_size,
source.context.get_config().developer.chunk_size,
source.plan_node().get_identity().clone(),
column_indices,
insert_node.row_id_index.as_ref().map(|index| *index as _),
2 changes: 1 addition & 1 deletion src/batch/src/executor/join/distributed_lookup_join.rs
Original file line number Diff line number Diff line change
@@ -167,7 +167,7 @@ impl BoxedExecutorBuilder for DistributedLookupJoinExecutorBuilder {

let null_safe = distributed_lookup_join_node.get_null_safe().to_vec();

let chunk_size = source.context.get_config().developer.batch_chunk_size;
let chunk_size = source.context.get_config().developer.chunk_size;

let table_id = TableId {
table_id: table_desc.table_id,
2 changes: 1 addition & 1 deletion src/batch/src/executor/join/hash_join.rs
Original file line number Diff line number Diff line change
@@ -1694,7 +1694,7 @@ impl BoxedExecutorBuilder for HashJoinExecutor<()> {
cond,
identity: context.plan_node().get_identity().clone(),
right_key_types,
chunk_size: context.context.get_config().developer.batch_chunk_size,
chunk_size: context.context.get_config().developer.chunk_size,
}
.dispatch())
}
2 changes: 1 addition & 1 deletion src/batch/src/executor/join/local_lookup_join.rs
Original file line number Diff line number Diff line change
@@ -367,7 +367,7 @@ impl BoxedExecutorBuilder for LocalLookupJoinExecutorBuilder {
let vnode_mapping = lookup_join_node.get_inner_side_vnode_mapping().to_vec();
assert!(!vnode_mapping.is_empty());

let chunk_size = source.context.get_config().developer.batch_chunk_size;
let chunk_size = source.context.get_config().developer.chunk_size;

let inner_side_builder = InnerSideExecutorBuilder {
table_desc: table_desc.clone(),
2 changes: 1 addition & 1 deletion src/batch/src/executor/join/nested_loop_join.rs
Original file line number Diff line number Diff line change
@@ -163,7 +163,7 @@ impl BoxedExecutorBuilder for NestedLoopJoinExecutor {
left_child,
right_child,
source.plan_node().get_identity().clone(),
source.context.get_config().developer.batch_chunk_size,
source.context.get_config().developer.chunk_size,
)))
}
}
2 changes: 1 addition & 1 deletion src/batch/src/executor/merge_sort_exchange.rs
Original file line number Diff line number Diff line change
@@ -221,7 +221,7 @@ impl BoxedExecutorBuilder for MergeSortExchangeExecutorBuilder {
schema: Schema { fields },
task_id: source.task_id.clone(),
identity: source.plan_node().get_identity().clone(),
chunk_size: source.context.get_config().developer.batch_chunk_size,
chunk_size: source.context.get_config().developer.chunk_size,
}))
}
}
2 changes: 1 addition & 1 deletion src/batch/src/executor/order_by.rs
Original file line number Diff line number Diff line change
@@ -73,7 +73,7 @@ impl BoxedExecutorBuilder for SortExecutor {
child,
column_orders,
source.plan_node().get_identity().clone(),
source.context.get_config().developer.batch_chunk_size,
source.context.get_config().developer.chunk_size,
)))
}
}
4 changes: 2 additions & 2 deletions src/batch/src/executor/project_set.rs
Original file line number Diff line number Diff line change
@@ -160,7 +160,7 @@ impl BoxedExecutorBuilder for ProjectSetExecutor {
.map(|proto| {
ProjectSetSelectItem::from_prost(
proto,
source.context.get_config().developer.batch_chunk_size,
source.context.get_config().developer.chunk_size,
)
})
.try_collect()?;
@@ -177,7 +177,7 @@ impl BoxedExecutorBuilder for ProjectSetExecutor {
child,
schema: Schema { fields },
identity: source.plan_node().get_identity().clone(),
chunk_size: source.context.get_config().developer.batch_chunk_size,
chunk_size: source.context.get_config().developer.chunk_size,
}))
}
}
4 changes: 2 additions & 2 deletions src/batch/src/executor/row_seq_scan.rs
Original file line number Diff line number Diff line change
@@ -248,9 +248,9 @@ impl BoxedExecutorBuilder for RowSeqScanExecutorBuilder {
let chunk_size = if let Some(chunk_size_) = &seq_scan_node.chunk_size {
chunk_size_
.get_chunk_size()
.min(source.context.get_config().developer.batch_chunk_size as u32)
.min(source.context.get_config().developer.chunk_size as u32)
} else {
source.context.get_config().developer.batch_chunk_size as u32
source.context.get_config().developer.chunk_size as u32
};
let metrics = source.context().task_metrics();

2 changes: 1 addition & 1 deletion src/batch/src/executor/sort_agg.rs
Original file line number Diff line number Diff line change
@@ -90,7 +90,7 @@ impl BoxedExecutorBuilder for SortAggExecutor {
child,
schema: Schema { fields },
identity: source.plan_node().get_identity().clone(),
output_size_limit: source.context.get_config().developer.batch_chunk_size,
output_size_limit: source.context.get_config().developer.chunk_size,
}))
}
}
2 changes: 1 addition & 1 deletion src/batch/src/executor/source.rs
Original file line number Diff line number Diff line change
@@ -103,7 +103,7 @@ impl BoxedExecutorBuilder for SourceExecutor {
.context()
.get_config()
.developer
.stream_connector_message_buffer_size,
.connector_message_buffer_size,
};

let column_ids: Vec<_> = source_node
2 changes: 1 addition & 1 deletion src/batch/src/executor/table_function.rs
Original file line number Diff line number Diff line change
@@ -87,7 +87,7 @@ impl BoxedExecutorBuilder for TableFunctionExecutorBuilder {

let identity = source.plan_node().get_identity().clone();

let chunk_size = source.context.get_config().developer.batch_chunk_size;
let chunk_size = source.context.get_config().developer.chunk_size;

let table_function = build_from_prost(node.table_function.as_ref().unwrap(), chunk_size)?;

2 changes: 1 addition & 1 deletion src/batch/src/executor/top_n.rs
Original file line number Diff line number Diff line change
@@ -69,7 +69,7 @@ impl BoxedExecutorBuilder for TopNExecutor {
top_n_node.get_limit() as usize,
top_n_node.get_with_ties(),
source.plan_node().get_identity().clone(),
source.context.get_config().developer.batch_chunk_size,
source.context.get_config().developer.chunk_size,
)))
}
}
2 changes: 1 addition & 1 deletion src/batch/src/executor/update.rs
Original file line number Diff line number Diff line change
@@ -209,7 +209,7 @@ impl BoxedExecutorBuilder for UpdateExecutor {
source.context().dml_manager(),
child,
exprs,
source.context.get_config().developer.batch_chunk_size,
source.context.get_config().developer.chunk_size,
source.plan_node().get_identity().clone(),
update_node.returning,
)))
2 changes: 1 addition & 1 deletion src/batch/src/executor/values.rs
Original file line number Diff line number Diff line change
@@ -129,7 +129,7 @@ impl BoxedExecutorBuilder for ValuesExecutor {
rows: rows.into_iter(),
schema: Schema { fields },
identity: source.plan_node().get_identity().clone(),
chunk_size: source.context.get_config().developer.batch_chunk_size,
chunk_size: source.context.get_config().developer.chunk_size,
}))
}
}
2 changes: 1 addition & 1 deletion src/batch/src/task/task_execution.rs
Original file line number Diff line number Diff line change
@@ -308,7 +308,7 @@ impl<C: BatchTaskContext> BatchTaskExecution<C> {

let (sender, receivers) = create_output_channel(
plan.get_exchange_info()?,
context.get_config().developer.batch_output_channel_size,
context.get_config().developer.output_channel_size,
)?;

let mut rts = Vec::new();
2 changes: 2 additions & 0 deletions src/common/Cargo.toml
Original file line number Diff line number Diff line change
@@ -61,7 +61,9 @@ risingwave_pb = { path = "../prost" }
rust_decimal = { version = "1", features = ["db-tokio-postgres"] }
ryu = "1.0"
serde = { version = "1", features = ["derive"] }
serde_default = "0.1"
serde_json = "1"
serde_with = "2"
static_assertions = "1"
strum = "0.24"
strum_macros = "0.24"
Loading

0 comments on commit d687d71

Please sign in to comment.