Skip to content

Commit

Permalink
refactor(frontend): rename streaming_rate_limit to `backfill_rate_l…
Browse files Browse the repository at this point in the history
…imit` for mv backfilling and `source_rate_limit` for source updates (#17796)
  • Loading branch information
kwannoel authored Jul 30, 2024
1 parent 3fa13c7 commit cf30276
Show file tree
Hide file tree
Showing 31 changed files with 142 additions and 84 deletions.
2 changes: 1 addition & 1 deletion e2e_test/backfill/sink/create_sink.slt
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ statement ok
create table t (v1 int);

statement ok
SET STREAMING_RATE_LIMIT = 500;
SET BACKFILL_RATE_LIMIT = 500;

# Should finish in 20s
statement ok
Expand Down
2 changes: 1 addition & 1 deletion e2e_test/backfill/sink/different_pk_and_dist_key.slt
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ statement ok
create materialized view m1 as select t.v1, t.v2, t.v3 from t join t2 using(v1);

statement ok
set streaming_rate_limit = 1;
set backfill_rate_limit = 1;

statement ok
set background_ddl = true;
Expand Down
2 changes: 1 addition & 1 deletion e2e_test/background_ddl/basic.slt
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ statement ok
FLUSH;

statement ok
SET STREAMING_RATE_LIMIT=10000;
SET BACKFILL_RATE_LIMIT=10000;

statement ok
CREATE MATERIALIZED VIEW m1 as SELECT * FROM t;
Expand Down
2 changes: 1 addition & 1 deletion e2e_test/background_ddl/sim/basic.slt
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ statement ok
FLUSH;

statement ok
SET STREAMING_RATE_LIMIT=4000;
SET BACKFILL_RATE_LIMIT=4000;

statement ok
CREATE MATERIALIZED VIEW m1 as SELECT * FROM t;
Expand Down
3 changes: 2 additions & 1 deletion e2e_test/batch/catalog/pg_settings.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ postmaster license_key
postmaster max_concurrent_creating_streaming_jobs
postmaster pause_on_next_bootstrap
user application_name
user backfill_rate_limit
user background_ddl
user batch_enable_distributed_dml
user batch_parallelism
Expand Down Expand Up @@ -52,10 +53,10 @@ user server_encoding
user server_version
user server_version_num
user sink_decouple
user source_rate_limit
user standard_conforming_strings
user statement_timeout
user streaming_parallelism
user streaming_rate_limit
user streaming_use_arrangement_backfill
user synchronize_seqscans
user timezone
Expand Down
4 changes: 2 additions & 2 deletions e2e_test/ddl/drop/drop_creating_mv.slt
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ statement ok
flush;

statement ok
set streaming_rate_limit=1;
set backfill_rate_limit=1;

############## Test drop foreground mv
onlyif can-use-recover
Expand Down Expand Up @@ -61,7 +61,7 @@ drop materialized view m1;

############## Make sure the mv can still be successfully created later.
statement ok
set streaming_rate_limit=default;
set backfill_rate_limit=default;

statement ok
set background_ddl=false;
Expand Down
14 changes: 8 additions & 6 deletions e2e_test/ddl/throttle.slt
Original file line number Diff line number Diff line change
@@ -1,20 +1,22 @@
# streaming_rate_limit also applies to create sink and create source, please refer to
# e2e_test/source/basic/kafka.slt and e2e_test/sink/kafka/create_sink.slt for this part
# streaming_rate_limit applies to create source, please refer to
# e2e_test/source/basic/kafka.slt.
# backfill_rate_limit applies to create sink, please refer to
# e2e_test/sink/kafka/create_sink.slt.

statement ok
create table t1 (v1 int);

# tracked in https://github.com/risingwavelabs/risingwave/issues/13474
# create with duplicate streaming_rate_limit
# create with duplicate backfill_rate_limit
statement error Duplicated option
create materialized view mv1 with (streaming_rate_limit = 1000, streaming_rate_limit = 2000) as select * from t1;
create materialized view mv1 with (backfill_rate_limit = 1000, backfill_rate_limit = 2000) as select * from t1;

# create with unknown fields
statement error unexpected options in WITH clause
create materialized view mv1 with (streaming_rate_limit = 1000, unknown_field = 2000) as select * from t1;
create materialized view mv1 with (backfill_rate_limit = 1000, unknown_field = 2000) as select * from t1;

statement ok
create materialized view mv1 with (streaming_rate_limit = 1000) as select * from t1;
create materialized view mv1 with (backfill_rate_limit = 1000) as select * from t1;

statement ok
drop materialized view mv1;
Expand Down
4 changes: 2 additions & 2 deletions e2e_test/sink/kafka/create_sink.slt
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ create sink multiple_pk_throttle from t_kafka with (
topic = 'test-rw-sink-debezium',
type = 'debezium',
primary_key = 'id,v_varchar',
streaming_rate_limit = 200
backfill_rate_limit = 200
);

statement ok
Expand All @@ -165,7 +165,7 @@ create sink multiple_pk_throttle_1
topic = 'test-rw-sink-debezium',
type = 'debezium',
primary_key = 'id,v_varchar',
streaming_rate_limit = 200
backfill_rate_limit = 200
);

