From d557a6ce5c7e74ec97c12f80a16c1fc4b615f3ef Mon Sep 17 00:00:00 2001 From: Dylan Date: Tue, 21 Mar 2023 13:05:15 +0800 Subject: [PATCH] fix(meta): fix alter table add/drop column with indexes (#8664) --- e2e_test/ddl/alter_table_column.slt | 32 ++++++++++++ src/frontend/src/catalog/root_catalog.rs | 8 +++ src/frontend/src/catalog/schema_catalog.rs | 28 +++++++++++ src/frontend/src/observer/observer_manager.rs | 1 + src/meta/src/manager/catalog/mod.rs | 49 ++++++++++++++++++- src/meta/src/rpc/ddl_controller.rs | 15 ++++-- 6 files changed, 128 insertions(+), 5 deletions(-) diff --git a/e2e_test/ddl/alter_table_column.slt b/e2e_test/ddl/alter_table_column.slt index 7cad432aa3197..fc142e1510191 100644 --- a/e2e_test/ddl/alter_table_column.slt +++ b/e2e_test/ddl/alter_table_column.slt @@ -209,3 +209,35 @@ drop materialized view mv; statement ok drop table t; + +# Test the consistency of tables and indexes #https://github.com/risingwavelabs/risingwave/issues/8649 +statement ok +create table t(id int primary key, a int, b varchar); + +statement ok +create index idx on t(a); + +statement ok +alter table t add column c int; + +query IITI rowsort +select * from t; +---- + +statement ok +drop table t; + +statement ok +create table t(id int primary key, a int, b varchar); + +statement ok +create index idx on t(b) include(b); + +statement ok +alter table t drop column a; + +query II rowsort +select * from t where b = '1'; + +statement ok +drop table t; \ No newline at end of file diff --git a/src/frontend/src/catalog/root_catalog.rs b/src/frontend/src/catalog/root_catalog.rs index 9302f23ac3f8b..d961554f5cc82 100644 --- a/src/frontend/src/catalog/root_catalog.rs +++ b/src/frontend/src/catalog/root_catalog.rs @@ -222,6 +222,14 @@ impl Catalog { .update_table(proto); } + pub fn update_index(&mut self, proto: &PbIndex) { + self.get_database_mut(proto.database_id) + .unwrap() + .get_schema_mut(proto.schema_id) + .unwrap() + .update_index(proto); + } + pub fn drop_source(&mut self, db_id: DatabaseId, schema_id: SchemaId, source_id: SourceId) { self.get_database_mut(db_id) .unwrap() diff --git a/src/frontend/src/catalog/schema_catalog.rs b/src/frontend/src/catalog/schema_catalog.rs index 40cdc7125f592..faa8b281f5a64 100644 --- a/src/frontend/src/catalog/schema_catalog.rs +++ b/src/frontend/src/catalog/schema_catalog.rs @@ -85,6 +85,34 @@ impl SchemaCatalog { self.table_by_id.insert(id, table_ref); } + pub fn update_index(&mut self, prost: &PbIndex) { + let name = prost.name.clone(); + let id = prost.id.into(); + let index_table = self.get_table_by_id(&prost.index_table_id.into()).unwrap(); + let primary_table = self + .get_table_by_id(&prost.primary_table_id.into()) + .unwrap(); + let index: IndexCatalog = IndexCatalog::build_from(prost, index_table, primary_table); + let index_ref = Arc::new(index); + + self.index_by_name.insert(name, index_ref.clone()); + self.index_by_id.insert(id, index_ref.clone()); + + match self.indexes_by_table_id.entry(index_ref.primary_table.id) { + Occupied(mut entry) => { + let pos = entry + .get() + .iter() + .position(|x| x.id == index_ref.id) + .unwrap(); + *entry.get_mut().get_mut(pos).unwrap() = index_ref; + } + Vacant(_entry) => { + unreachable!() + } + }; + } + pub fn drop_table(&mut self, id: TableId) { let table_ref = self.table_by_id.remove(&id).unwrap(); self.table_by_name.remove(&table_ref.name).unwrap(); diff --git a/src/frontend/src/observer/observer_manager.rs b/src/frontend/src/observer/observer_manager.rs index 41995f161261d..13fd1eb2c3875 100644 --- a/src/frontend/src/observer/observer_manager.rs +++ b/src/frontend/src/observer/observer_manager.rs @@ -220,6 +220,7 @@ impl FrontendObserverNode { Operation::Delete => { catalog_guard.drop_index(index.database_id, index.schema_id, index.id.into()) } + Operation::Update => catalog_guard.update_index(index), _ => panic!("receive an unsupported notify {:?}", resp), }, Info::View(view) => match resp.operation() { diff --git a/src/meta/src/manager/catalog/mod.rs b/src/meta/src/manager/catalog/mod.rs index b35126e1507d3..0e82b6e9734cd 100644 --- a/src/meta/src/manager/catalog/mod.rs +++ b/src/meta/src/manager/catalog/mod.rs @@ -88,6 +88,8 @@ macro_rules! commit_meta { }; } pub(crate) use commit_meta; +use risingwave_common::util::column_index_mapping::ColIndexMapping; +use risingwave_pb::expr::expr_node::RexNode; use risingwave_pb::meta::CreatingJobInfo; pub type CatalogManagerRef = Arc>; @@ -1529,10 +1531,12 @@ where pub async fn finish_replace_table_procedure( &self, table: &Table, + table_col_index_mapping: ColIndexMapping, ) -> MetaResult { let core = &mut *self.core.lock().await; let database_core = &mut core.database; let mut tables = BTreeMapTransaction::new(&mut database_core.tables); + let mut indexes = BTreeMapTransaction::new(&mut database_core.indexes); let key = (table.database_id, table.schema_id, table.name.clone()); assert!( tables.contains_key(&table.id) @@ -1540,16 +1544,57 @@ where "table must exist and be in altering procedure" ); + let index_ids: Vec<_> = indexes + .tree_ref() + .iter() + .filter(|(_, index)| index.primary_table_id == table.id) + .map(|(index_id, _index)| *index_id) + .collect_vec(); + + let mut updated_indexes = vec![]; + + for index_id in &index_ids { + let mut index = indexes.get_mut(*index_id).unwrap(); + index + .index_item + .iter_mut() + .for_each(|x| match x.rex_node.as_mut().unwrap() { + RexNode::InputRef(input_col_idx) => { + *input_col_idx = + table_col_index_mapping.map(*input_col_idx as usize) as u32; + assert_eq!( + x.return_type, + table.columns[*input_col_idx as usize] + .column_desc + .clone() + .unwrap() + .column_type + ); + } + RexNode::FuncCall(_) => unimplemented!(), + _ => unreachable!(), + }); + + updated_indexes.push(indexes.get(index_id).cloned().unwrap()); + } + // TODO: Here we reuse the `creation` tracker for `alter` procedure, as an `alter` must database_core.in_progress_creation_tracker.remove(&key); tables.insert(table.id, table.clone()); - commit_meta!(self, tables)?; + commit_meta!(self, tables, indexes)?; - let version = self + // TODO: support group notification. + let mut version = self .notify_frontend(Operation::Update, Info::Table(table.to_owned())) .await; + for index in updated_indexes { + version = self + .notify_frontend(Operation::Update, Info::Index(index)) + .await; + } + Ok(version) } diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index a52f526ca3ce3..f4c45cbfeae84 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -550,7 +550,12 @@ where let result = try { let (ctx, table_fragments) = self - .build_replace_table(env, &stream_job, fragment_graph, table_col_index_mapping) + .build_replace_table( + env, + &stream_job, + fragment_graph, + table_col_index_mapping.clone(), + ) .await?; self.stream_manager @@ -559,7 +564,10 @@ where }; match result { - Ok(_) => self.finish_replace_table(&stream_job).await, + Ok(_) => { + self.finish_replace_table(&stream_job, table_col_index_mapping) + .await + } Err(err) => { self.cancel_replace_table(&stream_job).await?; Err(err) @@ -687,13 +695,14 @@ where async fn finish_replace_table( &self, stream_job: &StreamingJob, + table_col_index_mapping: ColIndexMapping, ) -> MetaResult { let StreamingJob::Table(None, table) = stream_job else { unreachable!("unexpected job: {stream_job:?}") }; self.catalog_manager - .finish_replace_table_procedure(table) + .finish_replace_table_procedure(table, table_col_index_mapping) .await }