fix(test): fix the deterministic kafka source test (#7733)
- Correct the path of `kafka.slt`.
- Ignore files with invalid names to fix #7633.

Approved-By: kwannoel
Approved-By: tabVersion
wangrunji0408 authored Feb 8, 2023
1 parent b6fe18e commit e5004c3
Showing 5 changed files with 12 additions and 6 deletions.
2 changes: 1 addition & 1 deletion ci/scripts/deterministic-e2e-test.sh
@@ -30,7 +30,7 @@ echo "--- deterministic simulation e2e, ci-3cn-2fe, batch"
 seq $TEST_NUM | parallel MADSIM_TEST_SEED={} './risingwave_simulation ./e2e_test/batch/\*\*/\*.slt 2> $LOGDIR/batch-{}.log && rm $LOGDIR/batch-{}.log'

 echo "--- deterministic simulation e2e, ci-3cn-2fe, kafka source"
-seq $TEST_NUM | parallel MADSIM_TEST_SEED={} './risingwave_simulation --kafka-datadir=./scripts/source/test_data ./e2e_test/source/kafka.slt 2> $LOGDIR/source-{}.log && rm $LOGDIR/source-{}.log'
+seq $TEST_NUM | parallel MADSIM_TEST_SEED={} './risingwave_simulation --kafka-datadir=./scripts/source/test_data ./e2e_test/source/basic/kafka.slt 2> $LOGDIR/source-{}.log && rm $LOGDIR/source-{}.log'

 echo "--- deterministic simulation e2e, ci-3cn-2fe, parallel, streaming"
 seq $TEST_NUM | parallel MADSIM_TEST_SEED={} './risingwave_simulation -j 16 ./e2e_test/streaming/\*\*/\*.slt 2> $LOGDIR/parallel-streaming-{}.log && rm $LOGDIR/parallel-streaming-{}.log'
6 changes: 4 additions & 2 deletions src/tests/simulation/src/cluster.rs
@@ -430,7 +430,7 @@ impl Cluster {
     }

     /// Create a node for kafka producer and prepare data.
-    pub fn create_kafka_producer(&self, datadir: &str) {
+    pub async fn create_kafka_producer(&self, datadir: &str) {
         self.handle
             .create_node()
             .name("kafka-producer")
@@ -439,7 +439,9 @@
             .spawn(crate::kafka::producer(
                 "192.168.11.1:29092",
                 datadir.to_string(),
-            ));
+            ))
+            .await
+            .unwrap();
     }

     /// Create a kafka topic.
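The key change in cluster.rs is that `create_kafka_producer` is now `async` and awaits the spawned producer task instead of returning immediately, so all test data is produced (and any panic in the producer surfaces through `unwrap()`) before the simulation continues. A minimal sketch of the difference, written against plain tokio rather than madsim, with a hypothetical `produce_all` task:

    use tokio::task::JoinHandle;

    // Hypothetical stand-in for crate::kafka::producer(broker, datadir).
    async fn produce_all() {
        // ... write every message from the data directory to Kafka ...
    }

    #[tokio::main]
    async fn main() {
        // Before: fire-and-forget. The caller returns while data may still be in flight.
        let _detached: JoinHandle<()> = tokio::spawn(produce_all());

        // After: wait until the producer task finishes and propagate any panic.
        tokio::spawn(produce_all()).await.unwrap();
    }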
5 changes: 4 additions & 1 deletion src/tests/simulation/src/kafka.rs
@@ -61,7 +61,10 @@ pub async fn producer(broker_addr: &str, datadir: String) {
     for file in std::fs::read_dir(datadir).unwrap() {
         let file = file.unwrap();
         let name = file.file_name().into_string().unwrap();
-        let (topic, partitions) = name.split_once('.').unwrap();
+        let Some((topic, partitions)) = name.split_once('.') else {
+            tracing::warn!("ignore file: {name:?}. expected format \"topic.partitions\"");
+            continue;
+        };
         admin
             .create_topics(
                 &[NewTopic::new(
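Files in the kafka data directory are expected to follow a `topic.partitions` naming scheme; the new `let ... else` skips any file that does not match, logging a warning instead of panicking on the former `unwrap()`, which is presumably how #7633 manifested. A self-contained sketch of the parsing, using hypothetical file names:

    // `split_once('.')` yields Some((topic, partitions)) only for conforming names.
    fn parse(name: &str) -> Option<(&str, &str)> {
        name.split_once('.')
    }

    fn main() {
        assert_eq!(parse("orders.3"), Some(("orders", "3"))); // topic "orders", 3 partitions
        assert_eq!(parse("README"), None);                    // now skipped with a warning
    }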
2 changes: 1 addition & 1 deletion src/tests/simulation/src/main.rs
@@ -163,7 +163,7 @@ async fn main() {
     );

     if let Some(datadir) = args.kafka_datadir {
-        cluster.create_kafka_producer(&datadir);
+        cluster.create_kafka_producer(&datadir).await;
     }

     if let Some(count) = args.sqlsmith {
3 changes: 2 additions & 1 deletion src/tests/simulation/src/slt.rs
@@ -99,7 +99,8 @@ pub async fn run_slt_task(cluster: Arc<Cluster>, glob: &str, opts: &KillOpts) {
             continue;
         }
         // XXX: hack for kafka source test
-        let tempfile = path.ends_with("kafka.slt").then(|| hack_kafka_test(path));
+        let tempfile = (path.ends_with("kafka.slt") || path.ends_with("kafka_batch.slt"))
+            .then(|| hack_kafka_test(path));
         let path = tempfile.as_ref().map(|p| p.path()).unwrap_or(path);
         for record in sqllogictest::parse_file(path).expect("failed to parse file") {
             if let sqllogictest::Record::Halt { .. } = record {
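The kafka rewrite hack now also covers `kafka_batch.slt`. The `(condition).then(|| ...)` idiom evaluates the closure, and therefore creates the rewritten temp file, only when the path matches one of the kafka tests. A small sketch with a hypothetical path:

    fn main() {
        // Hypothetical path; in the real test it comes from the slt glob.
        let path = "e2e_test/source/basic/kafka_batch.slt";
        let needs_hack = path.ends_with("kafka.slt") || path.ends_with("kafka_batch.slt");

        // The closure only runs when `needs_hack` is true, mirroring how
        // `hack_kafka_test(path)` is only invoked for the kafka test files.
        let rewritten: Option<String> = needs_hack.then(|| format!("rewritten copy of {path}"));
        assert!(rewritten.is_some());
    }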
