Skip to content

Commit

Permalink
disable with_create_default_catalog_and_schema if the default catal…
Browse files Browse the repository at this point in the history
…og exists (#11991)
  • Loading branch information
goldmedal authored Aug 15, 2024
1 parent ea2e7ab commit cb3ec77
Showing 1 changed file with 90 additions and 8 deletions.
98 changes: 90 additions & 8 deletions datafusion/core/src/execution/session_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -987,8 +987,24 @@ impl SessionStateBuilder {

/// Returns a new [SessionStateBuilder] based on an existing [SessionState]
/// The session id for the new builder will be unset; all other fields will
/// be cloned from what is set in the provided session state
/// be cloned from what is set in the provided session state. If the default
/// catalog exists in existing session state, the new session state will not
/// create default catalog and schema.
pub fn new_from_existing(existing: SessionState) -> Self {
let default_catalog_exist = existing
.catalog_list()
.catalog(&existing.config.options().catalog.default_catalog)
.is_some();
// The new `with_create_default_catalog_and_schema` should be false if the default catalog exists
let create_default_catalog_and_schema = existing
.config
.options()
.catalog
.create_default_catalog_and_schema
&& !default_catalog_exist;
let new_config = existing
.config
.with_create_default_catalog_and_schema(create_default_catalog_and_schema);
Self {
session_id: None,
analyzer: Some(existing.analyzer),
Expand All @@ -1005,7 +1021,7 @@ impl SessionStateBuilder {
window_functions: Some(existing.window_functions.into_values().collect_vec()),
serializer_registry: Some(existing.serializer_registry),
file_formats: Some(existing.file_formats.into_values().collect_vec()),
config: Some(existing.config),
config: Some(new_config),
table_options: Some(existing.table_options),
execution_props: Some(existing.execution_props),
table_factories: Some(existing.table_factories),
Expand Down Expand Up @@ -1801,17 +1817,19 @@ impl<'a> SimplifyInfo for SessionSimplifyProvider<'a> {

#[cfg(test)]
mod tests {
use std::collections::HashMap;

use super::{SessionContextProvider, SessionStateBuilder};
use crate::catalog_common::MemoryCatalogProviderList;
use crate::datasource::MemTable;
use crate::execution::context::SessionState;
use arrow_array::{ArrayRef, Int32Array, RecordBatch, StringArray};
use arrow_schema::{DataType, Field, Schema};
use datafusion_common::DFSchema;
use datafusion_common::Result;
use datafusion_execution::config::SessionConfig;
use datafusion_expr::Expr;
use datafusion_sql::planner::{PlannerContext, SqlToRel};

use crate::execution::context::SessionState;

use super::{SessionContextProvider, SessionStateBuilder};
use std::collections::HashMap;
use std::sync::Arc;

#[test]
fn test_session_state_with_default_features() {
Expand Down Expand Up @@ -1841,4 +1859,68 @@ mod tests {

assert!(sql_to_expr(&state).is_err())
}

#[test]
fn test_from_existing() -> Result<()> {
fn employee_batch() -> RecordBatch {
let name: ArrayRef =
Arc::new(StringArray::from_iter_values(["Andy", "Andrew"]));
let age: ArrayRef = Arc::new(Int32Array::from(vec![11, 22]));
RecordBatch::try_from_iter(vec![("name", name), ("age", age)]).unwrap()
}
let batch = employee_batch();
let table = MemTable::try_new(batch.schema(), vec![vec![batch]])?;

let session_state = SessionStateBuilder::new()
.with_catalog_list(Arc::new(MemoryCatalogProviderList::new()))
.build();
let table_ref = session_state.resolve_table_ref("employee").to_string();
session_state
.schema_for_ref(&table_ref)?
.register_table("employee".to_string(), Arc::new(table))?;

let default_catalog = session_state
.config
.options()
.catalog
.default_catalog
.clone();
let default_schema = session_state
.config
.options()
.catalog
.default_schema
.clone();
let is_exist = session_state
.catalog_list()
.catalog(default_catalog.as_str())
.unwrap()
.schema(default_schema.as_str())
.unwrap()
.table_exist("employee");
assert!(is_exist);
let new_state = SessionStateBuilder::new_from_existing(session_state).build();
assert!(new_state
.catalog_list()
.catalog(default_catalog.as_str())
.unwrap()
.schema(default_schema.as_str())
.unwrap()
.table_exist("employee"));

// if `with_create_default_catalog_and_schema` is disabled, the new one shouldn't create default catalog and schema
let disable_create_default =
SessionConfig::default().with_create_default_catalog_and_schema(false);
let without_default_state = SessionStateBuilder::new()
.with_config(disable_create_default)
.build();
assert!(without_default_state
.catalog_list()
.catalog(&default_catalog)
.is_none());
let new_state =
SessionStateBuilder::new_from_existing(without_default_state).build();
assert!(new_state.catalog_list().catalog(&default_catalog).is_none());
Ok(())
}
}

0 comments on commit cb3ec77

Please sign in to comment.