statement ok
Expand Down
4 changes: 2 additions & 2 deletions e2e_test/slow_tests/backfill/rate_limit/slow-udf.slt
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ statement ok
insert into t select 2 from generate_series(1, 1000000);

statement ok
set streaming_rate_limit=1;
set backfill_rate_limit=1;

statement ok
set background_ddl=true;
Expand All @@ -25,7 +25,7 @@ statement ok
set background_ddl = false;

statement ok
set streaming_rate_limit=default;
set backfill_rate_limit=default;

statement ok
flush;
Expand Down
4 changes: 2 additions & 2 deletions e2e_test/slow_tests/udf/always_retry_python.slt
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ statement ok
flush;

statement ok
SET STREAMING_RATE_LIMIT=1;
SET BACKFILL_RATE_LIMIT=1;

statement ok
SET BACKGROUND_DDL=true;
Expand Down Expand Up @@ -57,7 +57,7 @@ SELECT count(*) FROM mv_always_retry where s1 is NULL;
# t

statement ok
SET STREAMING_RATE_LIMIT TO DEFAULT;
SET BACKFILL_RATE_LIMIT TO DEFAULT;

statement ok
SET BACKGROUND_DDL=false;
Expand Down
8 changes: 4 additions & 4 deletions e2e_test/source/basic/alter/rate_limit_source_kafka.slt
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ sleep 3s
############## Create MV on source

statement ok
SET STREAMING_RATE_LIMIT=0;
SET SOURCE_RATE_LIMIT=0;

statement ok
create materialized view rl_mv1 as select count(*) from kafka_source;
Expand All @@ -56,7 +56,7 @@ statement ok
create materialized view rl_mv3 as select count(*) from kafka_source;

statement ok
SET STREAMING_RATE_LIMIT=default;
SET SOURCE_RATE_LIMIT=default;

############## MVs should have 0 records, since source has (rate_limit = 0)

Expand All @@ -82,11 +82,11 @@ select * from rl_mv3;

skipif in-memory
query I
alter source kafka_source set streaming_rate_limit to 1000;
alter source kafka_source set source_rate_limit to 1000;

skipif in-memory
query I
alter source kafka_source set streaming_rate_limit to default;
alter source kafka_source set source_rate_limit to default;

skipif in-memory
sleep 3s
Expand Down
6 changes: 3 additions & 3 deletions e2e_test/source/basic/alter/rate_limit_table_kafka.slt
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ create table kafka_source (v1 int) with (
topic = 'kafka_source',
properties.bootstrap.server = 'message_queue:29092',
scan.startup.mode = 'earliest',
streaming_rate_limit = 0
source_rate_limit = 0
) FORMAT PLAIN ENCODE JSON

statement ok
Expand Down Expand Up @@ -61,11 +61,11 @@ select count(*) from kafka_source;

skipif in-memory
query I
alter table kafka_source set streaming_rate_limit to 1000;
alter table kafka_source set source_rate_limit to 1000;

skipif in-memory
query I
alter table kafka_source set streaming_rate_limit to default;
alter table kafka_source set source_rate_limit to default;

skipif in-memory
sleep 3s
Expand Down
2 changes: 1 addition & 1 deletion e2e_test/source/basic/kafka.slt
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ create table s29 (id bytea) with (
topic = 'kafka_source_format_bytes',
properties.bootstrap.server = 'message_queue:29092',
scan.startup.mode = 'earliest',
streaming_rate_limit = 200
source_rate_limit = 200
) FORMAT PLAIN ENCODE BYTES

statement ok
Expand Down
4 changes: 2 additions & 2 deletions e2e_test/streaming/rate_limit/basic.slt
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ statement ok
CREATE TABLE t1(v1 int, v2 int);

statement ok
SET STREAMING_RATE_LIMIT TO 2000;
SET BACKFILL_RATE_LIMIT TO 2000;

statement ok
CREATE MATERIALIZED VIEW m AS SELECT * FROM t1;
Expand Down Expand Up @@ -35,7 +35,7 @@ FLUSH;
# Test with small rate_limit. In this case, the stream chunk needs to be split

statement ok
SET STREAMING_RATE_LIMIT TO 1;
SET BACKFILL_RATE_LIMIT TO 1;

statement ok
CREATE MATERIALIZED VIEW m AS SELECT * FROM t1;
Expand Down
2 changes: 1 addition & 1 deletion e2e_test/streaming/rate_limit/snapshot_amplification.slt
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ statement ok
SET STREAMING_PARALLELISM=2;

statement ok
SET STREAMING_RATE_LIMIT=1;
SET BACKFILL_RATE_LIMIT=1;

statement ok
CREATE TABLE table (i1 int);
Expand Down
2 changes: 1 addition & 1 deletion e2e_test/streaming/rate_limit/upstream_amplification.slt
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ statement ok
SET STREAMING_PARALLELISM=2;

statement ok
SET STREAMING_RATE_LIMIT=1;
SET BACKFILL_RATE_LIMIT=1;

