Skip to content

Commit

Permalink
Support COPY TO Externally Defined File Formats, add FileType trait (a…
Browse files Browse the repository at this point in the history
…pache#11060)

* wip create and register ext file types with session

* Add contains function, and support in datafusion substrait consumer (apache#10879)

* adding new function contains

* adding substrait test

* adding doc

* adding doc

* Update docs/source/user-guide/sql/scalar_functions.md

Co-authored-by: Alex Huang <[email protected]>

* adding entry

---------

Co-authored-by: Alex Huang <[email protected]>

* logical planning updated

* compiling

* removing filetype enum

* compiling

* working on tests

* fix some tests

* test fixes

* cli fix

* cli fmt

* Update datafusion/core/src/datasource/file_format/mod.rs

Co-authored-by: Andrew Lamb <[email protected]>

* Update datafusion/core/src/execution/session_state.rs

Co-authored-by: Andrew Lamb <[email protected]>

* review comments

* review comments

* review comments

* typo fix

* fmt

* fix err log style

* fmt

---------

Co-authored-by: Lordworms <[email protected]>
Co-authored-by: Alex Huang <[email protected]>
Co-authored-by: Andrew Lamb <[email protected]>
  • Loading branch information
4 people authored and comphead committed Jul 2, 2024
1 parent b8957bc commit c0d0926
Show file tree
Hide file tree
Showing 40 changed files with 1,305 additions and 644 deletions.
31 changes: 19 additions & 12 deletions datafusion-cli/src/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use std::collections::HashMap;
use std::fs::File;
use std::io::prelude::*;
use std::io::BufReader;
use std::str::FromStr;

use crate::cli_context::CliSessionContext;
use crate::helper::split_from_semicolon;
Expand All @@ -35,14 +34,14 @@ use crate::{

use datafusion::common::instant::Instant;
use datafusion::common::plan_datafusion_err;
use datafusion::config::ConfigFileType;
use datafusion::datasource::listing::ListingTableUrl;
use datafusion::error::{DataFusionError, Result};
use datafusion::logical_expr::{DdlStatement, LogicalPlan};
use datafusion::physical_plan::{collect, execute_stream, ExecutionPlanProperties};
use datafusion::sql::parser::{DFParser, Statement};
use datafusion::sql::sqlparser::dialect::dialect_from_str;

use datafusion::common::FileType;
use datafusion::sql::sqlparser;
use rustyline::error::ReadlineError;
use rustyline::Editor;
Expand Down Expand Up @@ -291,6 +290,15 @@ impl AdjustedPrintOptions {
}
}

/// Looks up the built-in [`ConfigFileType`] for a file-type name.
///
/// The comparison ignores ASCII case, so `"csv"`, `"CSV"` and `"Csv"` all
/// resolve to [`ConfigFileType::CSV`]. Only `csv`, `json` and `parquet` are
/// recognized; every other name (including the empty string) yields `None`.
fn config_file_type_from_str(ext: &str) -> Option<ConfigFileType> {
    if ext.eq_ignore_ascii_case("csv") {
        return Some(ConfigFileType::CSV);
    }
    if ext.eq_ignore_ascii_case("json") {
        return Some(ConfigFileType::JSON);
    }
    if ext.eq_ignore_ascii_case("parquet") {
        return Some(ConfigFileType::PARQUET);
    }
    // Not one of the built-in names.
    None
}

async fn create_plan(
ctx: &mut dyn CliSessionContext,
statement: Statement,
Expand All @@ -302,7 +310,7 @@ async fn create_plan(
// will raise Configuration errors.
if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &plan {
// To support custom formats, treat unrecognized file types as None
let format = FileType::from_str(&cmd.file_type).ok();
let format = config_file_type_from_str(&cmd.file_type);
register_object_store_and_config_extensions(
ctx,
&cmd.location,
Expand All @@ -313,13 +321,13 @@ async fn create_plan(
}

if let LogicalPlan::Copy(copy_to) = &mut plan {
let format: FileType = (&copy_to.format_options).into();
let format = config_file_type_from_str(&copy_to.file_type.get_ext());

register_object_store_and_config_extensions(
ctx,
&copy_to.output_url,
&copy_to.options,
Some(format),
format,
)
.await?;
}
Expand Down Expand Up @@ -357,7 +365,7 @@ pub(crate) async fn register_object_store_and_config_extensions(
ctx: &dyn CliSessionContext,
location: &String,
options: &HashMap<String, String>,
format: Option<FileType>,
format: Option<ConfigFileType>,
) -> Result<()> {
// Parse the location URL to extract the scheme and other components
let table_path = ListingTableUrl::parse(location)?;
Expand All @@ -374,7 +382,7 @@ pub(crate) async fn register_object_store_and_config_extensions(
// Clone and modify the default table options based on the provided options
let mut table_options = ctx.session_state().default_table_options().clone();
if let Some(format) = format {
table_options.set_file_format(format);
table_options.set_config_format(format);
}
table_options.alter_with_string_hash_map(options)?;

Expand All @@ -392,7 +400,6 @@ pub(crate) async fn register_object_store_and_config_extensions(
mod tests {
use super::*;

use datafusion::common::config::FormatOptions;
use datafusion::common::plan_err;

use datafusion::prelude::SessionContext;
Expand All @@ -403,7 +410,7 @@ mod tests {
let plan = ctx.state().create_logical_plan(sql).await?;

if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &plan {
let format = FileType::from_str(&cmd.file_type).ok();
let format = config_file_type_from_str(&cmd.file_type);
register_object_store_and_config_extensions(
&ctx,
&cmd.location,
Expand All @@ -429,12 +436,12 @@ mod tests {
let plan = ctx.state().create_logical_plan(sql).await?;

if let LogicalPlan::Copy(cmd) = &plan {
let format: FileType = (&cmd.format_options).into();
let format = config_file_type_from_str(&cmd.file_type.get_ext());
register_object_store_and_config_extensions(
&ctx,
&cmd.output_url,
&cmd.options,
Some(format),
format,
)
.await?;
} else {
Expand Down Expand Up @@ -484,7 +491,7 @@ mod tests {
let mut plan = create_plan(&mut ctx, statement).await?;
if let LogicalPlan::Copy(copy_to) = &mut plan {
assert_eq!(copy_to.output_url, location);
assert!(matches!(copy_to.format_options, FormatOptions::PARQUET(_)));
assert_eq!(copy_to.file_type.get_ext(), "parquet".to_string());
ctx.runtime_env()
.object_store_registry
.get_store(&Url::parse(&copy_to.output_url).unwrap())?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ use std::sync::Arc;

use datafusion::dataframe::DataFrameWriteOptions;
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::file_format::FileFormat;
use datafusion::datasource::listing::ListingOptions;
use datafusion::error::Result;
use datafusion::prelude::*;
use datafusion_common::{FileType, GetExt};

use object_store::aws::AmazonS3Builder;
use url::Url;
Expand Down Expand Up @@ -54,7 +54,7 @@ async fn main() -> Result<()> {
let path = format!("s3://{bucket_name}/test_data/");
let file_format = ParquetFormat::default().with_enable_pruning(true);
let listing_options = ListingOptions::new(Arc::new(file_format))
.with_file_extension(FileType::PARQUET.get_ext());
.with_file_extension(ParquetFormat::default().get_ext());
ctx.register_listing_table("test", &path, listing_options, None, None)
.await?;

Expand All @@ -79,7 +79,7 @@ async fn main() -> Result<()> {

let file_format = ParquetFormat::default().with_enable_pruning(true);
let listing_options = ListingOptions::new(Arc::new(file_format))
.with_file_extension(FileType::PARQUET.get_ext());
.with_file_extension(ParquetFormat::default().get_ext());
ctx.register_listing_table("test2", &out_path, listing_options, None, None)
.await?;

Expand Down
77 changes: 36 additions & 41 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use std::str::FromStr;

use crate::error::_config_err;
use crate::parsers::CompressionTypeVariant;
use crate::{DataFusionError, FileType, Result};
use crate::{DataFusionError, Result};

/// A macro that wraps a configuration struct and automatically derives
/// [`Default`] and [`ConfigField`] for it, allowing it to be used
Expand Down Expand Up @@ -1116,6 +1116,16 @@ macro_rules! extensions_options {
}
}

/// These file types have special built in behavior for configuration.
///
/// The variant stored in `TableOptions::current_format` decides which built-in
/// option group the `format.*` configuration keys are applied to.
/// Use TableOptions::Extensions for configuring other file types.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ConfigFileType {
    /// Selects the built-in CSV options.
    CSV,
    /// Selects the built-in Parquet options; only present when the `parquet`
    /// feature is enabled.
    #[cfg(feature = "parquet")]
    PARQUET,
    /// Selects the built-in JSON options.
    JSON,
}

/// Represents the configuration options available for handling different table formats within a data processing application.
/// This struct encompasses options for various file formats including CSV, Parquet, and JSON, allowing for flexible configuration
/// of parsing and writing behaviors specific to each format. Additionally, it supports extending functionality through custom extensions.
Expand All @@ -1134,7 +1144,7 @@ pub struct TableOptions {

/// The current file format that the table operations should assume. This option allows
/// for dynamic switching between the supported file types (e.g., CSV, Parquet, JSON).
pub current_format: Option<FileType>,
pub current_format: Option<ConfigFileType>,

/// Optional extensions that can be used to extend or customize the behavior of the table
/// options. Extensions can be registered using `Extensions::insert` and might include
Expand All @@ -1152,10 +1162,9 @@ impl ConfigField for TableOptions {
if let Some(file_type) = &self.current_format {
match file_type {
#[cfg(feature = "parquet")]
FileType::PARQUET => self.parquet.visit(v, "format", ""),
FileType::CSV => self.csv.visit(v, "format", ""),
FileType::JSON => self.json.visit(v, "format", ""),
_ => {}
ConfigFileType::PARQUET => self.parquet.visit(v, "format", ""),
ConfigFileType::CSV => self.csv.visit(v, "format", ""),
ConfigFileType::JSON => self.json.visit(v, "format", ""),
}
} else {
self.csv.visit(v, "csv", "");
Expand Down Expand Up @@ -1188,12 +1197,9 @@ impl ConfigField for TableOptions {
match key {
"format" => match format {
#[cfg(feature = "parquet")]
FileType::PARQUET => self.parquet.set(rem, value),
FileType::CSV => self.csv.set(rem, value),
FileType::JSON => self.json.set(rem, value),
_ => {
_config_err!("Config value \"{key}\" is not supported on {}", format)
}
ConfigFileType::PARQUET => self.parquet.set(rem, value),
ConfigFileType::CSV => self.csv.set(rem, value),
ConfigFileType::JSON => self.json.set(rem, value),
},
_ => _config_err!("Config value \"{key}\" not found on TableOptions"),
}
Expand All @@ -1210,15 +1216,6 @@ impl TableOptions {
Self::default()
}

/// Sets the file format for the table.
///
/// The value is stored in `current_format`, replacing any format that was
/// set previously.
///
/// # Parameters
///
/// * `format`: The file format to use (e.g., CSV, Parquet).
pub fn set_file_format(&mut self, format: FileType) {
self.current_format = Some(format);
}

/// Creates a new `TableOptions` instance initialized with settings from a given session config.
///
/// # Parameters
Expand Down Expand Up @@ -1249,6 +1246,15 @@ impl TableOptions {
clone
}

/// Sets the built-in file format whose options this table configuration targets.
///
/// The value is stored in `current_format`, replacing any format that was
/// set previously.
///
/// # Parameters
///
/// * `format`: The built-in file format to use (e.g., CSV, Parquet).
pub fn set_config_format(&mut self, format: ConfigFileType) {
self.current_format = Some(format);
}

/// Sets the extensions for this `TableOptions` instance.
///
/// # Parameters
Expand Down Expand Up @@ -1673,6 +1679,8 @@ config_namespace! {
}
}

/// Trait with no methods of its own; every implementor must also implement
/// [`Display`].
// NOTE(review): no implementors are visible here — presumably meant for
// externally defined format options; confirm intended use.
pub trait FormatOptionsExt: Display {}

#[derive(Debug, Clone, PartialEq)]
#[allow(clippy::large_enum_variant)]
pub enum FormatOptions {
Expand All @@ -1698,28 +1706,15 @@ impl Display for FormatOptions {
}
}

/// Converts a [`FileType`] into the [`FormatOptions`] variant of the same name.
///
/// `PARQUET`, `CSV` and `JSON` carry an options value, which is filled with
/// that format's `Default` options; `ARROW` and `AVRO` map to variants that
/// carry no options.
impl From<FileType> for FormatOptions {
fn from(value: FileType) -> Self {
match value {
FileType::ARROW => FormatOptions::ARROW,
FileType::AVRO => FormatOptions::AVRO,
// The Parquet arm is only compiled when the `parquet` feature is enabled.
#[cfg(feature = "parquet")]
FileType::PARQUET => FormatOptions::PARQUET(TableParquetOptions::default()),
FileType::CSV => FormatOptions::CSV(CsvOptions::default()),
FileType::JSON => FormatOptions::JSON(JsonOptions::default()),
}
}
}

#[cfg(test)]
mod tests {
use std::any::Any;
use std::collections::HashMap;

use crate::config::{
ConfigEntry, ConfigExtension, ExtensionOptions, Extensions, TableOptions,
ConfigEntry, ConfigExtension, ConfigFileType, ExtensionOptions, Extensions,
TableOptions,
};
use crate::FileType;

#[derive(Default, Debug, Clone)]
pub struct TestExtensionConfig {
Expand Down Expand Up @@ -1777,7 +1772,7 @@ mod tests {
let mut extension = Extensions::new();
extension.insert(TestExtensionConfig::default());
let mut table_config = TableOptions::new().with_extensions(extension);
table_config.set_file_format(FileType::CSV);
table_config.set_config_format(ConfigFileType::CSV);
table_config.set("format.delimiter", ";").unwrap();
assert_eq!(table_config.csv.delimiter, b';');
table_config.set("test.bootstrap.servers", "asd").unwrap();
Expand All @@ -1794,7 +1789,7 @@ mod tests {
#[test]
fn csv_u8_table_options() {
let mut table_config = TableOptions::new();
table_config.set_file_format(FileType::CSV);
table_config.set_config_format(ConfigFileType::CSV);
table_config.set("format.delimiter", ";").unwrap();
assert_eq!(table_config.csv.delimiter as char, ';');
table_config.set("format.escape", "\"").unwrap();
Expand All @@ -1807,7 +1802,7 @@ mod tests {
#[test]
fn parquet_table_options() {
let mut table_config = TableOptions::new();
table_config.set_file_format(FileType::PARQUET);
table_config.set_config_format(ConfigFileType::PARQUET);
table_config
.set("format.bloom_filter_enabled::col1", "true")
.unwrap();
Expand All @@ -1821,7 +1816,7 @@ mod tests {
#[test]
fn parquet_table_options_config_entry() {
let mut table_config = TableOptions::new();
table_config.set_file_format(FileType::PARQUET);
table_config.set_config_format(ConfigFileType::PARQUET);
table_config
.set("format.bloom_filter_enabled::col1", "true")
.unwrap();
Expand All @@ -1835,7 +1830,7 @@ mod tests {
#[test]
fn parquet_table_options_config_metadata_entry() {
let mut table_config = TableOptions::new();
table_config.set_file_format(FileType::PARQUET);
table_config.set_config_format(ConfigFileType::PARQUET);
table_config.set("format.metadata::key1", "").unwrap();
table_config.set("format.metadata::key2", "value2").unwrap();
table_config
Expand Down
Loading

0 comments on commit c0d0926

Please sign in to comment.