Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(frontend): rename streaming_rate_limit to backfill_rate_limit for mv backfilling and source_rate_limit for source updates #17796

Merged
merged 7 commits into from
Jul 30, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion e2e_test/backfill/sink/create_sink.slt
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ statement ok
create table t (v1 int);

statement ok
SET STREAMING_RATE_LIMIT = 500;
SET BACKFILL_RATE_LIMIT = 500;

# Should finish in 20s
statement ok
Expand Down
2 changes: 1 addition & 1 deletion e2e_test/backfill/sink/different_pk_and_dist_key.slt
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ statement ok
create materialized view m1 as select t.v1, t.v2, t.v3 from t join t2 using(v1);

statement ok
set streaming_rate_limit = 1;
set backfill_rate_limit = 1;

statement ok
set background_ddl = true;
Expand Down
2 changes: 1 addition & 1 deletion e2e_test/background_ddl/basic.slt
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ statement ok
FLUSH;

statement ok
SET STREAMING_RATE_LIMIT=10000;
SET BACKFILL_RATE_LIMIT=10000;

statement ok
CREATE MATERIALIZED VIEW m1 as SELECT * FROM t;
Expand Down
2 changes: 1 addition & 1 deletion e2e_test/background_ddl/sim/basic.slt
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ statement ok
FLUSH;

statement ok
SET STREAMING_RATE_LIMIT=4000;
SET BACKFILL_RATE_LIMIT=4000;

statement ok
CREATE MATERIALIZED VIEW m1 as SELECT * FROM t;
Expand Down
1 change: 1 addition & 0 deletions e2e_test/batch/catalog/pg_settings.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ postmaster license_key
postmaster max_concurrent_creating_streaming_jobs
postmaster pause_on_next_bootstrap
user application_name
user backfill_rate_limit
user background_ddl
user batch_enable_distributed_dml
user batch_parallelism
Expand Down
4 changes: 2 additions & 2 deletions e2e_test/ddl/drop/drop_creating_mv.slt
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ statement ok
flush;

statement ok
set streaming_rate_limit=1;
set backfill_rate_limit=1;

############## Test drop foreground mv
onlyif can-use-recover
Expand Down Expand Up @@ -61,7 +61,7 @@ drop materialized view m1;

############## Make sure the mv can still be successfully created later.
statement ok
set streaming_rate_limit=default;
set backfill_rate_limit=default;

statement ok
set background_ddl=false;
Expand Down
14 changes: 8 additions & 6 deletions e2e_test/ddl/throttle.slt
Original file line number Diff line number Diff line change
@@ -1,20 +1,22 @@
# streaming_rate_limit also applies to create sink and create source, please refer to
# e2e_test/source/basic/kafka.slt and e2e_test/sink/kafka/create_sink.slt for this part
# streaming_rate_limit applies to create source, please refer to
# e2e_test/source/basic/kafka.slt.
# backfill_rate_limit applies to create sink, please refer to
# e2e_test/sink/kafka/create_sink.slt.

statement ok
create table t1 (v1 int);

# tracked in https://github.com/risingwavelabs/risingwave/issues/13474
# create with duplicate streaming_rate_limit
# create with duplicate backfill_rate_limit
statement error Duplicated option
create materialized view mv1 with (streaming_rate_limit = 1000, streaming_rate_limit = 2000) as select * from t1;
create materialized view mv1 with (backfill_rate_limit = 1000, backfill_rate_limit = 2000) as select * from t1;

# create with unknown fields
statement error unexpected options in WITH clause
create materialized view mv1 with (streaming_rate_limit = 1000, unknown_field = 2000) as select * from t1;
create materialized view mv1 with (backfill_rate_limit = 1000, unknown_field = 2000) as select * from t1;

statement ok
create materialized view mv1 with (streaming_rate_limit = 1000) as select * from t1;
create materialized view mv1 with (backfill_rate_limit = 1000) as select * from t1;

statement ok
drop materialized view mv1;
Expand Down
4 changes: 2 additions & 2 deletions e2e_test/sink/kafka/create_sink.slt
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ create sink multiple_pk_throttle from t_kafka with (
topic = 'test-rw-sink-debezium',
type = 'debezium',
primary_key = 'id,v_varchar',
streaming_rate_limit = 200
backfill_rate_limit = 200
);

