Skip to content

Commit

Permalink
Move spill related functions to spill.rs (#11509)
Browse files Browse the repository at this point in the history
  • Loading branch information
findepi authored Jul 17, 2024
1 parent 81a6094 commit adcfd85
Show file tree
Hide file tree
Showing 4 changed files with 120 additions and 94 deletions.
3 changes: 2 additions & 1 deletion datafusion/physical-plan/src/aggregates/row_hash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@ use crate::common::IPCWriter;
use crate::metrics::{BaselineMetrics, RecordOutput};
use crate::sorts::sort::sort_batch;
use crate::sorts::streaming_merge;
use crate::spill::read_spill_as_stream;
use crate::stream::RecordBatchStreamAdapter;
use crate::{aggregates, read_spill_as_stream, ExecutionPlan, PhysicalExpr};
use crate::{aggregates, ExecutionPlan, PhysicalExpr};
use crate::{RecordBatchStream, SendableRecordBatchStream};

use arrow::array::*;
Expand Down
116 changes: 27 additions & 89 deletions datafusion/physical-plan/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,31 +21,41 @@
use std::any::Any;
use std::fmt::Debug;
use std::fs::File;
use std::io::BufReader;
use std::path::{Path, PathBuf};
use std::sync::Arc;

use crate::coalesce_partitions::CoalescePartitionsExec;
use crate::display::DisplayableExecutionPlan;
use crate::metrics::MetricsSet;
use crate::repartition::RepartitionExec;
use crate::sorts::sort_preserving_merge::SortPreservingMergeExec;

use arrow::datatypes::SchemaRef;
use arrow::ipc::reader::FileReader;
use arrow::record_batch::RecordBatch;
use futures::stream::{StreamExt, TryStreamExt};
use tokio::task::JoinSet;

use datafusion_common::config::ConfigOptions;
use datafusion_common::{exec_datafusion_err, exec_err, Result};
pub use datafusion_common::hash_utils;
pub use datafusion_common::utils::project_schema;
use datafusion_common::{exec_err, Result};
pub use datafusion_common::{internal_err, ColumnStatistics, Statistics};
use datafusion_execution::TaskContext;
pub use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream};
pub use datafusion_expr::{Accumulator, ColumnarValue};
pub use datafusion_physical_expr::window::WindowExpr;
pub use datafusion_physical_expr::{
expressions, functions, udf, AggregateExpr, Distribution, Partitioning, PhysicalExpr,
};
use datafusion_physical_expr::{
EquivalenceProperties, LexOrdering, PhysicalSortExpr, PhysicalSortRequirement,
};

use futures::stream::{StreamExt, TryStreamExt};
use log::debug;
use tokio::sync::mpsc::Sender;
use tokio::task::JoinSet;
use crate::coalesce_partitions::CoalescePartitionsExec;
use crate::display::DisplayableExecutionPlan;
pub use crate::display::{DefaultDisplay, DisplayAs, DisplayFormatType, VerboseDisplay};
pub use crate::metrics::Metric;
use crate::metrics::MetricsSet;
pub use crate::ordering::InputOrderMode;
use crate::repartition::RepartitionExec;
use crate::sorts::sort_preserving_merge::SortPreservingMergeExec;
pub use crate::stream::EmptyRecordBatchStream;
use crate::stream::RecordBatchStreamAdapter;
pub use crate::topk::TopK;
pub use crate::visitor::{accept, visit_execution_plan, ExecutionPlanVisitor};

mod ordering;
mod topk;
Expand All @@ -70,6 +80,7 @@ pub mod projection;
pub mod recursive_query;
pub mod repartition;
pub mod sorts;
pub mod spill;
pub mod stream;
pub mod streaming;
pub mod tree_node;
Expand All @@ -79,29 +90,6 @@ pub mod values;
pub mod windows;
pub mod work_table;

pub use crate::display::{DefaultDisplay, DisplayAs, DisplayFormatType, VerboseDisplay};
pub use crate::metrics::Metric;
pub use crate::ordering::InputOrderMode;
pub use crate::topk::TopK;
pub use crate::visitor::{accept, visit_execution_plan, ExecutionPlanVisitor};

pub use datafusion_common::hash_utils;
pub use datafusion_common::utils::project_schema;
pub use datafusion_common::{internal_err, ColumnStatistics, Statistics};
pub use datafusion_expr::{Accumulator, ColumnarValue};
pub use datafusion_physical_expr::window::WindowExpr;
pub use datafusion_physical_expr::{
expressions, functions, udf, AggregateExpr, Distribution, Partitioning, PhysicalExpr,
};

// Backwards compatibility
use crate::common::IPCWriter;
pub use crate::stream::EmptyRecordBatchStream;
use crate::stream::{RecordBatchReceiverStream, RecordBatchStreamAdapter};
use datafusion_execution::disk_manager::RefCountedTempFile;
use datafusion_execution::memory_pool::human_readable_size;
pub use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream};

