From 620d58e81f85858e01b0a2f81656425269061dbe Mon Sep 17 00:00:00 2001 From: Callum Ryan <19956159+callum-ryan@users.noreply.github.com> Date: Thu, 5 Sep 2024 03:44:55 +0100 Subject: [PATCH] feat: SQL Catalog - namespaces (#534) * feat: SQL Catalog - namespaces Signed-off-by: callum-ryan <19956159+callum-ryan@users.noreply.github.com> * feat: use transaction for updates and creates Signed-off-by: callum-ryan <19956159+callum-ryan@users.noreply.github.com> * fix: pull out query param builder to fn Signed-off-by: callum-ryan <19956159+callum-ryan@users.noreply.github.com> * feat: add drop and tests Signed-off-by: callum-ryan <19956159+callum-ryan@users.noreply.github.com> * fix: String to str, remove pub and optimise query builder Signed-off-by: callum-ryan <19956159+callum-ryan@users.noreply.github.com> * fix: nested match, remove ok() Signed-off-by: callum-ryan <19956159+callum-ryan@users.noreply.github.com> * fix: remove pub, add set, add comments Signed-off-by: callum-ryan <19956159+callum-ryan@users.noreply.github.com> * fix: refactor list_namespaces slightly Signed-off-by: callum-ryan <19956159+callum-ryan@users.noreply.github.com> * fix: add default properties to all new namespaces Signed-off-by: callum-ryan <19956159+callum-ryan@users.noreply.github.com> * fix: remove check for nested namespace Signed-off-by: callum-ryan <19956159+callum-ryan@users.noreply.github.com> * chore: add more comments to the CatalogConfig to explain bind styles Signed-off-by: callum-ryan <19956159+callum-ryan@users.noreply.github.com> * fix: edit test for nested namespaces Signed-off-by: callum-ryan <19956159+callum-ryan@users.noreply.github.com> --------- Signed-off-by: callum-ryan <19956159+callum-ryan@users.noreply.github.com> --- crates/catalog/sql/src/catalog.rs | 732 ++++++++++++++++++++++++++++-- crates/catalog/sql/src/error.rs | 9 +- 2 files changed, 703 insertions(+), 38 deletions(-) diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs index 078fff690..c6a524cea 100644 --- a/crates/catalog/sql/src/catalog.rs +++ b/crates/catalog/sql/src/catalog.rs @@ -15,19 +15,20 @@ // specific language governing permissions and limitations // under the License. -use std::borrow::Cow; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::time::Duration; use async_trait::async_trait; use iceberg::io::FileIO; use iceberg::table::Table; -use iceberg::{Catalog, Namespace, NamespaceIdent, Result, TableCommit, TableCreation, TableIdent}; -use sqlx::any::{install_default_drivers, AnyPoolOptions, AnyRow}; -use sqlx::AnyPool; +use iceberg::{ + Catalog, Error, Namespace, NamespaceIdent, Result, TableCommit, TableCreation, TableIdent, +}; +use sqlx::any::{install_default_drivers, AnyPoolOptions, AnyQueryResult, AnyRow}; +use sqlx::{Any, AnyPool, Row, Transaction}; use typed_builder::TypedBuilder; -use crate::error::from_sqlx_error; +use crate::error::{from_sqlx_error, no_such_namespace_err}; static CATALOG_TABLE_NAME: &str = "iceberg_tables"; static CATALOG_FIELD_CATALOG_NAME: &str = "catalog_name"; @@ -46,7 +47,14 @@ static MAX_CONNECTIONS: u32 = 10; // Default the SQL pool to 10 connections if n static IDLE_TIMEOUT: u64 = 10; // Default the maximum idle timeout per connection to 10s before it is closed static TEST_BEFORE_ACQUIRE: bool = true; // Default the health-check of each connection to enabled prior to returning -/// Sql catalog config +/// A struct representing the SQL catalog configuration. +/// +/// This struct contains various parameters that are used to configure a SQL catalog, +/// such as the database URI, warehouse location, and file I/O settings. +/// You are required to provide a `SqlBindStyle`, which determines how SQL statements will be bound to values in the catalog. +/// The options available for this parameter include: +/// - `SqlBindStyle::DollarNumeric`: Binds SQL statements using `$1`, `$2`, etc., as placeholders. This is for PostgreSQL databases. +/// - `SqlBindStyle::QuestionMark`: Binds SQL statements using `?` as a placeholder. This is for MySQL and SQLite databases. #[derive(Debug, TypedBuilder)] pub struct SqlCatalogConfig { uri: String, @@ -61,7 +69,7 @@ pub struct SqlCatalogConfig { #[derive(Debug)] /// Sql catalog implementation. pub struct SqlCatalog { - _name: String, + name: String, connection: AnyPool, _warehouse_location: String, _fileio: FileIO, @@ -132,7 +140,7 @@ impl SqlCatalog { .map_err(from_sqlx_error)?; Ok(SqlCatalog { - _name: config.name.to_owned(), + name: config.name.to_owned(), connection: pool, _warehouse_location: config.warehouse_location, _fileio: config.file_io, @@ -141,21 +149,30 @@ impl SqlCatalog { } /// SQLX Any does not implement PostgresSQL bindings, so we have to do this. - pub async fn execute_statement( - &self, - query: &String, - args: Vec>, - ) -> Result> { - let query_with_placeholders: Cow = - if self.sql_bind_style == SqlBindStyle::DollarNumeric { - let mut query = query.clone(); - for i in 0..args.len() { - query = query.replacen("?", &format!("${}", i + 1), 1); - } - Cow::Owned(query) - } else { - Cow::Borrowed(query) - }; + fn replace_placeholders(&self, query: &str) -> String { + match self.sql_bind_style { + SqlBindStyle::DollarNumeric => { + let mut count = 1; + query + .chars() + .fold(String::with_capacity(query.len()), |mut acc, c| { + if c == '?' { + acc.push('$'); + acc.push_str(&count.to_string()); + count += 1; + } else { + acc.push(c); + } + acc + }) + } + _ => query.to_owned(), + } + } + + /// Fetch a vec of AnyRows from a given query + async fn fetch_rows(&self, query: &str, args: Vec>) -> Result> { + let query_with_placeholders = self.replace_placeholders(query); let mut sqlx_query = sqlx::query(&query_with_placeholders); for arg in args { @@ -167,39 +184,292 @@ impl SqlCatalog { .await .map_err(from_sqlx_error) } + + /// Execute statements in a transaction, provided or not + async fn execute( + &self, + query: &str, + args: Vec>, + transaction: Option<&mut Transaction<'_, Any>>, + ) -> Result { + let query_with_placeholders = self.replace_placeholders(query); + + let mut sqlx_query = sqlx::query(&query_with_placeholders); + for arg in args { + sqlx_query = sqlx_query.bind(arg); + } + + match transaction { + Some(t) => sqlx_query.execute(&mut **t).await.map_err(from_sqlx_error), + None => { + let mut tx = self.connection.begin().await.map_err(from_sqlx_error)?; + let result = sqlx_query.execute(&mut *tx).await.map_err(from_sqlx_error); + let _ = tx.commit().await.map_err(from_sqlx_error); + result + } + } + } } #[async_trait] impl Catalog for SqlCatalog { async fn list_namespaces( &self, - _parent: Option<&NamespaceIdent>, + parent: Option<&NamespaceIdent>, ) -> Result> { - todo!() + // UNION will remove duplicates. + let all_namespaces_stmt = format!( + "SELECT {CATALOG_FIELD_TABLE_NAMESPACE} + FROM {CATALOG_TABLE_NAME} + WHERE {CATALOG_FIELD_CATALOG_NAME} = ? + UNION + SELECT {NAMESPACE_FIELD_NAME} + FROM {NAMESPACE_TABLE_NAME} + WHERE {CATALOG_FIELD_CATALOG_NAME} = ?" + ); + + let namespace_rows = self + .fetch_rows(&all_namespaces_stmt, vec![ + Some(&self.name), + Some(&self.name), + ]) + .await?; + + let mut namespaces = HashSet::::with_capacity(namespace_rows.len()); + + if let Some(parent) = parent { + if self.namespace_exists(parent).await? { + let parent_str = parent.join("."); + + for row in namespace_rows.iter() { + let nsp = row.try_get::(0).map_err(from_sqlx_error)?; + // if parent = a, then we only want to see a.b, a.c returned. + if nsp != parent_str && nsp.starts_with(&parent_str) { + namespaces.insert(NamespaceIdent::from_strs(nsp.split("."))?); + } + } + + Ok(namespaces.into_iter().collect::>()) + } else { + no_such_namespace_err(parent) + } + } else { + for row in namespace_rows.iter() { + let nsp = row.try_get::(0).map_err(from_sqlx_error)?; + let mut levels = nsp.split(".").collect::>(); + if !levels.is_empty() { + let first_level = levels.drain(..1).collect::>(); + namespaces.insert(NamespaceIdent::from_strs(first_level)?); + } + } + + Ok(namespaces.into_iter().collect::>()) + } } async fn create_namespace( &self, - _namespace: &NamespaceIdent, - _properties: HashMap, + namespace: &NamespaceIdent, + properties: HashMap, ) -> Result { - todo!() + let exists = self.namespace_exists(namespace).await?; + + if exists { + return Err(Error::new( + iceberg::ErrorKind::Unexpected, + format!("Namespace {:?} already exists", namespace), + )); + } + + let namespace_str = namespace.join("."); + let insert = format!( + "INSERT INTO {NAMESPACE_TABLE_NAME} ({CATALOG_FIELD_CATALOG_NAME}, {NAMESPACE_FIELD_NAME}, {NAMESPACE_FIELD_PROPERTY_KEY}, {NAMESPACE_FIELD_PROPERTY_VALUE}) + VALUES (?, ?, ?, ?)"); + if !properties.is_empty() { + let mut insert_properties = properties.clone(); + insert_properties.insert("exists".to_string(), "true".to_string()); + + let mut query_args = Vec::with_capacity(insert_properties.len() * 4); + let mut insert_stmt = insert.clone(); + for (index, (key, value)) in insert_properties.iter().enumerate() { + query_args.extend_from_slice(&[ + Some(self.name.as_str()), + Some(namespace_str.as_str()), + Some(key.as_str()), + Some(value.as_str()), + ]); + if index > 0 { + insert_stmt.push_str(", (?, ?, ?, ?)"); + } + } + + self.execute(&insert_stmt, query_args, None).await?; + + Ok(Namespace::with_properties( + namespace.clone(), + insert_properties, + )) + } else { + // set a default property of exists = true + self.execute( + &insert, + vec![ + Some(&self.name), + Some(&namespace_str), + Some("exists"), + Some("true"), + ], + None, + ) + .await?; + Ok(Namespace::with_properties(namespace.clone(), properties)) + } } - async fn get_namespace(&self, _namespace: &NamespaceIdent) -> Result { - todo!() + async fn get_namespace(&self, namespace: &NamespaceIdent) -> Result { + let exists = self.namespace_exists(namespace).await?; + if exists { + let namespace_props = self + .fetch_rows( + &format!( + "SELECT + {NAMESPACE_FIELD_NAME}, + {NAMESPACE_FIELD_PROPERTY_KEY}, + {NAMESPACE_FIELD_PROPERTY_VALUE} + FROM {NAMESPACE_TABLE_NAME} + WHERE {CATALOG_FIELD_CATALOG_NAME} = ? + AND {NAMESPACE_FIELD_NAME} = ?" + ), + vec![Some(&self.name), Some(&namespace.join("."))], + ) + .await?; + + let mut properties = HashMap::with_capacity(namespace_props.len()); + + for row in namespace_props { + let key = row + .try_get::(NAMESPACE_FIELD_PROPERTY_KEY) + .map_err(from_sqlx_error)?; + let value = row + .try_get::(NAMESPACE_FIELD_PROPERTY_VALUE) + .map_err(from_sqlx_error)?; + + properties.insert(key, value); + } + + Ok(Namespace::with_properties(namespace.clone(), properties)) + } else { + no_such_namespace_err(namespace) + } } - async fn namespace_exists(&self, _namespace: &NamespaceIdent) -> Result { - todo!() + async fn namespace_exists(&self, namespace: &NamespaceIdent) -> Result { + let namespace_str = namespace.join("."); + + let table_namespaces = self + .fetch_rows( + &format!( + "SELECT 1 FROM {CATALOG_TABLE_NAME} + WHERE {CATALOG_FIELD_CATALOG_NAME} = ? + AND {CATALOG_FIELD_TABLE_NAMESPACE} = ? + LIMIT 1" + ), + vec![Some(&self.name), Some(&namespace_str)], + ) + .await?; + + if !table_namespaces.is_empty() { + Ok(true) + } else { + let namespaces = self + .fetch_rows( + &format!( + "SELECT 1 FROM {NAMESPACE_TABLE_NAME} + WHERE {CATALOG_FIELD_CATALOG_NAME} = ? + AND {NAMESPACE_FIELD_NAME} = ? + LIMIT 1" + ), + vec![Some(&self.name), Some(&namespace_str)], + ) + .await?; + if !namespaces.is_empty() { + Ok(true) + } else { + Ok(false) + } + } } async fn update_namespace( &self, - _namespace: &NamespaceIdent, - _properties: HashMap, + namespace: &NamespaceIdent, + properties: HashMap, ) -> Result<()> { - todo!() + let exists = self.namespace_exists(namespace).await?; + if exists { + let existing_properties = self.get_namespace(namespace).await?.properties().clone(); + let namespace_str = namespace.join("."); + + let mut updates = vec![]; + let mut inserts = vec![]; + + for (key, value) in properties.iter() { + if existing_properties.contains_key(key) { + if existing_properties.get(key) != Some(value) { + updates.push((key, value)); + } + } else { + inserts.push((key, value)); + } + } + + let mut tx = self.connection.begin().await.map_err(from_sqlx_error)?; + let update_stmt = format!( + "UPDATE {NAMESPACE_TABLE_NAME} SET {NAMESPACE_FIELD_PROPERTY_VALUE} = ? + WHERE {CATALOG_FIELD_CATALOG_NAME} = ? + AND {NAMESPACE_FIELD_NAME} = ? + AND {NAMESPACE_FIELD_PROPERTY_KEY} = ?" + ); + + let insert_stmt = format!( + "INSERT INTO {NAMESPACE_TABLE_NAME} ({CATALOG_FIELD_CATALOG_NAME}, {NAMESPACE_FIELD_NAME}, {NAMESPACE_FIELD_PROPERTY_KEY}, {NAMESPACE_FIELD_PROPERTY_VALUE}) + VALUES (?, ?, ?, ?)" + ); + + for (key, value) in updates { + self.execute( + &update_stmt, + vec![ + Some(value), + Some(&self.name), + Some(&namespace_str), + Some(key), + ], + Some(&mut tx), + ) + .await?; + } + + for (key, value) in inserts { + self.execute( + &insert_stmt, + vec![ + Some(&self.name), + Some(&namespace_str), + Some(key), + Some(value), + ], + Some(&mut tx), + ) + .await?; + } + + let _ = tx.commit().await.map_err(from_sqlx_error)?; + + Ok(()) + } else { + no_such_namespace_err(namespace) + } } async fn drop_namespace(&self, _namespace: &NamespaceIdent) -> Result<()> { @@ -241,8 +511,11 @@ impl Catalog for SqlCatalog { #[cfg(test)] mod tests { + use std::collections::{HashMap, HashSet}; + use std::hash::Hash; + use iceberg::io::FileIOBuilder; - use iceberg::Catalog; + use iceberg::{Catalog, Namespace, NamespaceIdent}; use sqlx::migrate::MigrateDatabase; use tempfile::TempDir; @@ -253,6 +526,14 @@ mod tests { temp_dir.path().to_str().unwrap().to_string() } + fn to_set(vec: Vec) -> HashSet { + HashSet::from_iter(vec) + } + + fn default_properties() -> HashMap { + HashMap::from([("exists".to_string(), "true".to_string())]) + } + async fn new_sql_catalog(warehouse_location: String) -> impl Catalog { let sql_lite_uri = format!("sqlite:{}", temp_path()); sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap(); @@ -268,6 +549,19 @@ mod tests { SqlCatalog::new(config).await.unwrap() } + async fn create_namespace(catalog: &C, namespace_ident: &NamespaceIdent) { + let _ = catalog + .create_namespace(namespace_ident, HashMap::new()) + .await + .unwrap(); + } + + async fn create_namespaces(catalog: &C, namespace_idents: &Vec<&NamespaceIdent>) { + for namespace_ident in namespace_idents { + let _ = create_namespace(catalog, namespace_ident).await; + } + } + #[tokio::test] async fn test_initialized() { let warehouse_loc = temp_path(); @@ -276,4 +570,368 @@ mod tests { new_sql_catalog(warehouse_loc.clone()).await; new_sql_catalog(warehouse_loc.clone()).await; } + + #[tokio::test] + async fn test_list_namespaces_returns_empty_vector() { + let warehouse_loc = temp_path(); + let catalog = new_sql_catalog(warehouse_loc).await; + + assert_eq!(catalog.list_namespaces(None).await.unwrap(), vec![]); + } + + #[tokio::test] + async fn test_list_namespaces_returns_multiple_namespaces() { + let warehouse_loc = temp_path(); + let catalog = new_sql_catalog(warehouse_loc).await; + let namespace_ident_1 = NamespaceIdent::new("a".into()); + let namespace_ident_2 = NamespaceIdent::new("b".into()); + create_namespaces(&catalog, &vec![&namespace_ident_1, &namespace_ident_2]).await; + + assert_eq!( + to_set(catalog.list_namespaces(None).await.unwrap()), + to_set(vec![namespace_ident_1, namespace_ident_2]) + ); + } + + #[tokio::test] + async fn test_list_namespaces_returns_only_top_level_namespaces() { + let warehouse_loc = temp_path(); + let catalog = new_sql_catalog(warehouse_loc).await; + let namespace_ident_1 = NamespaceIdent::new("a".into()); + let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); + let namespace_ident_3 = NamespaceIdent::new("b".into()); + create_namespaces(&catalog, &vec![ + &namespace_ident_1, + &namespace_ident_2, + &namespace_ident_3, + ]) + .await; + + assert_eq!( + to_set(catalog.list_namespaces(None).await.unwrap()), + to_set(vec![namespace_ident_1, namespace_ident_3]) + ); + } + + #[tokio::test] + async fn test_list_namespaces_returns_no_namespaces_under_parent() { + let warehouse_loc = temp_path(); + let catalog = new_sql_catalog(warehouse_loc).await; + let namespace_ident_1 = NamespaceIdent::new("a".into()); + let namespace_ident_2 = NamespaceIdent::new("b".into()); + create_namespaces(&catalog, &vec![&namespace_ident_1, &namespace_ident_2]).await; + + assert_eq!( + catalog + .list_namespaces(Some(&namespace_ident_1)) + .await + .unwrap(), + vec![] + ); + } + + #[tokio::test] + async fn test_list_namespaces_returns_namespace_under_parent() { + let warehouse_loc = temp_path(); + let catalog = new_sql_catalog(warehouse_loc).await; + let namespace_ident_1 = NamespaceIdent::new("a".into()); + let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); + let namespace_ident_3 = NamespaceIdent::new("c".into()); + create_namespaces(&catalog, &vec![ + &namespace_ident_1, + &namespace_ident_2, + &namespace_ident_3, + ]) + .await; + + assert_eq!( + to_set(catalog.list_namespaces(None).await.unwrap()), + to_set(vec![namespace_ident_1.clone(), namespace_ident_3]) + ); + + assert_eq!( + catalog + .list_namespaces(Some(&namespace_ident_1)) + .await + .unwrap(), + vec![NamespaceIdent::from_strs(vec!["a", "b"]).unwrap()] + ); + } + + #[tokio::test] + async fn test_list_namespaces_returns_multiple_namespaces_under_parent() { + let warehouse_loc = temp_path(); + let catalog = new_sql_catalog(warehouse_loc).await; + let namespace_ident_1 = NamespaceIdent::new("a".to_string()); + let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "a"]).unwrap(); + let namespace_ident_3 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); + let namespace_ident_4 = NamespaceIdent::from_strs(vec!["a", "c"]).unwrap(); + let namespace_ident_5 = NamespaceIdent::new("b".into()); + create_namespaces(&catalog, &vec![ + &namespace_ident_1, + &namespace_ident_2, + &namespace_ident_3, + &namespace_ident_4, + &namespace_ident_5, + ]) + .await; + + assert_eq!( + to_set( + catalog + .list_namespaces(Some(&namespace_ident_1)) + .await + .unwrap() + ), + to_set(vec![ + NamespaceIdent::from_strs(vec!["a", "a"]).unwrap(), + NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(), + NamespaceIdent::from_strs(vec!["a", "c"]).unwrap(), + ]) + ); + } + + #[tokio::test] + async fn test_namespace_exists_returns_false() { + let warehouse_loc = temp_path(); + let catalog = new_sql_catalog(warehouse_loc).await; + let namespace_ident = NamespaceIdent::new("a".into()); + create_namespace(&catalog, &namespace_ident).await; + + assert!(!catalog + .namespace_exists(&NamespaceIdent::new("b".into())) + .await + .unwrap()); + } + + #[tokio::test] + async fn test_namespace_exists_returns_true() { + let warehouse_loc = temp_path(); + let catalog = new_sql_catalog(warehouse_loc).await; + let namespace_ident = NamespaceIdent::new("a".into()); + create_namespace(&catalog, &namespace_ident).await; + + assert!(catalog.namespace_exists(&namespace_ident).await.unwrap()); + } + + #[tokio::test] + async fn test_create_namespace_with_properties() { + let warehouse_loc = temp_path(); + let catalog = new_sql_catalog(warehouse_loc).await; + let namespace_ident = NamespaceIdent::new("abc".into()); + + let mut properties = default_properties(); + properties.insert("k".into(), "v".into()); + + assert_eq!( + catalog + .create_namespace(&namespace_ident, properties.clone()) + .await + .unwrap(), + Namespace::with_properties(namespace_ident.clone(), properties.clone()) + ); + + assert_eq!( + catalog.get_namespace(&namespace_ident).await.unwrap(), + Namespace::with_properties(namespace_ident, properties) + ); + } + + #[tokio::test] + async fn test_create_namespace_throws_error_if_namespace_already_exists() { + let warehouse_loc = temp_path(); + let catalog = new_sql_catalog(warehouse_loc).await; + let namespace_ident = NamespaceIdent::new("a".into()); + create_namespace(&catalog, &namespace_ident).await; + + assert_eq!( + catalog + .create_namespace(&namespace_ident, HashMap::new()) + .await + .unwrap_err() + .to_string(), + format!( + "Unexpected => Namespace {:?} already exists", + &namespace_ident + ) + ); + + assert_eq!( + catalog.get_namespace(&namespace_ident).await.unwrap(), + Namespace::with_properties(namespace_ident, default_properties()) + ); + } + + #[tokio::test] + async fn test_create_nested_namespace() { + let warehouse_loc = temp_path(); + let catalog = new_sql_catalog(warehouse_loc).await; + let parent_namespace_ident = NamespaceIdent::new("a".into()); + create_namespace(&catalog, &parent_namespace_ident).await; + + let child_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); + + assert_eq!( + catalog + .create_namespace(&child_namespace_ident, HashMap::new()) + .await + .unwrap(), + Namespace::new(child_namespace_ident.clone()) + ); + + assert_eq!( + catalog.get_namespace(&child_namespace_ident).await.unwrap(), + Namespace::with_properties(child_namespace_ident, default_properties()) + ); + } + + #[tokio::test] + async fn test_create_deeply_nested_namespace() { + let warehouse_loc = temp_path(); + let catalog = new_sql_catalog(warehouse_loc).await; + let namespace_ident_a = NamespaceIdent::new("a".into()); + let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); + create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await; + + let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap(); + + assert_eq!( + catalog + .create_namespace(&namespace_ident_a_b_c, HashMap::new()) + .await + .unwrap(), + Namespace::new(namespace_ident_a_b_c.clone()) + ); + + assert_eq!( + catalog.get_namespace(&namespace_ident_a_b_c).await.unwrap(), + Namespace::with_properties(namespace_ident_a_b_c, default_properties()) + ); + } + + #[tokio::test] + #[ignore = "drop_namespace not implemented"] + async fn test_drop_namespace() { + let warehouse_loc = temp_path(); + let catalog = new_sql_catalog(warehouse_loc).await; + let namespace_ident = NamespaceIdent::new("abc".into()); + create_namespace(&catalog, &namespace_ident).await; + + catalog.drop_namespace(&namespace_ident).await.unwrap(); + + assert!(!catalog.namespace_exists(&namespace_ident).await.unwrap()) + } + + #[tokio::test] + #[ignore = "drop_namespace not implemented"] + async fn test_drop_nested_namespace() { + let warehouse_loc = temp_path(); + let catalog = new_sql_catalog(warehouse_loc).await; + let namespace_ident_a = NamespaceIdent::new("a".into()); + let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); + create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await; + + catalog.drop_namespace(&namespace_ident_a_b).await.unwrap(); + + assert!(!catalog + .namespace_exists(&namespace_ident_a_b) + .await + .unwrap()); + + assert!(catalog.namespace_exists(&namespace_ident_a).await.unwrap()); + } + + #[tokio::test] + #[ignore = "drop_namespace not implemented"] + async fn test_drop_deeply_nested_namespace() { + let warehouse_loc = temp_path(); + let catalog = new_sql_catalog(warehouse_loc).await; + let namespace_ident_a = NamespaceIdent::new("a".into()); + let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); + let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap(); + create_namespaces(&catalog, &vec![ + &namespace_ident_a, + &namespace_ident_a_b, + &namespace_ident_a_b_c, + ]) + .await; + + catalog + .drop_namespace(&namespace_ident_a_b_c) + .await + .unwrap(); + + assert!(!catalog + .namespace_exists(&namespace_ident_a_b_c) + .await + .unwrap()); + + assert!(catalog + .namespace_exists(&namespace_ident_a_b) + .await + .unwrap()); + + assert!(catalog.namespace_exists(&namespace_ident_a).await.unwrap()); + } + + #[tokio::test] + #[ignore = "drop_namespace not implemented"] + async fn test_drop_namespace_throws_error_if_namespace_doesnt_exist() { + let warehouse_loc = temp_path(); + let catalog = new_sql_catalog(warehouse_loc).await; + + let non_existent_namespace_ident = NamespaceIdent::new("abc".into()); + assert_eq!( + catalog + .drop_namespace(&non_existent_namespace_ident) + .await + .unwrap_err() + .to_string(), + format!( + "Unexpected => No such namespace: {:?}", + non_existent_namespace_ident + ) + ) + } + + #[tokio::test] + #[ignore = "drop_namespace not implemented"] + async fn test_drop_namespace_throws_error_if_nested_namespace_doesnt_exist() { + let warehouse_loc = temp_path(); + let catalog = new_sql_catalog(warehouse_loc).await; + create_namespace(&catalog, &NamespaceIdent::new("a".into())).await; + + let non_existent_namespace_ident = + NamespaceIdent::from_vec(vec!["a".into(), "b".into()]).unwrap(); + assert_eq!( + catalog + .drop_namespace(&non_existent_namespace_ident) + .await + .unwrap_err() + .to_string(), + format!( + "Unexpected => No such namespace: {:?}", + non_existent_namespace_ident + ) + ) + } + + #[tokio::test] + #[ignore = "drop_namespace not implemented"] + async fn test_dropping_a_namespace_does_not_drop_namespaces_nested_under_that_one() { + let warehouse_loc = temp_path(); + let catalog = new_sql_catalog(warehouse_loc).await; + let namespace_ident_a = NamespaceIdent::new("a".into()); + let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); + create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await; + + catalog.drop_namespace(&namespace_ident_a).await.unwrap(); + + assert!(!catalog.namespace_exists(&namespace_ident_a).await.unwrap()); + + assert!(catalog + .namespace_exists(&namespace_ident_a_b) + .await + .unwrap()); + } } diff --git a/crates/catalog/sql/src/error.rs b/crates/catalog/sql/src/error.rs index 90bba1f05..cfefcc26a 100644 --- a/crates/catalog/sql/src/error.rs +++ b/crates/catalog/sql/src/error.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use iceberg::{Error, ErrorKind}; +use iceberg::{Error, ErrorKind, NamespaceIdent, Result}; /// Format an sqlx error into iceberg error. pub fn from_sqlx_error(error: sqlx::Error) -> Error { @@ -25,3 +25,10 @@ pub fn from_sqlx_error(error: sqlx::Error) -> Error { ) .with_source(error) } + +pub fn no_such_namespace_err(namespace: &NamespaceIdent) -> Result { + Err(Error::new( + ErrorKind::Unexpected, + format!("No such namespace: {:?}", namespace), + )) +}