feat: try to enable recovery test in ci (#6347)
* feat: try to enable recovery test in ci

* fix

* fix init notification

* run simulation in release

* some fix and temporarily disable tpch test in recovery

* fix sink recovery in frontend

* config kill-rate

* fix env

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
2 people authored and xxchan committed Nov 19, 2022
1 parent 503cc9f commit 9242b2b
Showing 9 changed files with 62 additions and 81 deletions.
6 changes: 2 additions & 4 deletions ci/scripts/deterministic-recovery-test.sh
@@ -11,10 +11,8 @@ export LOGDIR=.risingwave/log

mkdir -p $LOGDIR

# bugs here! Tracking issue https://github.com/risingwavelabs/risingwave/issues/4527
echo "--- deterministic simulation e2e, ci-3cn-1fe, recovery, streaming"
seq 1 | parallel MADSIM_TEST_SEED={} './risingwave_simulation --kill-meta --kill-frontend --kill-compute --kill-compactor --kill-rate=0.5 ./e2e_test/streaming/\*\*/\*.slt > $LOGDIR/recovery-streaming-{}.log && rm $LOGDIR/recovery-streaming-{}.log'
seq 1 | parallel MADSIM_TEST_SEED={} './risingwave_simulation --kill-meta --kill-frontend --kill-compute --kill-compactor --kill-rate=${KILL_RATE} ./e2e_test/streaming/\*\*/\*.slt > $LOGDIR/recovery-streaming-{}.log && rm $LOGDIR/recovery-streaming-{}.log'

# bugs here! Tracking issue https://github.com/risingwavelabs/risingwave/issues/4527
echo "--- deterministic simulation e2e, ci-3cn-1fe, recovery, batch"
seq 1 | parallel MADSIM_TEST_SEED={} './risingwave_simulation --kill-meta --kill-frontend --kill-compute --kill-compactor --kill-rate=0.5 ./e2e_test/batch/\*\*/\*.slt > $LOGDIR/recovery-batch-{}.log && rm $LOGDIR/recovery-batch-{}.log'
seq 1 | parallel MADSIM_TEST_SEED={} './risingwave_simulation --kill-meta --kill-frontend --kill-compute --kill-compactor --kill-rate=${KILL_RATE} ./e2e_test/batch/\*\*/\*.slt > $LOGDIR/recovery-batch-{}.log && rm $LOGDIR/recovery-batch-{}.log'
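Note on the change above: the hard-coded `--kill-rate=0.5` becomes `--kill-rate=${KILL_RATE}`, so the workflows below can choose different rates (0.5 for pull requests, 1.0 for the main-cron run). The following minimal Rust sketch only illustrates one plausible meaning of such a rate, namely that each enabled node kind is killed independently with the given probability per fault-injection round; it is not the actual risingwave_simulation logic, and the RNG and node names are purely illustrative.

struct XorShift(u64);

impl XorShift {
    // xorshift64: cheap pseudo-random numbers in [0, 1), no external crates needed.
    fn next_f64(&mut self) -> f64 {
        self.0 ^= self.0 << 13;
        self.0 ^= self.0 >> 7;
        self.0 ^= self.0 << 17;
        (self.0 >> 11) as f64 / (1u64 << 53) as f64
    }
}

// Illustrative semantics: each enabled node kind is killed independently with
// probability `kill_rate` per round (not the real simulator's implementation).
fn pick_victims<'a>(rng: &mut XorShift, kill_rate: f64, kinds: &[&'a str]) -> Vec<&'a str> {
    kinds.iter().copied().filter(|_| rng.next_f64() < kill_rate).collect()
}

fn main() {
    let mut rng = XorShift(0x9242b2b); // any non-zero seed
    let kinds = ["meta", "frontend", "compute", "compactor"];
    // KILL_RATE=0.5 (pull-request workflow): roughly half of the node kinds per round.
    println!("kill-rate 0.5 -> {:?}", pick_victims(&mut rng, 0.5, &kinds));
    // KILL_RATE=1.0 (main-cron workflow): every node kind, every round.
    println!("kill-rate 1.0 -> {:?}", pick_victims(&mut rng, 1.0, &kinds));
}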
3 changes: 1 addition & 2 deletions ci/workflows/main-cron.yml
@@ -138,7 +138,7 @@ steps:
retry: *auto-retry

- label: "recovery test (deterministic simulation)"
command: "timeout 10m ci/scripts/deterministic-recovery-test.sh"
command: "KILL_RATE=1.0 timeout 14m ci/scripts/deterministic-recovery-test.sh"
depends_on: "build-simulation"
plugins:
- gencer/cache#v2.4.10: *cargo-cache
@@ -149,7 +149,6 @@
- ./ci/plugins/upload-failure-logs
timeout_in_minutes: 15
retry: *auto-retry
soft_fail: true

- label: "misc check"
command: "ci/scripts/misc-check.sh"
3 changes: 1 addition & 2 deletions ci/workflows/pull-request.yml
@@ -199,7 +199,7 @@ steps:
retry: *auto-retry

- label: "recovery test (deterministic simulation)"
command: "timeout 10m ci/scripts/deterministic-recovery-test.sh"
command: "KILL_RATE=0.5 timeout 14m ci/scripts/deterministic-recovery-test.sh"
depends_on: "build-simulation"
plugins:
- gencer/cache#v2.4.10: *cargo-cache
@@ -216,7 +216,6 @@
- ./ci/plugins/upload-failure-logs
timeout_in_minutes: 15
retry: *auto-retry
soft_fail: true

- label: "misc check"
command: "ci/scripts/misc-check.sh"
3 changes: 3 additions & 0 deletions src/frontend/src/observer/observer_manager.rs
@@ -104,6 +104,9 @@ impl ObserverState for FrontendObserverNode {
for index in snapshot.indexes {
catalog_guard.create_index(&index)
}
for sink in snapshot.sinks {
catalog_guard.create_sink(&sink)
}
for view in snapshot.views {
catalog_guard.create_view(&view)
}
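The three added lines make the frontend replay sinks from the meta snapshot, matching the "fix sink recovery in frontend" item in the commit message. Below is a minimal, self-contained sketch of the replay pattern with simplified stand-in types, not the real catalog code.

#[derive(Default)]
struct Catalog {
    tables: Vec<String>,
    indexes: Vec<String>,
    sinks: Vec<String>,
    views: Vec<String>,
}

struct Snapshot {
    tables: Vec<String>,
    indexes: Vec<String>,
    sinks: Vec<String>,
    views: Vec<String>,
}

// When the frontend (re)connects to meta, it rebuilds its local catalog from
// the snapshot; every object kind has to be covered.
fn apply_snapshot(catalog: &mut Catalog, snapshot: Snapshot) {
    catalog.tables.extend(snapshot.tables);
    catalog.indexes.extend(snapshot.indexes);
    // Previously missing step: without it, sinks created before a recovery
    // would be invisible to the frontend afterwards.
    catalog.sinks.extend(snapshot.sinks);
    catalog.views.extend(snapshot.views);
}

fn main() {
    let mut catalog = Catalog::default();
    apply_snapshot(
        &mut catalog,
        Snapshot {
            tables: vec!["t".into()],
            indexes: vec![],
            sinks: vec!["s".into()],
            views: vec![],
        },
    );
    assert_eq!(catalog.sinks, vec!["s".to_string()]);
}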
40 changes: 17 additions & 23 deletions src/meta/src/manager/catalog/fragment.rs
@@ -45,30 +45,24 @@ pub struct FragmentManagerCore {
}

impl FragmentManagerCore {
/// List all fragment vnode mapping info.
pub fn all_fragment_mappings(&self) -> impl Iterator<Item = ParallelUnitMapping> + '_ {
self.table_fragments.values().flat_map(|table_fragments| {
table_fragments.fragments.values().map(|fragment| {
let parallel_unit_mapping = fragment
.vnode_mapping
.as_ref()
.expect("no data distribution found");
ParallelUnitMapping {
fragment_id: fragment.fragment_id,
original_indices: parallel_unit_mapping.original_indices.clone(),
data: parallel_unit_mapping.data.clone(),
}
/// List all fragment vnode mapping info that is not in `State::Initial`.
pub fn all_running_fragment_mappings(&self) -> impl Iterator<Item = ParallelUnitMapping> + '_ {
self.table_fragments
.values()
.filter(|tf| tf.state() != State::Initial)
.flat_map(|table_fragments| {
table_fragments.fragments.values().map(|fragment| {
let parallel_unit_mapping = fragment
.vnode_mapping
.as_ref()
.expect("no data distribution found");
ParallelUnitMapping {
fragment_id: fragment.fragment_id,
original_indices: parallel_unit_mapping.original_indices.clone(),
data: parallel_unit_mapping.data.clone(),
}
})
})
})
}

pub fn all_internal_tables(&self) -> impl Iterator<Item = &u32> + '_ {
self.table_fragments.values().flat_map(|table_fragments| {
table_fragments
.fragments
.values()
.flat_map(|fragment| fragment.state_table_ids.iter())
})
}
}

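The renamed `all_running_fragment_mappings` now skips table fragments whose job is still in `State::Initial`, so vnode mappings for streaming jobs that are not yet fully created (for example, mid-recovery) are not pushed to frontends. A simplified, self-contained sketch of that filtering, using stand-in types rather than the real meta-service ones:

#[derive(Clone, Copy, PartialEq)]
enum State {
    Initial,
    Created,
}

struct TableFragments {
    state: State,
    fragment_ids: Vec<u32>,
}

// Only jobs past `Initial` contribute mappings (fragment ids stand in for the
// real `ParallelUnitMapping` values).
fn all_running_fragment_mappings(tables: &[TableFragments]) -> impl Iterator<Item = u32> + '_ {
    tables
        .iter()
        .filter(|tf| tf.state != State::Initial)
        .flat_map(|tf| tf.fragment_ids.iter().copied())
}

fn main() {
    let tables = vec![
        TableFragments { state: State::Initial, fragment_ids: vec![1, 2] },
        TableFragments { state: State::Created, fragment_ids: vec![3] },
    ];
    // Fragments 1 and 2 belong to a job that is still being initialized and
    // are not reported; only fragment 3 is.
    assert_eq!(all_running_fragment_mappings(&tables).collect::<Vec<_>>(), vec![3]);
}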
17 changes: 1 addition & 16 deletions src/meta/src/rpc/service/notification_service.rs
@@ -12,10 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashSet;

use itertools::Itertools;
use risingwave_pb::catalog::Table;
use risingwave_pb::common::worker_node::State::Running;
use risingwave_pb::common::WorkerType;
use risingwave_pb::meta::notification_service_server::NotificationService;
@@ -89,8 +86,7 @@ where
let users = catalog_guard.user.list_users();

let fragment_guard = self.fragment_manager.get_fragment_read_guard().await;
let parallel_unit_mappings = fragment_guard.all_fragment_mappings().collect_vec();
let all_internal_tables = fragment_guard.all_internal_tables();
let parallel_unit_mappings = fragment_guard.all_running_fragment_mappings().collect_vec();
let hummock_snapshot = Some(self.hummock_manager.get_last_epoch().unwrap());

// We should only pin for workers to which we send a `meta_snapshot` that includes
@@ -109,17 +105,6 @@ where
match subscribe_type {
SubscribeType::Compactor | SubscribeType::Hummock => {
tables.extend(creating_tables);
let all_table_set: HashSet<u32> = tables.iter().map(|table| table.id).collect();
// FIXME: since `SourceExecutor` doesn't have catalog yet, this is a workaround to
// sync internal tables of source.
for table_id in all_internal_tables {
if !all_table_set.contains(table_id) {
tables.extend(std::iter::once(Table {
id: *table_id,
..Default::default()
}));
}
}
}
_ => {}
}
58 changes: 29 additions & 29 deletions src/meta/src/stream/source_manager.rs
@@ -172,36 +172,36 @@ where
}
};

let discovered_splits = handle.discovered_splits().await.unwrap();

for fragment_id in fragment_ids {
let actor_ids = match self
.fragment_manager
.get_running_actors_of_fragment(*fragment_id)
.await
{
Ok(actor_ids) => actor_ids,
Err(err) => {
tracing::warn!("Failed to get the actor of the fragment {}, maybe the fragment doesn't exist anymore", err.to_string());
continue;
if let Some(discovered_splits) = handle.discovered_splits().await {
for fragment_id in fragment_ids {
let actor_ids = match self
.fragment_manager
.get_running_actors_of_fragment(*fragment_id)
.await
{
Ok(actor_ids) => actor_ids,
Err(err) => {
tracing::warn!("Failed to get the actor of the fragment {}, maybe the fragment doesn't exist anymore", err.to_string());
continue;
}
};

let prev_actor_splits: HashMap<_, _> = actor_ids
.into_iter()
.map(|actor_id| {
(
actor_id,
self.actor_splits
.get(&actor_id)
.cloned()
.unwrap_or_default(),
)
})
.collect();

if let Some(change) = diff_splits(prev_actor_splits, &discovered_splits) {
split_assignment.insert(*fragment_id, change);
}
};

let prev_actor_splits: HashMap<_, _> = actor_ids
.into_iter()
.map(|actor_id| {
(
actor_id,
self.actor_splits
.get(&actor_id)
.cloned()
.unwrap_or_default(),
)
})
.collect();

if let Some(change) = diff_splits(prev_actor_splits, &discovered_splits) {
split_assignment.insert(*fragment_id, change);
}
}
}
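The split-assignment loop is now guarded by `if let Some(discovered_splits) = handle.discovered_splits().await` instead of `.unwrap()`, so a tick that fires before the first discovery round has produced any splits (for example, right after recovery) is skipped rather than panicking. A minimal stand-alone sketch of the same defensive pattern, with illustrative types only:

use std::collections::HashMap;

// Stand-in for the connector-side discovery handle: `None` until the first
// discovery round has completed.
struct DiscoveryHandle {
    splits: Option<HashMap<String, u64>>, // split id -> offset (illustrative)
}

impl DiscoveryHandle {
    fn discovered_splits(&self) -> Option<HashMap<String, u64>> {
        self.splits.clone()
    }
}

fn tick(handle: &DiscoveryHandle) {
    // Before the change, `handle.discovered_splits().unwrap()` would panic here.
    if let Some(splits) = handle.discovered_splits() {
        for (split, offset) in &splits {
            println!("assign split {split} at offset {offset}");
        }
    } else {
        // Nothing discovered yet; try again on the next tick.
        println!("no splits discovered yet, skip this tick");
    }
}

fn main() {
    tick(&DiscoveryHandle { splits: None });
    tick(&DiscoveryHandle { splits: Some(HashMap::from([("split-0".to_string(), 42)])) });
}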
2 changes: 0 additions & 2 deletions src/stream/src/executor/actor.rs
@@ -151,8 +151,6 @@ where
};
}

tracing::error!(actor_id = id, "actor exit without stop barrier");

Ok(())
}
}
11 changes: 8 additions & 3 deletions src/tests/simulation/src/main.rs
@@ -169,7 +169,7 @@ async fn main() {
})
.build();
// wait for the service to be ready
tokio::time::sleep(std::time::Duration::from_secs(30)).await;
tokio::time::sleep(std::time::Duration::from_secs(15)).await;

// frontend node
let mut frontend_ip = vec![];
@@ -326,7 +326,7 @@ async fn main() {
});

// wait for the service to be ready
tokio::time::sleep(Duration::from_secs(30)).await;
tokio::time::sleep(Duration::from_secs(15)).await;
// client
let client_node = handle
.create_node()
@@ -416,6 +416,11 @@ async fn run_slt_task(glob: &str, host: &str) {
let file = file.unwrap();
let path = file.as_path();
println!("{}", path.display());
if kill && (path.ends_with("tpch_snapshot.slt") || path.ends_with("tpch_upstream.slt")) {
// Simply ignore the tpch test cases when killing nodes is enabled.
continue;
}

// XXX: hack for kafka source test
let tempfile = path.ends_with("kafka.slt").then(|| hack_kafka_test(path));
let path = tempfile.as_ref().map(|p| p.path()).unwrap_or(path);
@@ -556,7 +561,7 @@ impl Risingwave {
tracing::error!("postgres connection error: {e}");
}
});
if ARGS.kill_compute || ARGS.kill_meta {
if ARGS.kill_frontend {
client
.simple_query("SET RW_IMPLICIT_FLUSH TO true;")
.await?;
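One detail worth noting about the tpch skip added above: `Path::ends_with` compares whole path components rather than string suffixes, so `path.ends_with("tpch_snapshot.slt")` matches only when the final component is exactly that file name. A quick stand-alone check:

use std::path::Path;

fn main() {
    // Matches: the final path component is exactly `tpch_snapshot.slt`.
    assert!(Path::new("e2e_test/batch/tpch_snapshot.slt").ends_with("tpch_snapshot.slt"));
    // Does not match: `ends_with` is component-wise, not a substring/suffix test.
    assert!(!Path::new("e2e_test/batch/not_tpch_snapshot.slt").ends_with("tpch_snapshot.slt"));
    println!("Path::ends_with is component-wise, as the tpch skip relies on");
}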