statement ok
Expand All @@ -165,7 +165,7 @@ create sink multiple_pk_throttle_1
topic = 'test-rw-sink-debezium',
type = 'debezium',
primary_key = 'id,v_varchar',
streaming_rate_limit = 200
backfill_rate_limit = 200
);

statement ok
Expand Down
4 changes: 2 additions & 2 deletions e2e_test/slow_tests/backfill/rate_limit/slow-udf.slt
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ statement ok
insert into t select 2 from generate_series(1, 1000000);

statement ok
set streaming_rate_limit=1;
set backfill_rate_limit=1;

statement ok
set background_ddl=true;
Expand All @@ -25,7 +25,7 @@ statement ok
set background_ddl = false;

statement ok
set streaming_rate_limit=default;
set backfill_rate_limit=default;

statement ok
flush;
Expand Down
4 changes: 2 additions & 2 deletions e2e_test/slow_tests/udf/always_retry_python.slt
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ statement ok
flush;

statement ok
SET STREAMING_RATE_LIMIT=1;
SET BACKFILL_RATE_LIMIT=1;

statement ok
SET BACKGROUND_DDL=true;
Expand Down Expand Up @@ -57,7 +57,7 @@ SELECT count(*) FROM mv_always_retry where s1 is NULL;
# t

statement ok
SET STREAMING_RATE_LIMIT TO DEFAULT;
SET BACKFILL_RATE_LIMIT TO DEFAULT;

statement ok
SET BACKGROUND_DDL=false;
Expand Down
4 changes: 2 additions & 2 deletions e2e_test/streaming/rate_limit/basic.slt
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ statement ok
CREATE TABLE t1(v1 int, v2 int);

statement ok
SET STREAMING_RATE_LIMIT TO 2000;
SET BACKFILL_RATE_LIMIT TO 2000;

statement ok
CREATE MATERIALIZED VIEW m AS SELECT * FROM t1;
Expand Down Expand Up @@ -35,7 +35,7 @@ FLUSH;
# Test with small rate_limit. In this case, the stream chunk needs to be split

statement ok
SET STREAMING_RATE_LIMIT TO 1;
SET BACKFILL_RATE_LIMIT TO 1;

statement ok
CREATE MATERIALIZED VIEW m AS SELECT * FROM t1;
Expand Down
2 changes: 1 addition & 1 deletion e2e_test/streaming/rate_limit/snapshot_amplification.slt
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ statement ok
SET STREAMING_PARALLELISM=2;

statement ok
SET STREAMING_RATE_LIMIT=1;
SET BACKFILL_RATE_LIMIT=1;

statement ok
CREATE TABLE table (i1 int);
Expand Down
2 changes: 1 addition & 1 deletion e2e_test/streaming/rate_limit/upstream_amplification.slt
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ statement ok
SET STREAMING_PARALLELISM=2;

statement ok
SET STREAMING_RATE_LIMIT=1;
SET BACKFILL_RATE_LIMIT=1;

statement ok
CREATE TABLE source_table (i1 int)
Expand Down
7 changes: 7 additions & 0 deletions src/common/src/session_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type SessionConfigResult<T> = std::result::Result<T, SessionConfigError>;

// NOTE(kwannoel): We declare it separately as a constant,
// otherwise seems like it can't infer the type of -1 when written inline.
const DISABLE_BACKFILL_RATE_LIMIT: i32 = -1;
const DISABLE_STREAMING_RATE_LIMIT: i32 = -1;

