diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index 1a7390d46f89..4d0a7738b039 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -1038,8 +1038,8 @@ mod tests {
     use crate::datasource::file_format::avro::AvroFormat;
     use crate::datasource::file_format::csv::CsvFormat;
     use crate::datasource::file_format::json::JsonFormat;
-    use crate::datasource::file_format::parquet::ParquetFormat;
     #[cfg(feature = "parquet")]
+    use crate::datasource::file_format::parquet::ParquetFormat;
     use crate::datasource::{provider_as_source, MemTable};
     use crate::execution::options::ArrowReadOptions;
     use crate::physical_plan::collect;
diff --git a/datafusion/core/src/datasource/schema_adapter.rs b/datafusion/core/src/datasource/schema_adapter.rs
index 715e2da5d978..f485c49e9109 100644
--- a/datafusion/core/src/datasource/schema_adapter.rs
+++ b/datafusion/core/src/datasource/schema_adapter.rs
@@ -246,6 +246,7 @@ mod tests {
     use crate::datasource::schema_adapter::{
         SchemaAdapter, SchemaAdapterFactory, SchemaMapper,
     };
+    #[cfg(feature = "parquet")]
     use parquet::arrow::ArrowWriter;
     use tempfile::TempDir;
 
diff --git a/datafusion/core/src/execution/mod.rs b/datafusion/core/src/execution/mod.rs
index ac02c7317256..a1b3eab25f33 100644
--- a/datafusion/core/src/execution/mod.rs
+++ b/datafusion/core/src/execution/mod.rs
@@ -19,6 +19,9 @@
 
 pub mod context;
 pub mod session_state;
+mod session_state_defaults;
+
+pub use session_state_defaults::SessionStateDefaults;
 
 // backwards compatibility
 pub use crate::datasource::file_format::options;
diff --git a/datafusion/core/src/execution/session_state.rs b/datafusion/core/src/execution/session_state.rs
index 0824b249b7d1..59cc620dae4d 100644
--- a/datafusion/core/src/execution/session_state.rs
+++ b/datafusion/core/src/execution/session_state.rs
@@ -18,29 +18,17 @@
 //! [`SessionState`]: information required to run queries in a session
 
 use crate::catalog::information_schema::{InformationSchemaProvider, INFORMATION_SCHEMA};
-use crate::catalog::listing_schema::ListingSchemaProvider;
-use crate::catalog::schema::{MemorySchemaProvider, SchemaProvider};
-use crate::catalog::{
-    CatalogProvider, CatalogProviderList, MemoryCatalogProvider,
-    MemoryCatalogProviderList,
-};
+use crate::catalog::schema::SchemaProvider;
+use crate::catalog::{CatalogProviderList, MemoryCatalogProviderList};
 use crate::datasource::cte_worktable::CteWorkTable;
-use crate::datasource::file_format::arrow::ArrowFormatFactory;
-use crate::datasource::file_format::avro::AvroFormatFactory;
-use crate::datasource::file_format::csv::CsvFormatFactory;
-use crate::datasource::file_format::json::JsonFormatFactory;
-#[cfg(feature = "parquet")]
-use crate::datasource::file_format::parquet::ParquetFormatFactory;
 use crate::datasource::file_format::{format_as_file_type, FileFormatFactory};
 use crate::datasource::function::{TableFunction, TableFunctionImpl};
-use crate::datasource::provider::{DefaultTableFactory, TableProviderFactory};
+use crate::datasource::provider::TableProviderFactory;
 use crate::datasource::provider_as_source;
 use crate::execution::context::{EmptySerializerRegistry, FunctionFactory, QueryPlanner};
-#[cfg(feature = "array_expressions")]
-use crate::functions_array;
+use crate::execution::SessionStateDefaults;
 use crate::physical_optimizer::optimizer::PhysicalOptimizer;
 use crate::physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner};
-use crate::{functions, functions_aggregate};
 use arrow_schema::{DataType, SchemaRef};
 use async_trait::async_trait;
 use chrono::{DateTime, Utc};
@@ -54,7 +42,6 @@ use datafusion_common::{
     ResolvedTableReference, TableReference,
 };
 use datafusion_execution::config::SessionConfig;
-use datafusion_execution::object_store::ObjectStoreUrl;
 use datafusion_execution::runtime_env::RuntimeEnv;
 use datafusion_execution::TaskContext;
 use datafusion_expr::execution_props::ExecutionProps;
@@ -85,7 +72,6 @@ use std::collections::hash_map::Entry;
 use std::collections::{HashMap, HashSet};
 use std::fmt::Debug;
 use std::sync::Arc;
-use url::Url;
 use uuid::Uuid;
 
 /// Execution context for registering data sources and executing queries.
@@ -1420,169 +1406,6 @@ impl From<SessionState> for SessionStateBuilder {
     }
 }
 
-/// Defaults that are used as part of creating a SessionState such as table providers,
-/// file formats, registering of builtin functions, etc.
-pub struct SessionStateDefaults {}
-
-impl SessionStateDefaults {
-    /// returns a map of the default [`TableProviderFactory`]s
-    pub fn default_table_factories() -> HashMap<String, Arc<dyn TableProviderFactory>> {
-        let mut table_factories: HashMap<String, Arc<dyn TableProviderFactory>> =
-            HashMap::new();
-        #[cfg(feature = "parquet")]
-        table_factories.insert("PARQUET".into(), Arc::new(DefaultTableFactory::new()));
-        table_factories.insert("CSV".into(), Arc::new(DefaultTableFactory::new()));
-        table_factories.insert("JSON".into(), Arc::new(DefaultTableFactory::new()));
-        table_factories.insert("NDJSON".into(), Arc::new(DefaultTableFactory::new()));
-        table_factories.insert("AVRO".into(), Arc::new(DefaultTableFactory::new()));
-        table_factories.insert("ARROW".into(), Arc::new(DefaultTableFactory::new()));
-
-        table_factories
-    }
-
-    /// returns the default MemoryCatalogProvider
-    pub fn default_catalog(
-        config: &SessionConfig,
-        table_factories: &HashMap<String, Arc<dyn TableProviderFactory>>,
-        runtime: &Arc<RuntimeEnv>,
-    ) -> MemoryCatalogProvider {
-        let default_catalog = MemoryCatalogProvider::new();
-
-        default_catalog
-            .register_schema(
-                &config.options().catalog.default_schema,
-                Arc::new(MemorySchemaProvider::new()),
-            )
-            .expect("memory catalog provider can register schema");
-
-        Self::register_default_schema(config, table_factories, runtime, &default_catalog);
-
-        default_catalog
-    }
-
-    /// returns the list of default [`ExprPlanner`]s
-    pub fn default_expr_planners() -> Vec<Arc<dyn ExprPlanner>> {
-        let expr_planners: Vec<Arc<dyn ExprPlanner>> = vec![
-            Arc::new(functions::core::planner::CoreFunctionPlanner::default()),
-            // register crate of array expressions (if enabled)
-            #[cfg(feature = "array_expressions")]
-            Arc::new(functions_array::planner::ArrayFunctionPlanner),
-            #[cfg(feature = "array_expressions")]
-            Arc::new(functions_array::planner::FieldAccessPlanner),
-            #[cfg(any(
-                feature = "datetime_expressions",
-                feature = "unicode_expressions"
-            ))]
-            Arc::new(functions::planner::UserDefinedFunctionPlanner),
-        ];
-
-        expr_planners
-    }
-
-    /// returns the list of default [`ScalarUDF']'s
-    pub fn default_scalar_functions() -> Vec<Arc<ScalarUDF>> {
-        let mut functions: Vec<Arc<ScalarUDF>> = functions::all_default_functions();
-        #[cfg(feature = "array_expressions")]
-        functions.append(&mut functions_array::all_default_array_functions());
-
-        functions
-    }
-
-    /// returns the list of default [`AggregateUDF']'s
-    pub fn default_aggregate_functions() -> Vec<Arc<AggregateUDF>> {
-        functions_aggregate::all_default_aggregate_functions()
-    }
-
-    /// returns the list of default [`FileFormatFactory']'s
-    pub fn default_file_formats() -> Vec<Arc<dyn FileFormatFactory>> {
-        let file_formats: Vec<Arc<dyn FileFormatFactory>> = vec![
-            #[cfg(feature = "parquet")]
-            Arc::new(ParquetFormatFactory::new()),
-            Arc::new(JsonFormatFactory::new()),
-            Arc::new(CsvFormatFactory::new()),
-            Arc::new(ArrowFormatFactory::new()),
-            Arc::new(AvroFormatFactory::new()),
-        ];
-
-        file_formats
-    }
-
-    /// registers all builtin functions - scalar, array and aggregate
-    pub fn register_builtin_functions(state: &mut SessionState) {
-        Self::register_scalar_functions(state);
-        Self::register_array_functions(state);
-        Self::register_aggregate_functions(state);
-    }
-
-    /// registers all the builtin scalar functions
-    pub fn register_scalar_functions(state: &mut SessionState) {
-        functions::register_all(state).expect("can not register built in functions");
-    }
-
-    /// registers all the builtin array functions
-    pub fn register_array_functions(state: &mut SessionState) {
-        // register crate of array expressions (if enabled)
-        #[cfg(feature = "array_expressions")]
-        functions_array::register_all(state).expect("can not register array expressions");
-    }
-
-    /// registers all the builtin aggregate functions
-    pub fn register_aggregate_functions(state: &mut SessionState) {
-        functions_aggregate::register_all(state)
-            .expect("can not register aggregate functions");
-    }
-
-    /// registers the default schema
-    pub fn register_default_schema(
-        config: &SessionConfig,
-        table_factories: &HashMap<String, Arc<dyn TableProviderFactory>>,
-        runtime: &Arc<RuntimeEnv>,
-        default_catalog: &MemoryCatalogProvider,
-    ) {
-        let url = config.options().catalog.location.as_ref();
-        let format = config.options().catalog.format.as_ref();
-        let (url, format) = match (url, format) {
-            (Some(url), Some(format)) => (url, format),
-            _ => return,
-        };
-        let url = url.to_string();
-        let format = format.to_string();
-
-        let url = Url::parse(url.as_str()).expect("Invalid default catalog location!");
-        let authority = match url.host_str() {
-            Some(host) => format!("{}://{}", url.scheme(), host),
-            None => format!("{}://", url.scheme()),
-        };
-        let path = &url.as_str()[authority.len()..];
-        let path = object_store::path::Path::parse(path).expect("Can't parse path");
-        let store = ObjectStoreUrl::parse(authority.as_str())
-            .expect("Invalid default catalog url");
-        let store = match runtime.object_store(store) {
-            Ok(store) => store,
-            _ => return,
-        };
-        let factory = match table_factories.get(format.as_str()) {
-            Some(factory) => factory,
-            _ => return,
-        };
-        let schema =
-            ListingSchemaProvider::new(authority, path, factory.clone(), store, format);
-        let _ = default_catalog
-            .register_schema("default", Arc::new(schema))
-            .expect("Failed to register default schema");
-    }
-
-    /// registers the default [`FileFormatFactory`]s
-    pub fn register_default_file_formats(state: &mut SessionState) {
-        let formats = SessionStateDefaults::default_file_formats();
-        for format in formats {
-            if let Err(e) = state.register_file_format(format, false) {
-                log::info!("Unable to register default file format: {e}")
-            };
-        }
-    }
-}
-
 /// Adapter that implements the [`ContextProvider`] trait for a [`SessionState`]
 ///
 /// This is used so the SQL planner can access the state of the session without
diff --git a/datafusion/core/src/execution/session_state_defaults.rs b/datafusion/core/src/execution/session_state_defaults.rs
new file mode 100644
index 000000000000..0b0465e44605
--- /dev/null
+++ b/datafusion/core/src/execution/session_state_defaults.rs
@@ -0,0 +1,202 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::catalog::listing_schema::ListingSchemaProvider;
+use crate::catalog::{CatalogProvider, MemoryCatalogProvider, MemorySchemaProvider};
+use crate::datasource::file_format::arrow::ArrowFormatFactory;
+use crate::datasource::file_format::avro::AvroFormatFactory;
+use crate::datasource::file_format::csv::CsvFormatFactory;
+use crate::datasource::file_format::json::JsonFormatFactory;
+#[cfg(feature = "parquet")]
+use crate::datasource::file_format::parquet::ParquetFormatFactory;
+use crate::datasource::file_format::FileFormatFactory;
+use crate::datasource::provider::{DefaultTableFactory, TableProviderFactory};
+use crate::execution::context::SessionState;
+#[cfg(feature = "array_expressions")]
+use crate::functions_array;
+use crate::{functions, functions_aggregate};
+use datafusion_execution::config::SessionConfig;
+use datafusion_execution::object_store::ObjectStoreUrl;
+use datafusion_execution::runtime_env::RuntimeEnv;
+use datafusion_expr::planner::ExprPlanner;
+use datafusion_expr::{AggregateUDF, ScalarUDF};
+use std::collections::HashMap;
+use std::sync::Arc;
+use url::Url;
+
+/// Defaults that are used as part of creating a SessionState such as table providers,
+/// file formats, registering of builtin functions, etc.
+pub struct SessionStateDefaults {}
+
+impl SessionStateDefaults {
+    /// returns a map of the default [`TableProviderFactory`]s
+    pub fn default_table_factories() -> HashMap<String, Arc<dyn TableProviderFactory>> {
+        let mut table_factories: HashMap<String, Arc<dyn TableProviderFactory>> =
+            HashMap::new();
+        #[cfg(feature = "parquet")]
+        table_factories.insert("PARQUET".into(), Arc::new(DefaultTableFactory::new()));
+        table_factories.insert("CSV".into(), Arc::new(DefaultTableFactory::new()));
+        table_factories.insert("JSON".into(), Arc::new(DefaultTableFactory::new()));
+        table_factories.insert("NDJSON".into(), Arc::new(DefaultTableFactory::new()));
+        table_factories.insert("AVRO".into(), Arc::new(DefaultTableFactory::new()));
+        table_factories.insert("ARROW".into(), Arc::new(DefaultTableFactory::new()));
+
+        table_factories
+    }
+
+    /// returns the default MemoryCatalogProvider
+    pub fn default_catalog(
+        config: &SessionConfig,
+        table_factories: &HashMap<String, Arc<dyn TableProviderFactory>>,
+        runtime: &Arc<RuntimeEnv>,
+    ) -> MemoryCatalogProvider {
+        let default_catalog = MemoryCatalogProvider::new();
+
+        default_catalog
+            .register_schema(
+                &config.options().catalog.default_schema,
+                Arc::new(MemorySchemaProvider::new()),
+            )
+            .expect("memory catalog provider can register schema");
+
+        Self::register_default_schema(config, table_factories, runtime, &default_catalog);
+
+        default_catalog
+    }
+
+    /// returns the list of default [`ExprPlanner`]s
+    pub fn default_expr_planners() -> Vec<Arc<dyn ExprPlanner>> {
+        let expr_planners: Vec<Arc<dyn ExprPlanner>> = vec![
+            Arc::new(functions::core::planner::CoreFunctionPlanner::default()),
+            // register crate of array expressions (if enabled)
+            #[cfg(feature = "array_expressions")]
+            Arc::new(functions_array::planner::ArrayFunctionPlanner),
+            #[cfg(feature = "array_expressions")]
+            Arc::new(functions_array::planner::FieldAccessPlanner),
+            #[cfg(any(
+                feature = "datetime_expressions",
+                feature = "unicode_expressions"
+            ))]
+            Arc::new(functions::planner::UserDefinedFunctionPlanner),
+        ];
+
+        expr_planners
+    }
+
+    /// returns the list of default [`ScalarUDF']'s
+    pub fn default_scalar_functions() -> Vec<Arc<ScalarUDF>> {
+        let mut functions: Vec<Arc<ScalarUDF>> = functions::all_default_functions();
+        #[cfg(feature = "array_expressions")]
+        functions.append(&mut functions_array::all_default_array_functions());
+
+        functions
+    }
+
+    /// returns the list of default [`AggregateUDF']'s
+    pub fn default_aggregate_functions() -> Vec<Arc<AggregateUDF>> {
+        functions_aggregate::all_default_aggregate_functions()
+    }
+
+    /// returns the list of default [`FileFormatFactory']'s
+    pub fn default_file_formats() -> Vec<Arc<dyn FileFormatFactory>> {
+        let file_formats: Vec<Arc<dyn FileFormatFactory>> = vec![
+            #[cfg(feature = "parquet")]
+            Arc::new(ParquetFormatFactory::new()),
+            Arc::new(JsonFormatFactory::new()),
+            Arc::new(CsvFormatFactory::new()),
+            Arc::new(ArrowFormatFactory::new()),
+            Arc::new(AvroFormatFactory::new()),
+        ];
+
+        file_formats
+    }
+
+    /// registers all builtin functions - scalar, array and aggregate
+    pub fn register_builtin_functions(state: &mut SessionState) {
+        Self::register_scalar_functions(state);
+        Self::register_array_functions(state);
+        Self::register_aggregate_functions(state);
+    }
+
+    /// registers all the builtin scalar functions
+    pub fn register_scalar_functions(state: &mut SessionState) {
+        functions::register_all(state).expect("can not register built in functions");
+    }
+
+    /// registers all the builtin array functions
+    pub fn register_array_functions(state: &mut SessionState) {
+        // register crate of array expressions (if enabled)
+        #[cfg(feature = "array_expressions")]
+        functions_array::register_all(state).expect("can not register array expressions");
+    }
+
+    /// registers all the builtin aggregate functions
+    pub fn register_aggregate_functions(state: &mut SessionState) {
+        functions_aggregate::register_all(state)
+            .expect("can not register aggregate functions");
+    }
+
+    /// registers the default schema
+    pub fn register_default_schema(
+        config: &SessionConfig,
+        table_factories: &HashMap<String, Arc<dyn TableProviderFactory>>,
+        runtime: &Arc<RuntimeEnv>,
+        default_catalog: &MemoryCatalogProvider,
+    ) {
+        let url = config.options().catalog.location.as_ref();
+        let format = config.options().catalog.format.as_ref();
+        let (url, format) = match (url, format) {
+            (Some(url), Some(format)) => (url, format),
+            _ => return,
+        };
+        let url = url.to_string();
+        let format = format.to_string();
+
+        let url = Url::parse(url.as_str()).expect("Invalid default catalog location!");
+        let authority = match url.host_str() {
+            Some(host) => format!("{}://{}", url.scheme(), host),
+            None => format!("{}://", url.scheme()),
+        };
+        let path = &url.as_str()[authority.len()..];
+        let path = object_store::path::Path::parse(path).expect("Can't parse path");
+        let store = ObjectStoreUrl::parse(authority.as_str())
+            .expect("Invalid default catalog url");
+        let store = match runtime.object_store(store) {
+            Ok(store) => store,
+            _ => return,
+        };
+        let factory = match table_factories.get(format.as_str()) {
+            Some(factory) => factory,
+            _ => return,
+        };
+        let schema =
+            ListingSchemaProvider::new(authority, path, factory.clone(), store, format);
+        let _ = default_catalog
+            .register_schema("default", Arc::new(schema))
+            .expect("Failed to register default schema");
+    }
+
+    /// registers the default [`FileFormatFactory`]s
+    pub fn register_default_file_formats(state: &mut SessionState) {
+        let formats = SessionStateDefaults::default_file_formats();
+        for format in formats {
+            if let Err(e) = state.register_file_format(format, false) {
+                log::info!("Unable to register default file format: {e}")
+            };
+        }
+    }
+}
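
Usage sketch (not part of the diff): the helpers moved into session_state_defaults.rs stay reachable through the new `pub use session_state_defaults::SessionStateDefaults` re-export in datafusion/core/src/execution/mod.rs. The `datafusion` crate path and the `main` wrapper below are assumptions for illustration only.

    // Sketch exercising the relocated SessionStateDefaults helpers.
    use datafusion::execution::SessionStateDefaults;

    fn main() {
        // Default table factories, keyed by format name; the "PARQUET" entry
        // is only present when the `parquet` feature is enabled.
        let table_factories = SessionStateDefaults::default_table_factories();
        assert!(table_factories.contains_key("CSV"));

        // Default file format factories; ParquetFormatFactory is likewise
        // feature-gated.
        let file_formats = SessionStateDefaults::default_file_formats();
        println!("{} default file formats", file_formats.len());

        // Built-in function sets that SessionState registers by default.
        let scalars = SessionStateDefaults::default_scalar_functions();
        let aggregates = SessionStateDefaults::default_aggregate_functions();
        println!("{} scalar / {} aggregate UDFs", scalars.len(), aggregates.len());
    }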