Skip to content

Commit

Permalink
merge main
Browse files Browse the repository at this point in the history
  • Loading branch information
buraksenn committed Dec 18, 2024
2 parents 251fbc2 + 1fc7769 commit 5f9198c
Show file tree
Hide file tree
Showing 35 changed files with 1,388 additions and 262 deletions.
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -139,10 +139,10 @@ Note: If a Rust hotfix is released for the current MSRV, the MSRV will be update

DataFusion enforces MSRV policy using a [MSRV CI Check](https://github.com/search?q=repo%3Aapache%2Fdatafusion+rust-version+language%3ATOML+path%3A%2F%5ECargo.toml%2F&type=code)

## DataFusion API evolution policy
## DataFusion API Evolution and Deprecation Guidelines

Public methods in Apache DataFusion evolve over time: while we try to maintain a
stable API, we also improve the API over time. As a result, we typically
deprecate methods before removing them, according to the [api health policy].
deprecate methods before removing them, according to the [deprecation guidelines].

[api health policy]: https://datafusion.apache.org/library-user-guide/api-health.html
[deprecation guidelines]: https://datafusion.apache.org/library-user-guide/api-health.html
7 changes: 4 additions & 3 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions datafusion-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ datafusion-catalog = { path = "../datafusion/catalog", version = "43.0.0" }
dirs = "5.0.1"
env_logger = "0.11"
futures = "0.3"
# pin as home 0.5.11 has MSRV 1.81. Can remove this once we bump MSRV to 1.81
home = "=0.5.9"
mimalloc = { version = "0.1", default-features = false }
object_store = { version = "0.11.0", features = ["aws", "gcp", "http"] }
parking_lot = { version = "0.12" }
Expand Down
203 changes: 200 additions & 3 deletions datafusion/common/src/scalar/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2444,7 +2444,7 @@ impl ScalarValue {
e,
size
),
ScalarValue::Union(value, fields, _mode) => match value {
ScalarValue::Union(value, fields, mode) => match value {
Some((v_id, value)) => {
let mut new_fields = Vec::with_capacity(fields.len());
let mut child_arrays = Vec::<ArrayRef>::with_capacity(fields.len());
Expand All @@ -2453,15 +2453,23 @@ impl ScalarValue {
value.to_array_of_size(size)?
} else {
let dt = field.data_type();
new_null_array(dt, size)
match mode {
UnionMode::Sparse => new_null_array(dt, size),
// In a dense union, only the child with values needs to be
// allocated
UnionMode::Dense => new_null_array(dt, 0),
}
};
let field = (**field).clone();
child_arrays.push(ar);
new_fields.push(field.clone());
}
let type_ids = repeat(*v_id).take(size);
let type_ids = ScalarBuffer::<i8>::from_iter(type_ids);
let value_offsets: Option<ScalarBuffer<i32>> = None;
let value_offsets = match mode {
UnionMode::Sparse => None,
UnionMode::Dense => Some(ScalarBuffer::from_iter(0..size as i32)),
};
let ar = UnionArray::try_new(
fields.clone(),
type_ids,
Expand Down Expand Up @@ -3892,6 +3900,7 @@ mod tests {
use arrow::compute::{is_null, kernels};
use arrow::error::ArrowError;
use arrow::util::pretty::pretty_format_columns;
use arrow_array::types::Float64Type;
use arrow_buffer::{Buffer, NullBuffer};
use arrow_schema::Fields;
use chrono::NaiveDate;
Expand Down Expand Up @@ -5554,6 +5563,194 @@ mod tests {
assert_eq!(&array, &expected);
}

/// Verifies that every supported Arrow array type survives a row-by-row
/// round trip through `ScalarValue`: each row is extracted as a scalar and
/// re-materialized as a one-element array (see `round_trip_through_scalar`).
#[test]
fn round_trip() {
    // Each array type should be able to be round tripped through a scalar.
    // Each fixture contains a non-null value, a null, and another non-null
    // value (where the type supports nulls) so both paths are exercised.
    let cases: Vec<ArrayRef> = vec![
        // int
        Arc::new(Int8Array::from(vec![Some(1), None, Some(3)])),
        Arc::new(Int16Array::from(vec![Some(1), None, Some(3)])),
        Arc::new(Int32Array::from(vec![Some(1), None, Some(3)])),
        Arc::new(Int64Array::from(vec![Some(1), None, Some(3)])),
        Arc::new(UInt8Array::from(vec![Some(1), None, Some(3)])),
        Arc::new(UInt16Array::from(vec![Some(1), None, Some(3)])),
        Arc::new(UInt32Array::from(vec![Some(1), None, Some(3)])),
        Arc::new(UInt64Array::from(vec![Some(1), None, Some(3)])),
        // bool
        Arc::new(BooleanArray::from(vec![Some(true), None, Some(false)])),
        // float
        Arc::new(Float32Array::from(vec![Some(1.0), None, Some(3.0)])),
        Arc::new(Float64Array::from(vec![Some(1.0), None, Some(3.0)])),
        // string array (all three string encodings)
        Arc::new(StringArray::from(vec![Some("foo"), None, Some("bar")])),
        Arc::new(LargeStringArray::from(vec![Some("foo"), None, Some("bar")])),
        Arc::new(StringViewArray::from(vec![Some("foo"), None, Some("bar")])),
        // string dictionary
        {
            let mut builder = StringDictionaryBuilder::<Int32Type>::new();
            builder.append("foo").unwrap();
            builder.append_null();
            builder.append("bar").unwrap();
            Arc::new(builder.finish())
        },
        // binary array (all three binary encodings)
        Arc::new(BinaryArray::from_iter(vec![
            Some(b"foo"),
            None,
            Some(b"bar"),
        ])),
        Arc::new(LargeBinaryArray::from_iter(vec![
            Some(b"foo"),
            None,
            Some(b"bar"),
        ])),
        Arc::new(BinaryViewArray::from_iter(vec![
            Some(b"foo"),
            None,
            Some(b"bar"),
        ])),
        // timestamp (all four time units, no timezone)
        Arc::new(TimestampSecondArray::from(vec![Some(1), None, Some(3)])),
        Arc::new(TimestampMillisecondArray::from(vec![
            Some(1),
            None,
            Some(3),
        ])),
        Arc::new(TimestampMicrosecondArray::from(vec![
            Some(1),
            None,
            Some(3),
        ])),
        Arc::new(TimestampNanosecondArray::from(vec![Some(1), None, Some(3)])),
        // timestamp with timezone
        Arc::new(
            TimestampSecondArray::from(vec![Some(1), None, Some(3)])
                .with_timezone_opt(Some("UTC")),
        ),
        Arc::new(
            TimestampMillisecondArray::from(vec![Some(1), None, Some(3)])
                .with_timezone_opt(Some("UTC")),
        ),
        Arc::new(
            TimestampMicrosecondArray::from(vec![Some(1), None, Some(3)])
                .with_timezone_opt(Some("UTC")),
        ),
        Arc::new(
            TimestampNanosecondArray::from(vec![Some(1), None, Some(3)])
                .with_timezone_opt(Some("UTC")),
        ),
        // date
        Arc::new(Date32Array::from(vec![Some(1), None, Some(3)])),
        Arc::new(Date64Array::from(vec![Some(1), None, Some(3)])),
        // time
        Arc::new(Time32SecondArray::from(vec![Some(1), None, Some(3)])),
        Arc::new(Time32MillisecondArray::from(vec![Some(1), None, Some(3)])),
        Arc::new(Time64MicrosecondArray::from(vec![Some(1), None, Some(3)])),
        Arc::new(Time64NanosecondArray::from(vec![Some(1), None, Some(3)])),
        // null array
        Arc::new(NullArray::new(3)),
        // dense union (two children, one value appended to each)
        {
            let mut builder = UnionBuilder::new_dense();
            builder.append::<Int32Type>("a", 1).unwrap();
            builder.append::<Float64Type>("b", 3.4).unwrap();
            Arc::new(builder.build().unwrap())
        },
        // sparse union (same logical content as the dense case above)
        {
            let mut builder = UnionBuilder::new_sparse();
            builder.append::<Int32Type>("a", 1).unwrap();
            builder.append::<Float64Type>("b", 3.4).unwrap();
            Arc::new(builder.build().unwrap())
        },
        // list array
        {
            let values_builder = StringBuilder::new();
            let mut builder = ListBuilder::new(values_builder);
            // [A, B]
            builder.values().append_value("A");
            builder.values().append_value("B");
            builder.append(true);
            // [ ] (empty list)
            builder.append(true);
            // Null
            builder.values().append_value("?"); // irrelevant
            builder.append(false);
            Arc::new(builder.finish())
        },
        // large list array
        {
            let values_builder = StringBuilder::new();
            let mut builder = LargeListBuilder::new(values_builder);
            // [A, B]
            builder.values().append_value("A");
            builder.values().append_value("B");
            builder.append(true);
            // [ ] (empty list)
            builder.append(true);
            // Null
            builder.append(false);
            Arc::new(builder.finish())
        },
        // fixed size list array
        {
            let values_builder = Int32Builder::new();
            let mut builder = FixedSizeListBuilder::new(values_builder, 3);

            // [[0, 1, 2], null, [3, null, 5]]
            builder.values().append_value(0);
            builder.values().append_value(1);
            builder.values().append_value(2);
            builder.append(true);
            // the null entry still needs 3 child slots in a fixed size list
            builder.values().append_null();
            builder.values().append_null();
            builder.values().append_null();
            builder.append(false);
            builder.values().append_value(3);
            builder.values().append_null();
            builder.values().append_value(5);
            builder.append(true);
            Arc::new(builder.finish())
        },
        // map
        {
            let string_builder = StringBuilder::new();
            let int_builder = Int32Builder::with_capacity(4);

            let mut builder = MapBuilder::new(None, string_builder, int_builder);
            // {"joe": 1}
            builder.keys().append_value("joe");
            builder.values().append_value(1);
            builder.append(true).unwrap();
            // {}
            builder.append(true).unwrap();
            // null
            builder.append(false).unwrap();

            Arc::new(builder.finish())
        },
    ];

    for arr in cases {
        round_trip_through_scalar(arr);
    }
}

/// Checks that every row of `arr` survives a round trip through
/// [`ScalarValue`]: the row is extracted as a scalar, re-materialized as a
/// one-element array, and compared against the corresponding one-row slice
/// of the original for length, data type, and contents.
fn round_trip_through_scalar(arr: ArrayRef) {
    for row in 0..arr.len() {
        // Array row --> ScalarValue
        let value = ScalarValue::try_from_array(&arr, row).unwrap();
        // ScalarValue --> single-element Array
        let rebuilt = value.to_array_of_size(1).unwrap();
        let expected = arr.slice(row, 1);
        assert_eq!(rebuilt.len(), 1);
        assert_eq!(rebuilt.data_type(), arr.data_type());
        assert_eq!(rebuilt.as_ref(), expected.as_ref());
    }
}

#[test]
fn test_scalar_union_sparse() {
let field_a = Arc::new(Field::new("A", DataType::Int32, true));
Expand Down
24 changes: 24 additions & 0 deletions datafusion/core/src/dataframe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2380,6 +2380,30 @@ mod tests {
Ok(())
}

/// Regression test: aggregating with several aggregate expressions must not
/// emit any zero-row record batches in its output stream.
#[tokio::test]
async fn aggregate_assert_no_empty_batches() -> Result<()> {
    // build plan using DataFrame API
    let table = test_table().await?;
    let group_expr = vec![col("c1")];
    let aggr_expr = vec![
        min(col("c12")),
        max(col("c12")),
        avg(col("c12")),
        sum(col("c12")),
        count(col("c12")),
        count_distinct(col("c12")),
        median(col("c12")),
    ];

    let batches: Vec<RecordBatch> =
        table.aggregate(group_expr, aggr_expr)?.collect().await?;

    // Empty batches should not be produced
    assert!(batches.iter().all(|batch| batch.num_rows() > 0));

    Ok(())
}

#[tokio::test]
async fn test_aggregate_with_pk() -> Result<()> {
// create the dataframe
Expand Down
Loading

0 comments on commit 5f9198c

Please sign in to comment.