pub mod udaf {
pub use datafusion_physical_expr_common::aggregate::{
create_aggregate_expr, AggregateFunctionExpr,
Expand Down Expand Up @@ -903,63 +891,13 @@ pub fn get_plan_string(plan: &Arc<dyn ExecutionPlan>) -> Vec<String> {
actual.iter().map(|elem| elem.to_string()).collect()
}

/// Read spilled batches from the disk
///
/// `path` - temp file
/// `schema` - batches schema, should be the same across batches
/// `buffer` - internal buffer of capacity batches
pub fn read_spill_as_stream(
path: RefCountedTempFile,
schema: SchemaRef,
buffer: usize,
) -> Result<SendableRecordBatchStream> {
let mut builder = RecordBatchReceiverStream::builder(schema, buffer);
let sender = builder.tx();

builder.spawn_blocking(move || read_spill(sender, path.path()));

Ok(builder.build())
}

/// Spills in-memory `batches` to disk.
///
/// Returns total number of the rows spilled to disk.
pub fn spill_record_batches(
batches: Vec<RecordBatch>,
path: PathBuf,
schema: SchemaRef,
) -> Result<usize> {
let mut writer = IPCWriter::new(path.as_ref(), schema.as_ref())?;
for batch in batches {
writer.write(&batch)?;
}
writer.finish()?;
debug!(
"Spilled {} batches of total {} rows to disk, memory released {}",
writer.num_batches,
writer.num_rows,
human_readable_size(writer.num_bytes),
);
Ok(writer.num_rows)
}

fn read_spill(sender: Sender<Result<RecordBatch>>, path: &Path) -> Result<()> {
let file = BufReader::new(File::open(path)?);
let reader = FileReader::try_new(file, None)?;
for batch in reader {
sender
.blocking_send(batch.map_err(Into::into))
.map_err(|e| exec_datafusion_err!("{e}"))?;
}
Ok(())
}

#[cfg(test)]
mod tests {
use std::any::Any;
use std::sync::Arc;

use arrow_schema::{Schema, SchemaRef};

use datafusion_common::{Result, Statistics};
use datafusion_execution::{SendableRecordBatchStream, TaskContext};

Expand Down
8 changes: 4 additions & 4 deletions datafusion/physical-plan/src/sorts/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,13 @@ use crate::metrics::{
BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet,
};
use crate::sorts::streaming_merge::streaming_merge;
use crate::spill::{read_spill_as_stream, spill_record_batches};
use crate::stream::RecordBatchStreamAdapter;
use crate::topk::TopK;
use crate::{
read_spill_as_stream, spill_record_batches, DisplayAs, DisplayFormatType,
Distribution, EmptyRecordBatchStream, ExecutionMode, ExecutionPlan,
ExecutionPlanProperties, Partitioning, PlanProperties, SendableRecordBatchStream,
Statistics,
DisplayAs, DisplayFormatType, Distribution, EmptyRecordBatchStream, ExecutionMode,
ExecutionPlan, ExecutionPlanProperties, Partitioning, PlanProperties,
SendableRecordBatchStream, Statistics,
};

use arrow::compute::{concat_batches, lexsort_to_indices, take, SortColumn};
Expand Down
87 changes: 87 additions & 0 deletions datafusion/physical-plan/src/spill.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Defines the spilling functions
use std::fs::File;
use std::io::BufReader;
use std::path::{Path, PathBuf};

use arrow::datatypes::SchemaRef;
use arrow::ipc::reader::FileReader;
use arrow::record_batch::RecordBatch;
use log::debug;
use tokio::sync::mpsc::Sender;

use datafusion_common::{exec_datafusion_err, Result};
use datafusion_execution::disk_manager::RefCountedTempFile;
use datafusion_execution::memory_pool::human_readable_size;
use datafusion_execution::SendableRecordBatchStream;

use crate::common::IPCWriter;
use crate::stream::RecordBatchReceiverStream;

/// Read spilled batches from the disk
///
/// `path` - temp file
/// `schema` - batches schema, should be the same across batches
/// `buffer` - internal buffer of capacity batches
pub fn read_spill_as_stream(
path: RefCountedTempFile,
schema: SchemaRef,
buffer: usize,
) -> Result<SendableRecordBatchStream> {
let mut builder = RecordBatchReceiverStream::builder(schema, buffer);
let sender = builder.tx();

builder.spawn_blocking(move || read_spill(sender, path.path()));

Ok(builder.build())
}

/// Spills in-memory `batches` to disk.
///
/// Returns total number of the rows spilled to disk.
pub fn spill_record_batches(
batches: Vec<RecordBatch>,
path: PathBuf,
schema: SchemaRef,
) -> Result<usize> {
let mut writer = IPCWriter::new(path.as_ref(), schema.as_ref())?;
for batch in batches {
writer.write(&batch)?;
}
writer.finish()?;
debug!(
"Spilled {} batches of total {} rows to disk, memory released {}",
writer.num_batches,
writer.num_rows,
human_readable_size(writer.num_bytes),
);
Ok(writer.num_rows)
}

fn read_spill(sender: Sender<Result<RecordBatch>>, path: &Path) -> Result<()> {
let file = BufReader::new(File::open(path)?);
let reader = FileReader::try_new(file, None)?;
for batch in reader {
sender
.blocking_send(batch.map_err(Into::into))
.map_err(|e| exec_datafusion_err!("{e}"))?;
}
Ok(())
}

0 comments on commit adcfd85

Please sign in to comment.