Skip to content

Commit

Permalink
Merge branch 'main' into cli-snap
Browse files Browse the repository at this point in the history
# Conflicts:
#	datafusion/common/src/config.rs
#	datafusion/core/tests/config_from_env.rs
#	datafusion/sqllogictest/test_files/information_schema.slt
#	docs/source/user-guide/configs.md
  • Loading branch information
blaginin committed Dec 20, 2024
2 parents 855315f + 75202b5 commit fd3240d
Show file tree
Hide file tree
Showing 154 changed files with 4,939 additions and 1,370 deletions.
36 changes: 20 additions & 16 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -337,22 +337,26 @@ jobs:
env:
POSTGRES_PORT: ${{ job.services.postgres.ports[5432] }}

windows:
name: cargo test (win64)
runs-on: windows-latest
steps:
- uses: actions/checkout@v4
with:
submodules: true
- name: Setup Rust toolchain
uses: ./.github/actions/setup-windows-builder
- name: Run tests (excluding doctests)
shell: bash
run: |
export PATH=$PATH:$HOME/d/protoc/bin
cargo test --lib --tests --bins --features avro,json,backtrace
cd datafusion-cli
cargo test --lib --tests --bins --all-features
# Temporarily commenting out the Windows flow, the reason is enormously slow running build
# Waiting for new Windows 2025 github runner
# Details: https://github.com/apache/datafusion/issues/13726
#
# windows:
# name: cargo test (win64)
# runs-on: windows-latest
# steps:
# - uses: actions/checkout@v4
# with:
# submodules: true
# - name: Setup Rust toolchain
# uses: ./.github/actions/setup-windows-builder
# - name: Run tests (excluding doctests)
# shell: bash
# run: |
# export PATH=$PATH:$HOME/d/protoc/bin
# cargo test --lib --tests --bins --features avro,json,backtrace
# cd datafusion-cli
# cargo test --lib --tests --bins --all-features

macos:
name: cargo test (macos)
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ recursive = "0.1.1"
regex = "1.8"
rstest = "0.23.0"
serde_json = "1"
sqlparser = { version = "0.52.0", features = ["visitor"] }
sqlparser = { version = "0.53.0", features = ["visitor"] }
tempfile = "3"
tokio = { version = "1.36", features = ["macros", "rt", "sync"] }
url = "2.2"
Expand Down
11 changes: 7 additions & 4 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 12 additions & 2 deletions datafusion-cli/src/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,12 @@ use crate::{
};

use datafusion::common::instant::Instant;
use datafusion::common::plan_datafusion_err;
use datafusion::common::{plan_datafusion_err, plan_err};
use datafusion::config::ConfigFileType;
use datafusion::datasource::listing::ListingTableUrl;
use datafusion::error::{DataFusionError, Result};
use datafusion::logical_expr::{DdlStatement, LogicalPlan};
use datafusion::physical_plan::execution_plan::EmissionType;
use datafusion::physical_plan::{collect, execute_stream, ExecutionPlanProperties};
use datafusion::sql::parser::{DFParser, Statement};
use datafusion::sql::sqlparser::dialect::dialect_from_str;
Expand Down Expand Up @@ -234,10 +235,19 @@ pub(super) async fn exec_and_print(
let df = ctx.execute_logical_plan(plan).await?;
let physical_plan = df.create_physical_plan().await?;

if physical_plan.execution_mode().is_unbounded() {
if physical_plan.boundedness().is_unbounded() {
if physical_plan.pipeline_behavior() == EmissionType::Final {
return plan_err!(
"The given query can generate a valid result only once \
the source finishes, but the source is unbounded"
);
}
// As the input stream comes, we can generate results.
// However, memory safety is not guaranteed.
let stream = execute_stream(physical_plan, task_ctx.clone())?;
print_options.print_stream(stream, now).await?;
} else {
// Bounded stream; collected results are printed after all input consumed.
let schema = physical_plan.schema();
let results = collect(physical_plan, task_ctx.clone()).await?;
adjusted.into_inner().print_batches(schema, &results, now)?;
Expand Down
1 change: 1 addition & 0 deletions datafusion-examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ cargo run --example dataframe
- [`query-aws-s3.rs`](examples/external_dependency/query-aws-s3.rs): Configure `object_store` and run a query against files stored in AWS S3
- [`query-http-csv.rs`](examples/query-http-csv.rs): Configure `object_store` and run a query against files vi HTTP
- [`regexp.rs`](examples/regexp.rs): Examples of using regular expression functions
- [`remote_catalog.rs`](examples/regexp.rs): Examples of interfacing with a remote catalog (e.g. over a network)
- [`simple_udaf.rs`](examples/simple_udaf.rs): Define and invoke a User Defined Aggregate Function (UDAF)
- [`simple_udf.rs`](examples/simple_udf.rs): Define and invoke a User Defined Scalar Function (UDF)
- [`simple_udfw.rs`](examples/simple_udwf.rs): Define and invoke a User Defined Window Function (UDWF)
Expand Down
8 changes: 5 additions & 3 deletions datafusion-examples/examples/custom_datasource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,11 @@ use datafusion::error::Result;
use datafusion::execution::context::TaskContext;
use datafusion::logical_expr::LogicalPlanBuilder;
use datafusion::physical_expr::EquivalenceProperties;
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion::physical_plan::memory::MemoryStream;
use datafusion::physical_plan::{
project_schema, DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan,
Partitioning, PlanProperties, SendableRecordBatchStream,
project_schema, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning,
PlanProperties, SendableRecordBatchStream,
};
use datafusion::prelude::*;

Expand Down Expand Up @@ -214,7 +215,8 @@ impl CustomExec {
PlanProperties::new(
eq_properties,
Partitioning::UnknownPartitioning(1),
ExecutionMode::Bounded,
EmissionType::Incremental,
Boundedness::Bounded,
)
}
}
Expand Down
Loading

0 comments on commit fd3240d

Please sign in to comment.