statement ok
CREATE TABLE source_table (i1 int)
Expand Down
13 changes: 10 additions & 3 deletions src/common/src/session_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ type SessionConfigResult<T> = std::result::Result<T, SessionConfigError>;

// NOTE(kwannoel): We declare it separately as a constant,
// otherwise seems like it can't infer the type of -1 when written inline.
const DISABLE_STREAMING_RATE_LIMIT: i32 = -1;
const DISABLE_BACKFILL_RATE_LIMIT: i32 = -1;
const DISABLE_SOURCE_RATE_LIMIT: i32 = -1;

#[serde_as]
/// This is the Session Config of RisingWave.
Expand Down Expand Up @@ -253,11 +254,17 @@ pub struct SessionConfig {
#[parameter(default = STANDARD_CONFORMING_STRINGS)]
standard_conforming_strings: String,

/// Set streaming rate limit (rows per second) for each parallelism for mv / source / sink backfilling
/// If set to -1, disable rate limit.
/// If set to 0, this pauses the snapshot read / source read.
#[parameter(default = DISABLE_BACKFILL_RATE_LIMIT)]
backfill_rate_limit: i32,

/// Set streaming rate limit (rows per second) for each parallelism for mv / source backfilling, source reads.
/// If set to -1, disable rate limit.
/// If set to 0, this pauses the snapshot read / source read.
#[parameter(default = DISABLE_STREAMING_RATE_LIMIT)]
streaming_rate_limit: i32,
#[parameter(default = DISABLE_SOURCE_RATE_LIMIT)]
source_rate_limit: i32,

/// Cache policy for partition cache in streaming over window.
/// Can be "full", "recent", "`recent_first_n`" or "`recent_last_n`".
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/handler/alter_streaming_rate_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ pub async fn handle_alter_streaming_rate_limit(
let source_id = if let Some(id) = table.associated_source_id {
id.table_id()
} else {
bail!("ALTER STREAMING_RATE_LIMIT is not for table without source")
bail!("ALTER SOURCE_RATE_LIMIT is not for table without source")
};
(StatementType::ALTER_SOURCE, source_id)
}
Expand Down
6 changes: 3 additions & 3 deletions src/frontend/src/handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -697,7 +697,7 @@ pub async fn handle(
} => alter_table_with_sr::handle_refresh_schema(handler_args, name).await,
Statement::AlterTable {
name,
operation: AlterTableOperation::SetStreamingRateLimit { rate_limit },
operation: AlterTableOperation::SetSourceRateLimit { rate_limit },
} => {
alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
handler_args,
Expand Down Expand Up @@ -814,7 +814,7 @@ pub async fn handle(
Statement::AlterView {
materialized,
name,
operation: AlterViewOperation::SetStreamingRateLimit { rate_limit },
operation: AlterViewOperation::SetBackfillRateLimit { rate_limit },
} if materialized => {
alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
handler_args,
Expand Down Expand Up @@ -946,7 +946,7 @@ pub async fn handle(
} => alter_source_with_sr::handler_refresh_schema(handler_args, name).await,
Statement::AlterSource {
name,
operation: AlterSourceOperation::SetStreamingRateLimit { rate_limit },
operation: AlterSourceOperation::SetSourceRateLimit { rate_limit },
} => {
alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
handler_args,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ impl StreamCdcTableScan {
// The table desc used by backfill executor
state_table: Some(catalog),
cdc_table_desc: Some(self.core.cdc_table_desc.to_protobuf()),
rate_limit: self.base.ctx().overwrite_options().streaming_rate_limit,
rate_limit: self.base.ctx().overwrite_options().backfill_rate_limit,
disable_backfill: options.disable_backfill,
options: Some(options),
});
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/stream_fs_fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ impl StreamNode for StreamFsFetch {
.map(|c| c.to_protobuf())
.collect_vec(),
with_properties,
rate_limit: self.base.ctx().overwrite_options().streaming_rate_limit,
rate_limit: self.base.ctx().overwrite_options().source_rate_limit,
secret_refs,
}
});
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/stream_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ impl StreamNode for StreamSource {
.map(|c| c.to_protobuf())
.collect_vec(),
with_properties,
rate_limit: self.base.ctx().overwrite_options().streaming_rate_limit,
rate_limit: self.base.ctx().overwrite_options().source_rate_limit,
secret_refs,
}
});
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/stream_source_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ impl StreamSourceScan {
.map(|c| c.to_protobuf())
.collect_vec(),
with_properties,
rate_limit: self.base.ctx().overwrite_options().streaming_rate_limit,
rate_limit: self.base.ctx().overwrite_options().backfill_rate_limit,
secret_refs,
};

Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/stream_table_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ impl StreamTableScan {
table_desc: Some(self.core.table_desc.try_to_protobuf()?),
state_table: Some(catalog),
arrangement_table,
rate_limit: self.base.ctx().overwrite_options().streaming_rate_limit,
rate_limit: self.base.ctx().overwrite_options().backfill_rate_limit,
..Default::default()
});

Expand Down
Loading

0 comments on commit cf30276

Please sign in to comment.