#[serde_as]
Expand Down Expand Up @@ -253,6 +254,12 @@ pub struct SessionConfig {
#[parameter(default = STANDARD_CONFORMING_STRINGS)]
standard_conforming_strings: String,

/// Set streaming rate limit (rows per second) for each parallelism for mv / source / sink backfilling
/// If set to -1, disable rate limit.
/// If set to 0, this pauses the snapshot read / source read.
#[parameter(default = DISABLE_BACKFILL_RATE_LIMIT)]
backfill_rate_limit: i32,

/// Set streaming rate limit (rows per second) for each parallelism for mv / source backfilling, source reads.
/// If set to -1, disable rate limit.
/// If set to 0, this pauses the snapshot read / source read.
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -814,7 +814,7 @@ pub async fn handle(
Statement::AlterView {
materialized,
name,
operation: AlterViewOperation::SetStreamingRateLimit { rate_limit },
operation: AlterViewOperation::SetBackfillRateLimit { rate_limit },
} if materialized => {
alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
handler_args,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ impl StreamCdcTableScan {
// The table desc used by backfill executor
state_table: Some(catalog),
cdc_table_desc: Some(self.core.cdc_table_desc.to_protobuf()),
rate_limit: self.base.ctx().overwrite_options().streaming_rate_limit,
rate_limit: self.base.ctx().overwrite_options().backfill_rate_limit,
disable_backfill: options.disable_backfill,
options: Some(options),
});
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/stream_source_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ impl StreamSourceScan {
.map(|c| c.to_protobuf())
.collect_vec(),
with_properties,
rate_limit: self.base.ctx().overwrite_options().streaming_rate_limit,
rate_limit: self.base.ctx().overwrite_options().backfill_rate_limit,
secret_refs,
};

Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/stream_table_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ impl StreamTableScan {
table_desc: Some(self.core.table_desc.try_to_protobuf()?),
state_table: Some(catalog),
arrangement_table,
rate_limit: self.base.ctx().overwrite_options().streaming_rate_limit,
rate_limit: self.base.ctx().overwrite_options().backfill_rate_limit,
..Default::default()
});

Expand Down
17 changes: 16 additions & 1 deletion src/frontend/src/utils/overwrite_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,15 @@ use crate::handler::HandlerArgs;
#[derive(Debug, Clone, Default)]
pub struct OverwriteOptions {
pub streaming_rate_limit: Option<u32>,
pub backfill_rate_limit: Option<u32>,
}

impl OverwriteOptions {
pub(crate) const BACKFILL_RATE_LIMIT_KEY: &'static str = "backfill_rate_limit";
pub(crate) const STREAMING_RATE_LIMIT_KEY: &'static str = "streaming_rate_limit";

pub fn new(args: &mut HandlerArgs) -> Self {
let streaming_rate_limit = {
// CREATE MATERIALIZED VIEW m1 WITH (rate_limit = N) ...
if let Some(x) = args.with_options.remove(Self::STREAMING_RATE_LIMIT_KEY) {
// FIXME(tabVersion): validate the value
Some(x.parse::<u32>().unwrap())
Expand All @@ -37,8 +38,22 @@ impl OverwriteOptions {
}
}
};
let backfill_rate_limit = {
if let Some(x) = args.with_options.remove(Self::BACKFILL_RATE_LIMIT_KEY) {
// FIXME(tabVersion): validate the value
Some(x.parse::<u32>().unwrap())
} else {
let rate_limit = args.session.config().backfill_rate_limit();
if rate_limit < 0 {
None
} else {
Some(rate_limit as u32)
}
}
};
Self {
streaming_rate_limit,
backfill_rate_limit,
}
}
}
8 changes: 4 additions & 4 deletions src/sqlparser/src/ast/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,8 @@ pub enum AlterViewOperation {
parallelism: SetVariableValue,
deferred: bool,
},
/// `SET STREAMING_RATE_LIMIT TO <rate_limit>`
SetStreamingRateLimit {
/// `SET BACKFILL_RATE_LIMIT TO <rate_limit>`
SetBackfillRateLimit {
rate_limit: i32,
},
}
Expand Down Expand Up @@ -341,8 +341,8 @@ impl fmt::Display for AlterViewOperation {
if *deferred { " DEFERRED" } else { "" }
)
}
AlterViewOperation::SetStreamingRateLimit { rate_limit } => {
write!(f, "SET STREAMING_RATE_LIMIT TO {}", rate_limit)
AlterViewOperation::SetBackfillRateLimit { rate_limit } => {
write!(f, "SET BACKFILL_RATE_LIMIT TO {}", rate_limit)
}
}
}
Expand Down
1 change: 1 addition & 0 deletions src/sqlparser/src/keywords.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ define_keywords!(
AUTHORIZATION,
AUTO,
AVG,
BACKFILL_RATE_LIMIT,
kwannoel marked this conversation as resolved.
Show resolved Hide resolved
BASE64,
BEGIN,
BEGIN_FRAME,
Expand Down
28 changes: 25 additions & 3 deletions src/sqlparser/src/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3133,6 +3133,28 @@ impl Parser<'_> {
})
}

