Skip to content

Commit

Permalink
modify scripts
Browse files Browse the repository at this point in the history
  • Loading branch information
Lordworms committed Dec 18, 2024
1 parent 637d0ac commit aa4877c
Show file tree
Hide file tree
Showing 4 changed files with 463 additions and 39 deletions.
3 changes: 3 additions & 0 deletions benchmarks/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,6 @@
# under the License.

rich
pandas
pyarrow
chardet
17 changes: 0 additions & 17 deletions benchmarks/src/bin/tpcds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,19 +125,13 @@ async fn compare_duckdb_datafusion(
sql: &str,
parquet_dir: &str,
) -> Result<(), DataFusionError> {
// Step 1: Execute query in DuckDB (used as the expected result)
let expected_batches = execute_duckdb_query(sql, parquet_dir)?;

// Step 2: Execute query in DataFusion (actual result)
let ctx = create_tpcds_context(parquet_dir).await?;
let actual_batches = execute_datafusion_query(sql, ctx).await?;

// Step 3: Format the batches for comparison
let expected_output = pretty_format_batches(&expected_batches)?.to_string();
let actual_output = pretty_format_batches(&actual_batches)?.to_string();

if expected_output != actual_output {
// Print detailed error information if outputs do not match
eprintln!("❌ Query failed: Results do not match!");
eprintln!("SQL:\n{}", sql);
eprintln!("Expected:\n{}", expected_output);
Expand All @@ -153,19 +147,16 @@ async fn compare_duckdb_datafusion(

/// Execute a query in DuckDB and return the results as RecordBatch
fn execute_duckdb_query(sql: &str, parquet_dir: &str) -> Result<Vec<RecordBatch>> {
// Initialize DuckDB connection
let conn = Connection::open_in_memory().map_err(|e| {
DataFusionError::Execution(format!("DuckDB connection error: {}", e))
})?;

// Register all TPC-DS tables in DuckDB
for table in TPCDS_TABLES {
let path = format!("{}/{}.parquet", parquet_dir, table);
let sql = format!(
"CREATE TABLE {} AS SELECT * FROM read_parquet('{}')",
table, path
);
println!("sql is {:?}", sql);
conn.execute(&sql, []).map_err(|e| {
DataFusionError::Execution(format!(
"Error registering table '{}': {}",
Expand All @@ -174,7 +165,6 @@ fn execute_duckdb_query(sql: &str, parquet_dir: &str) -> Result<Vec<RecordBatch>
})?;
}

// Execute the query
let mut stmt = conn.prepare(sql).map_err(|e| {
DataFusionError::Execution(format!("SQL preparation error: {}", e))
})?;
Expand All @@ -191,14 +181,10 @@ async fn execute_datafusion_query(
sql: &str,
ctx: SessionContext,
) -> Result<Vec<RecordBatch>> {
// Execute the query
let df = ctx.sql(sql).await?;

// Collect the results into RecordBatch
df.collect().await
}

/// Load SQL query from a file
fn load_query(query_number: usize) -> Result<String> {
let query_path = format!("datafusion/core/tests/tpc-ds/{}.sql", query_number);
fs::read_to_string(&query_path).map_err(|e| {
Expand All @@ -211,10 +197,7 @@ fn load_query(query_number: usize) -> Result<String> {

#[tokio::main]
async fn main() -> Result<()> {
// Initialize logger
env_logger::init();

// Parse command-line arguments
let opt = TpcdsOpt::from_args();
match opt {
TpcdsOpt::Run(opt) => opt.run().await,
Expand Down
Loading

0 comments on commit aa4877c

Please sign in to comment.