Skip to content

Commit

Permalink
fix: replay functions in snapshot on frontend (#8912)
Browse files Browse the repository at this point in the history
  • Loading branch information
yezizp2012 authored Mar 31, 2023
1 parent dc5d3de commit 85ecc73
Showing 1 changed file with 34 additions and 14 deletions.
48 changes: 34 additions & 14 deletions src/frontend/src/observer/observer_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use risingwave_common_service::observer_manager::{ObserverState, SubscribeFronte
use risingwave_pb::common::WorkerNode;
use risingwave_pb::meta::relation::RelationInfo;
use risingwave_pb::meta::subscribe_response::{Info, Operation};
use risingwave_pb::meta::{FragmentParallelUnitMapping, SubscribeResponse};
use risingwave_pb::meta::{FragmentParallelUnitMapping, MetaSnapshot, SubscribeResponse};
use tokio::sync::watch::Sender;

use crate::catalog::root_catalog::Catalog;
Expand Down Expand Up @@ -93,35 +93,55 @@ impl ObserverState for FrontendObserverNode {
let Some(Info::Snapshot(snapshot)) = resp.info else {
unreachable!();
};
let MetaSnapshot {
databases,
schemas,
sources,
sinks,
tables,
indexes,
views,
functions,
users,
parallel_unit_mappings,
nodes,
hummock_snapshot,
hummock_version: _,
meta_backup_manifest_id: _,
hummock_write_limits: _,
version,
} = snapshot;

for db in snapshot.databases {
for db in databases {
catalog_guard.create_database(&db)
}
for schema in snapshot.schemas {
for schema in schemas {
catalog_guard.create_schema(&schema)
}
for table in snapshot.tables {
for table in tables {
catalog_guard.create_table(&table)
}
for source in snapshot.sources {
for source in sources {
catalog_guard.create_source(&source)
}
for user in snapshot.users {
for user in users {
user_guard.create_user(user)
}
for index in snapshot.indexes {
for index in indexes {
catalog_guard.create_index(&index)
}
for sink in snapshot.sinks {
for sink in sinks {
catalog_guard.create_sink(&sink)
}
for view in snapshot.views {
for view in views {
catalog_guard.create_view(&view)
}
for function in functions {
catalog_guard.create_function(&function)
}
self.worker_node_manager.refresh(
snapshot.nodes,
snapshot
.parallel_unit_mappings
nodes,
parallel_unit_mappings
.iter()
.map(
|FragmentParallelUnitMapping {
Expand All @@ -135,9 +155,9 @@ impl ObserverState for FrontendObserverNode {
.collect(),
);
self.hummock_snapshot_manager
.update_epoch(snapshot.hummock_snapshot.unwrap());
.update_epoch(hummock_snapshot.unwrap());

let snapshot_version = snapshot.version.unwrap();
let snapshot_version = version.unwrap();
catalog_guard.set_version(snapshot_version.catalog_version);
self.catalog_updated_tx
.send(snapshot_version.catalog_version)
Expand Down

0 comments on commit 85ecc73

Please sign in to comment.