Skip to content

Commit

Permalink
Support Null regex override in csv parser options. (#13228)
Browse files Browse the repository at this point in the history
Co-authored-by: Andrew Lamb <[email protected]>
  • Loading branch information
dhegberg and alamb authored Dec 18, 2024
1 parent 1fc7769 commit 01ffb64
Show file tree
Hide file tree
Showing 14 changed files with 292 additions and 62 deletions.
1 change: 1 addition & 0 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1655,7 +1655,10 @@ config_namespace! {
pub timestamp_format: Option<String>, default = None
pub timestamp_tz_format: Option<String>, default = None
pub time_format: Option<String>, default = None
// The output format for Nulls in the CSV writer.
pub null_value: Option<String>, default = None
// The input regex for Nulls when loading CSVs.
pub null_regex: Option<String>, default = None
pub comment: Option<u8>, default = None
}
}
Expand Down
1 change: 1 addition & 0 deletions datafusion/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ object_store = { workspace = true }
parking_lot = { workspace = true }
parquet = { workspace = true, optional = true, default-features = true }
rand = { workspace = true }
regex = { workspace = true }
sqlparser = { workspace = true }
tempfile = { workspace = true }
tokio = { workspace = true }
Expand Down
10 changes: 10 additions & 0 deletions datafusion/core/benches/csv_load.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,16 @@ fn criterion_benchmark(c: &mut Criterion) {
)
})
});

group.bench_function("null regex override", |b| {
b.iter(|| {
load_csv(
ctx.clone(),
test_file.path().to_str().unwrap(),
CsvReadOptions::default().null_regex(Some("^NULL$|^$".to_string())),
)
})
});
}

criterion_group!(benches, criterion_benchmark);
Expand Down
81 changes: 78 additions & 3 deletions datafusion/core/src/datasource/file_format/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ use datafusion_physical_expr_common::sort_expr::LexRequirement;
use futures::stream::BoxStream;
use futures::{pin_mut, Stream, StreamExt, TryStreamExt};
use object_store::{delimited::newline_delimited_stream, ObjectMeta, ObjectStore};
use regex::Regex;

#[derive(Default)]
/// Factory struct used to create [CsvFormatFactory]
Expand Down Expand Up @@ -218,6 +219,13 @@ impl CsvFormat {
self
}

/// Set the regex to use for null values in the CSV reader.
/// - default to treat empty values as null.
pub fn with_null_regex(mut self, null_regex: Option<String>) -> Self {
self.options.null_regex = null_regex;
self
}

/// Returns `Some(true)` if the first line is a header, `Some(false)` if
/// it is not, and `None` if it is not specified.
pub fn has_header(&self) -> Option<bool> {
Expand Down Expand Up @@ -502,6 +510,12 @@ impl CsvFormat {
.with_delimiter(self.options.delimiter)
.with_quote(self.options.quote);

if let Some(null_regex) = &self.options.null_regex {
let regex = Regex::new(null_regex.as_str())
.expect("Unable to parse CSV null regex.");
format = format.with_null_regex(regex);
}

if let Some(escape) = self.options.escape {
format = format.with_escape(escape);
}
Expand Down Expand Up @@ -813,8 +827,67 @@ mod tests {
let state = session_ctx.state();

let projection = None;
let exec =
get_exec(&state, "aggregate_test_100.csv", projection, None, true).await?;
let root = "./tests/data/csv";
let format = CsvFormat::default().with_has_header(true);
let exec = scan_format(
&state,
&format,
root,
"aggregate_test_100_with_nulls.csv",
projection,
None,
)
.await?;

let x: Vec<String> = exec
.schema()
.fields()
.iter()
.map(|f| format!("{}: {:?}", f.name(), f.data_type()))
.collect();
assert_eq!(
vec![
"c1: Utf8",
"c2: Int64",
"c3: Int64",
"c4: Int64",
"c5: Int64",
"c6: Int64",
"c7: Int64",
"c8: Int64",
"c9: Int64",
"c10: Utf8",
"c11: Float64",
"c12: Float64",
"c13: Utf8",
"c14: Null",
"c15: Utf8"
],
x
);

Ok(())
}

#[tokio::test]
async fn infer_schema_with_null_regex() -> Result<()> {
let session_ctx = SessionContext::new();
let state = session_ctx.state();

let projection = None;
let root = "./tests/data/csv";
let format = CsvFormat::default()
.with_has_header(true)
.with_null_regex(Some("^NULL$|^$".to_string()));
let exec = scan_format(
&state,
&format,
root,
"aggregate_test_100_with_nulls.csv",
projection,
None,
)
.await?;

let x: Vec<String> = exec
.schema()
Expand All @@ -836,7 +909,9 @@ mod tests {
"c10: Utf8",
"c11: Float64",
"c12: Float64",
"c13: Utf8"
"c13: Utf8",
"c14: Null",
"c15: Null"
],
x
);
Expand Down
12 changes: 11 additions & 1 deletion datafusion/core/src/datasource/file_format/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ pub struct CsvReadOptions<'a> {
pub file_compression_type: FileCompressionType,
/// Indicates how the file is sorted
pub file_sort_order: Vec<Vec<SortExpr>>,
/// Optional regex to match null values
pub null_regex: Option<String>,
}

impl Default for CsvReadOptions<'_> {
Expand All @@ -112,6 +114,7 @@ impl<'a> CsvReadOptions<'a> {
file_compression_type: FileCompressionType::UNCOMPRESSED,
file_sort_order: vec![],
comment: None,
null_regex: None,
}
}

Expand Down Expand Up @@ -212,6 +215,12 @@ impl<'a> CsvReadOptions<'a> {
self.file_sort_order = file_sort_order;
self
}

/// Configure the null parsing regex.
pub fn null_regex(mut self, null_regex: Option<String>) -> Self {
self.null_regex = null_regex;
self
}
}

/// Options that control the reading of Parquet files.
Expand Down Expand Up @@ -534,7 +543,8 @@ impl ReadOptions<'_> for CsvReadOptions<'_> {
.with_terminator(self.terminator)
.with_newlines_in_values(self.newlines_in_values)
.with_schema_infer_max_rec(self.schema_infer_max_records)
.with_file_compression_type(self.file_compression_type.to_owned());
.with_file_compression_type(self.file_compression_type.to_owned())
.with_null_regex(self.null_regex.clone());

ListingOptions::new(Arc::new(file_format))
.with_file_extension(self.file_extension)
Expand Down
Loading

0 comments on commit 01ffb64

Please sign in to comment.