/// BACKFILL_RATE_LIMIT = default | NUMBER
/// BACKFILL_RATE_LIMIT TO default | NUMBER
pub fn parse_alter_backfill_rate_limit(&mut self) -> PResult<Option<i32>> {
if !self.parse_keyword(Keyword::BACKFILL_RATE_LIMIT) {
kwannoel marked this conversation as resolved.
Show resolved Hide resolved
return Ok(None);
}
if self.expect_keyword(Keyword::TO).is_err() && self.expect_token(&Token::Eq).is_err() {
return self.expected("TO or = after ALTER TABLE SET BACKFILL_RATE_LIMIT");
}
let rate_limit = if self.parse_keyword(Keyword::DEFAULT) {
-1
} else {
let s = self.parse_number_value()?;
if let Ok(n) = s.parse::<i32>() {
n
} else {
return self.expected("number or DEFAULT");
}
};
Ok(Some(rate_limit))
}

/// STREAMING_RATE_LIMIT = default | NUMBER
/// STREAMING_RATE_LIMIT TO default | NUMBER
pub fn parse_alter_streaming_rate_limit(&mut self) -> PResult<Option<i32>> {
Expand Down Expand Up @@ -3229,11 +3251,11 @@ impl Parser<'_> {
deferred,
}
} else if materialized
&& let Some(rate_limit) = self.parse_alter_streaming_rate_limit()?
&& let Some(rate_limit) = self.parse_alter_backfill_rate_limit()?
{
AlterViewOperation::SetStreamingRateLimit { rate_limit }
AlterViewOperation::SetBackfillRateLimit { rate_limit }
} else {
return self.expected("SCHEMA/PARALLELISM/STREAMING_RATE_LIMIT after SET");
return self.expected("SCHEMA/PARALLELISM/BACKFILL_RATE_LIMIT after SET");
kwannoel marked this conversation as resolved.
Show resolved Hide resolved
}
} else {
return self.expected(&format!(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ async fn test_arrangement_backfill_replication() -> Result<()> {
session
.run("SET STREAMING_USE_ARRANGEMENT_BACKFILL=true")
.await?;
session.run("SET STREAMING_RATE_LIMIT=30").await?;
session.run("SET BACKFILL_RATE_LIMIT=30").await?;
session
.run("create materialized view m1 as select * from t")
.await?;
Expand Down Expand Up @@ -251,7 +251,7 @@ async fn test_arrangement_backfill_progress() -> Result<()> {
// Create arrangement backfill with rate limit
session.run("SET STREAMING_PARALLELISM=1").await?;
session.run("SET BACKGROUND_DDL=true").await?;
session.run("SET STREAMING_RATE_LIMIT=1").await?;
session.run("SET BACKFILL_RATE_LIMIT=1").await?;
session
.run("CREATE MATERIALIZED VIEW m1 AS SELECT * FROM t")
.await?;
Expand Down Expand Up @@ -311,7 +311,7 @@ async fn test_enable_arrangement_backfill() -> Result<()> {
async fn test_recovery_cancels_foreground_ddl() -> Result<()> {
let mut cluster = Cluster::start(Configuration::enable_arrangement_backfill()).await?;
let mut session = cluster.start_session();
session.run("SET STREAMING_RATE_LIMIT=1").await?;
session.run("SET BACKFILL_RATE_LIMIT=1").await?;
session.run("CREATE TABLE t(v1 int);").await?;
session
.run("INSERT INTO t select * from generate_series(1, 100000);")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ const DROP_TABLE: &str = "DROP TABLE t;";
const SEED_TABLE_500: &str = "INSERT INTO t SELECT generate_series FROM generate_series(1, 500);";
const SEED_TABLE_100: &str = "INSERT INTO t SELECT generate_series FROM generate_series(1, 100);";
const SET_BACKGROUND_DDL: &str = "SET BACKGROUND_DDL=true;";
const SET_RATE_LIMIT_2: &str = "SET STREAMING_RATE_LIMIT=2;";
const SET_RATE_LIMIT_1: &str = "SET STREAMING_RATE_LIMIT=1;";
const RESET_RATE_LIMIT: &str = "SET STREAMING_RATE_LIMIT=DEFAULT;";
const SET_RATE_LIMIT_2: &str = "SET BACKFILL_RATE_LIMIT=2;";
const SET_RATE_LIMIT_1: &str = "SET BACKFILL_RATE_LIMIT=1;";
const RESET_RATE_LIMIT: &str = "SET BACKFILL_RATE_LIMIT=DEFAULT;";
const CREATE_MV1: &str = "CREATE MATERIALIZED VIEW mv1 as SELECT * FROM t;";
const DROP_MV1: &str = "DROP MATERIALIZED VIEW mv1;";
const WAIT: &str = "WAIT;";
Expand Down
Loading