Skip to content

Commit

Permalink
chore: fix unstable unit test (risingwavelabs#8674)
Browse files Browse the repository at this point in the history
Signed-off-by: tabVersion <[email protected]>
  • Loading branch information
tabVersion authored Mar 23, 2023
1 parent d967fcc commit 24ecd4d
Show file tree
Hide file tree
Showing 8 changed files with 89 additions and 41 deletions.
24 changes: 24 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions src/batch/src/executor/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,15 @@ pub fn gen_sorted_data(
batch_num: usize,
start: String,
step: u64,
offset: u64,
) -> Vec<DataChunk> {
let mut data_gen = FieldGeneratorImpl::with_number_sequence(
DataType::Int64,
Some(start),
Some(i64::MAX.to_string()),
0,
step,
offset,
)
.unwrap();
let mut ret = Vec::<DataChunk>::with_capacity(batch_num);
Expand Down
15 changes: 14 additions & 1 deletion src/common/src/field_generator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,13 @@ pub trait NumericFieldRandomGenerator {

/// fields that can be continuously generated impl this trait
pub trait NumericFieldSequenceGenerator {
fn new(start: Option<String>, end: Option<String>, offset: u64, step: u64) -> Result<Self>
fn new(
start: Option<String>,
end: Option<String>,
offset: u64,
step: u64,
event_offset: u64,
) -> Result<Self>
where
Self: Sized;

Expand Down Expand Up @@ -93,37 +99,43 @@ impl FieldGeneratorImpl {
end: Option<String>,
split_index: u64,
split_num: u64,
offset: u64,
) -> Result<Self> {
match data_type {
DataType::Int16 => Ok(FieldGeneratorImpl::I16Sequence(I16SequenceField::new(
start,
end,
split_index,
split_num,
offset,
)?)),
DataType::Int32 => Ok(FieldGeneratorImpl::I32Sequence(I32SequenceField::new(
start,
end,
split_index,
split_num,
offset,
)?)),
DataType::Int64 => Ok(FieldGeneratorImpl::I64Sequence(I64SequenceField::new(
start,
end,
split_index,
split_num,
offset,
)?)),
DataType::Float32 => Ok(FieldGeneratorImpl::F32Sequence(F32SequenceField::new(
start,
end,
split_index,
split_num,
offset,
)?)),
DataType::Float64 => Ok(FieldGeneratorImpl::F64Sequence(F64SequenceField::new(
start,
end,
split_index,
split_num,
offset,
)?)),
_ => unimplemented!(),
}
Expand Down Expand Up @@ -265,6 +277,7 @@ mod tests {
Some("20".to_string()),
split_index,
split_num,
0,
)
.unwrap(),
);
Expand Down
14 changes: 9 additions & 5 deletions src/common/src/field_generator/numeric.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ where
end_option: Option<String>,
offset: u64,
step: u64,
event_offset: u64,
) -> Result<Self>
where
Self: Sized,
Expand All @@ -127,7 +128,9 @@ where
end,
offset,
step,
..Default::default()
cur: T::from(event_offset).ok_or_else(|| {
anyhow::anyhow!("event offset is too big, offset: {}", event_offset,)
})?,
})
}

