
Reported DataFusion performance problem #9148

Closed
alamb opened this issue Feb 7, 2024 · 6 comments
Labels
bug (Something isn't working), help wanted (Extra attention is needed)

Comments

@alamb
Contributor

alamb commented Feb 7, 2024

Describe the bug

Reported in Discord by @mispp: https://discord.com/channels/885562378132000778/1166447479609376850/1204163621433639003

ok people, a performance question if i may... I pulled a ~400 MB parquet file of New York taxi trips for testing. I have a simple aggregation that is supposed to sum up a column called trip_time. No group by is used and everything is done via the DataFrame API.
This operation takes ~2s.
Is this expected?

I saw a video (https://youtu.be/NVKujPxwSBA?t=1589) that showed DataFusion processing several gigabytes in less than a second.

So basically the task here is to reproduce the reported performance and see if there is anything wrong or anything we could improve.
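
For reference, the reported query amounts to a single SUM over one column with no GROUP BY. A minimal sketch of the same aggregation expressed via SQL, assuming the file is registered as a table named trips (illustrative only, not the reporter's code):

use datafusion::error::Result;
use datafusion::execution::{context::SessionContext, options::ParquetReadOptions};

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();

    // Register the downloaded parquet file as a table named "trips".
    ctx.register_parquet(
        "trips",
        "./fhvhv_tripdata_2023-01.parquet",
        ParquetReadOptions::default(),
    )
    .await?;

    // SUM(trip_time) over the whole file, with no GROUP BY.
    let df = ctx.sql("SELECT SUM(trip_time) FROM trips").await?;
    df.show().await?;
    Ok(())
}

Either formulation should produce the same single-row result as the DataFrame program below.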

To Reproduce

Original report: https://gist.github.com/mispp/229fdad7d70c8ab974a8f72f4bdfc43c

DataSet: https://d37ci6vzurychx.cloudfront.net/trip-data/fhvhv_tripdata_2023-01.parquet

Cargo.toml

[package]
name = "perf-issue"
version = "0.1.0"
edition = "2021"

[dependencies]
tokio = { version = "1", features = ["full"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1.0"
datafusion = "34"
arrow-schema = "*"

Program:

use std::time::SystemTime;

use datafusion::{
    common::Column,
    execution::{context::SessionContext, options::ParquetReadOptions},
    logical_expr::Expr,
};

#[tokio::main]
async fn main() {
    let start = SystemTime::now();

    let ctx = SessionContext::new();
    let read_options = ParquetReadOptions {
        file_extension: ".parquet",
        table_partition_cols: vec![],
        parquet_pruning: Some(true),
        skip_metadata: Some(false),
        schema: None,
        file_sort_order: vec![],
    };

    // SUM(trip_time) with no GROUP BY expressions.
    let analysis_expressions: Vec<Expr> = vec![datafusion::logical_expr::expr_fn::sum(
        Expr::Column(Column::from_name("trip_time")),
    )];
    let group_expressions: Vec<Expr> = vec![];

    println!("just before df -> {}", start.elapsed().unwrap().as_millis());

    let df = ctx
        .read_parquet("./fhvhv_tripdata_2023-01.parquet", read_options)
        .await
        .unwrap();
    println!("reading df -> {}", start.elapsed().unwrap().as_millis());

    let df_aggregated = df
        .aggregate(group_expressions, analysis_expressions)
        .unwrap()
        .collect()
        .await;
    println!("df aggregation -> {}", start.elapsed().unwrap().as_millis());

    println!("results -> {:?}", df_aggregated);
}

Expected behavior

No response

Additional context

No response

@alamb added the "bug" label Feb 7, 2024
@alamb changed the title from "DataFusion performance problem (or optimization opportunity?)" to "Reported DataFusion performance problem" Feb 7, 2024
@alamb added the "help wanted" label Feb 7, 2024
@alamb
Contributor Author

alamb commented Feb 7, 2024

Ran this on my M3 Mac and it finished in 144ms

andrewlamb@Andrews-MacBook-Pro:~/Downloads$ ./rust_playground
just before df -> 1
reading df -> 6
df aggregation -> 144
results -> Ok([RecordBatch { schema: Schema { fields: [Field { name: "SUM(?table?.trip_time)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }, columns: [PrimitiveArray<Int64>
[
  20227776240,
]], row_count: 1 }])
andrewlamb@Andrews-MacBook-Pro:~/Downloads$

When I ran the debug build, it took more like 2 seconds:

andrewlamb@Andrews-MacBook-Pro:~/Downloads$ ./rust_playground.debug
just before df -> 6
reading df -> 17
df aggregation -> 1822
results -> Ok([RecordBatch { schema: Schema { fields: [Field { name: "SUM(?table?.trip_time)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }, columns: [PrimitiveArray<Int64>
[
  20227776240,
]], row_count: 1 }])

@alamb
Contributor Author

alamb commented Feb 7, 2024

So I wonder if the reporter simply didn't run with a release build

@alamb
Contributor Author

alamb commented Feb 7, 2024

I am going to try this on a less powerful Linux machine

@mispp

mispp commented Feb 7, 2024

> So I wonder if the reporter simply didn't run with a release build

No, it was a simple 'cargo run' with no parameters given. Ok, so this was the reason.

@alamb
Contributor Author

alamb commented Feb 7, 2024

Ah, got it -- I think you need to run cargo run --release to get good performance

Thanks again for the report @mispp

Closing this one down as I think we have found the root cause
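
A related option: Cargo can also build just the dependencies with optimizations while the user's own crate stays in a fast debug build. A minimal Cargo.toml sketch, assuming the reproduction's Cargo.toml above:

# Hypothetical addition to the reproduction's Cargo.toml: compile all
# dependencies (including datafusion) with optimizations even in dev builds.
[profile.dev.package."*"]
opt-level = 3

Since nearly all of the work in this query happens inside DataFusion and Arrow, this alone would likely recover most of the release-build speed.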

@alamb closed this as completed Feb 7, 2024
@alamb
Contributor Author

alamb commented Feb 7, 2024

This is consistent on my less powerful Linux machine too:

alamb@aal-dev:~/rust_playground$ cargo run
    Finished dev [unoptimized + debuginfo] target(s) in 0.10s
     Running `target/debug/rust_playground`
just before df -> 0
reading df -> 2
df aggregation -> 2758
results -> Ok([RecordBatch { schema: Schema { fields: [Field { name: "SUM(?table?.trip_time)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }, columns: [PrimitiveArray<Int64>
[
  20227776240,
]], row_count: 1 }])
alamb@aal-dev:~/rust_playground$
alamb@aal-dev:~/rust_playground$ cargo build --release
    Finished release [optimized] target(s) in 0.11s
alamb@aal-dev:~/rust_playground$ cargo run --release
    Finished release [optimized] target(s) in 0.10s
     Running `target/release/rust_playground`
just before df -> 0
reading df -> 0
df aggregation -> 185
results -> Ok([RecordBatch { schema: Schema { fields: [Field { name: "SUM(?table?.trip_time)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }, columns: [PrimitiveArray<Int64>
[
  20227776240,
]], row_count: 1 }])