Expand Down Expand Up @@ -194,7 +197,7 @@ mod tests {
#[test]
fn test_sequence_field_generator() {
let mut i16_field =
I16SequenceField::new(Some("5".to_string()), Some("10".to_string()), 0, 1).unwrap();
I16SequenceField::new(Some("5".to_string()), Some("10".to_string()), 0, 1, 0).unwrap();
for i in 5..=10 {
assert_eq!(i16_field.generate(), json!(i));
}
Expand Down Expand Up @@ -222,7 +225,8 @@ mod tests {
#[test]
fn test_sequence_datum_generator() {
let mut f32_field =
F32SequenceField::new(Some("5.0".to_string()), Some("10.0".to_string()), 0, 1).unwrap();
F32SequenceField::new(Some("5.0".to_string()), Some("10.0".to_string()), 0, 1, 0)
.unwrap();

for i in 5..=10 {
assert_eq!(
Expand All @@ -247,13 +251,13 @@ mod tests {
#[test]
fn test_sequence_field_generator_float() {
let mut f64_field =
F64SequenceField::new(Some("0".to_string()), Some("10".to_string()), 0, 1).unwrap();
F64SequenceField::new(Some("0".to_string()), Some("10".to_string()), 0, 1, 0).unwrap();
for i in 0..=10 {
assert_eq!(f64_field.generate(), json!(i as f64));
}

let mut f32_field =
F32SequenceField::new(Some("-5".to_string()), Some("5".to_string()), 0, 1).unwrap();
F32SequenceField::new(Some("-5".to_string()), Some("5".to_string()), 0, 1, 0).unwrap();
for i in -5..=5 {
assert_eq!(f32_field.generate(), json!(i as f32));
}
Expand Down
2 changes: 2 additions & 0 deletions src/connector/src/source/datagen/source/generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ mod tests {
Some(end.to_string()),
split_index,
split_num,
0,
)
.unwrap(),
),
Expand All @@ -251,6 +252,7 @@ mod tests {
Some(end.to_string()),
split_index,
split_num,
0,
)
.unwrap(),
),
Expand Down
9 changes: 7 additions & 2 deletions src/connector/src/source/datagen/source/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ impl SplitReader for DatagenSplitReader {
let mut events_so_far = u64::default();
tracing::debug!("Splits for datagen found! {:?}", splits);

assert!(splits.len() == 1);
debug_assert!(splits.len() == 1);
let split = splits.into_iter().next().unwrap();
// TODO: currently, assume there's only on split in one reader
let split_id = split.id();
Expand Down Expand Up @@ -114,6 +114,7 @@ impl SplitReader for DatagenSplitReader {
&column.name,
split_index,
split_num,
events_so_far,
)?)
} else {
FieldDesc::Invisible
Expand Down Expand Up @@ -172,6 +173,7 @@ fn generator_from_data_type(
name: &String,
split_index: u64,
split_num: u64,
offset: u64,
) -> Result<FieldGeneratorImpl> {
let random_seed_key = format!("fields.{}.seed", name);
let random_seed: u64 = match fields_option_map
Expand Down Expand Up @@ -236,6 +238,7 @@ fn generator_from_data_type(
&format!("{}.{}", name, field_name),
split_index,
split_num,
offset,
)?;
Ok((field_name, gen))
})
Expand All @@ -251,6 +254,7 @@ fn generator_from_data_type(
&format!("{}._", name),
split_index,
split_num,
offset,
)?;
FieldGeneratorImpl::with_list(generator, length_value)
}
Expand All @@ -267,7 +271,8 @@ fn generator_from_data_type(
start_value,
end_value,
split_index,
split_num
split_num,
offset,
)
} else {
let min_key = format!("fields.{}.min", name);
Expand Down
1 change: 1 addition & 0 deletions src/stream/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -77,3 +77,4 @@ workspace-hack = { path = "../workspace-hack" }
[dev-dependencies]
assert_matches = "1"
risingwave_hummock_test = { path = "../storage/hummock_test", features = ["test"] }
tracing-test = "0.2"
63 changes: 30 additions & 33 deletions src/stream/src/executor/source/source_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -488,21 +488,21 @@ impl<S: StateStore> Debug for SourceExecutor<S> {

#[cfg(test)]
mod tests {
use std::sync::atomic::AtomicU64;

use std::time::Duration;

use maplit::{convert_args, hashmap};
use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::{ColumnId, ConflictBehavior, Field, Schema, TableId};
use risingwave_common::catalog::{ColumnId, Field, Schema, TableId};
use risingwave_common::test_prelude::StreamChunkTestExt;
use risingwave_common::types::DataType;
use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
use risingwave_connector::source::datagen::DatagenSplit;
use risingwave_pb::catalog::StreamSourceInfo;
use risingwave_pb::plan_common::PbRowFormatType;
use risingwave_source::connector_test_utils::create_source_desc_builder;
use risingwave_storage::memory::MemoryStateStore;
use tokio::sync::mpsc::unbounded_channel;
use tracing_test::traced_test;

use super::*;
use crate::executor::ActorContext;
Expand Down Expand Up @@ -600,6 +600,7 @@ mod tests {
);
}

#[traced_test]
#[tokio::test]
async fn test_split_change_mutation() {
let table_id = TableId::default();
Expand All @@ -615,9 +616,9 @@ mod tests {
};
let properties = convert_args!(hashmap!(
"connector" => "datagen",
"fields.v1.min" => "1",
"fields.v1.max" => "1000",
"fields.v1.seed" => "12345",
"fields.v1.kind" => "sequence",
"fields.v1.start" => "11",
"fields.v1.end" => "11111",
));

let source_desc_builder = create_source_desc_builder(
Expand Down Expand Up @@ -658,20 +659,7 @@ mod tests {
u64::MAX,
1,
);

let mut materialize = MaterializeExecutor::for_test(
Box::new(executor),
mem_state_store.clone(),
TableId::from(0x2333),
vec![ColumnOrder::new(0, OrderType::ascending())],
column_ids,
2,
Arc::new(AtomicU64::new(0)),
ConflictBehavior::NoCheck,
)
.await
.boxed()
.execute();
let mut handler = Box::new(executor).execute();

let init_barrier = Barrier::new_test_barrier(1).with_mutation(Mutation::Add {
adds: HashMap::new(),
Expand All @@ -687,11 +675,11 @@ mod tests {
});
barrier_tx.send(init_barrier).unwrap();

(materialize.next().await.unwrap().unwrap())
(handler.next().await.unwrap().unwrap())
.into_barrier()
.unwrap();

let mut ready_chunks = materialize.ready_chunks(10);
let mut ready_chunks = handler.ready_chunks(10);
let chunks = (ready_chunks.next().await.unwrap())
.into_iter()
.map(|msg| msg.unwrap().into_chunk().unwrap())
Expand All @@ -701,10 +689,10 @@ mod tests {
chunk_1,
StreamChunk::from_pretty(
" i
+ 533
+ 833
+ 738
+ 344",
+ 11
+ 14
+ 17
+ 20",
)
);

Expand All @@ -719,6 +707,11 @@ mod tests {
split_num: 3,
start_offset: None,
}),
SplitImpl::Datagen(DatagenSplit {
split_index: 2,
split_num: 3,
start_offset: None,
}),
];

let change_split_mutation =
Expand Down Expand Up @@ -751,18 +744,22 @@ mod tests {
let chunk_2 = StreamChunk::concat(chunks).sort_rows();
assert_eq!(
chunk_2,
// mixed from datagen split 0 and 1
// mixed from datagen split 0, 1 and 2
StreamChunk::from_pretty(
" i
+ 12
+ 13
+ 15
+ 16
+ 18
+ 19
+ 23
+ 26
+ 29
+ 201
+ 344
+ 425
+ 525
+ 533
+ 833",
+ 32",
)
);
tracing::debug!("chunk_2: {:?}", chunk_2.to_pretty_string());

let barrier = Barrier::new_test_barrier(3).with_mutation(Mutation::Pause);
barrier_tx.send(barrier).unwrap();
Expand Down

0 comments on commit 24ecd4d

Please sign in to comment.