From dad7e7c21375736a3be48338b2a2d00b00463847 Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Wed, 17 Jul 2024 21:13:27 +0200 Subject: [PATCH] Move spill related functions to spill.rs (#11509) --- .../physical-plan/src/aggregates/row_hash.rs | 3 +- datafusion/physical-plan/src/lib.rs | 116 ++++-------------- datafusion/physical-plan/src/sorts/sort.rs | 8 +- datafusion/physical-plan/src/spill.rs | 87 +++++++++++++ 4 files changed, 120 insertions(+), 94 deletions(-) create mode 100644 datafusion/physical-plan/src/spill.rs diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs index a1d3378181c28..167ca72407503 100644 --- a/datafusion/physical-plan/src/aggregates/row_hash.rs +++ b/datafusion/physical-plan/src/aggregates/row_hash.rs @@ -31,8 +31,9 @@ use crate::common::IPCWriter; use crate::metrics::{BaselineMetrics, RecordOutput}; use crate::sorts::sort::sort_batch; use crate::sorts::streaming_merge; +use crate::spill::read_spill_as_stream; use crate::stream::RecordBatchStreamAdapter; -use crate::{aggregates, read_spill_as_stream, ExecutionPlan, PhysicalExpr}; +use crate::{aggregates, ExecutionPlan, PhysicalExpr}; use crate::{RecordBatchStream, SendableRecordBatchStream}; use arrow::array::*; diff --git a/datafusion/physical-plan/src/lib.rs b/datafusion/physical-plan/src/lib.rs index dc736993a4533..046977da0a37e 100644 --- a/datafusion/physical-plan/src/lib.rs +++ b/datafusion/physical-plan/src/lib.rs @@ -21,31 +21,41 @@ use std::any::Any; use std::fmt::Debug; -use std::fs::File; -use std::io::BufReader; -use std::path::{Path, PathBuf}; use std::sync::Arc; -use crate::coalesce_partitions::CoalescePartitionsExec; -use crate::display::DisplayableExecutionPlan; -use crate::metrics::MetricsSet; -use crate::repartition::RepartitionExec; -use crate::sorts::sort_preserving_merge::SortPreservingMergeExec; - use arrow::datatypes::SchemaRef; -use arrow::ipc::reader::FileReader; use arrow::record_batch::RecordBatch; +use futures::stream::{StreamExt, TryStreamExt}; +use tokio::task::JoinSet; + use datafusion_common::config::ConfigOptions; -use datafusion_common::{exec_datafusion_err, exec_err, Result}; +pub use datafusion_common::hash_utils; +pub use datafusion_common::utils::project_schema; +use datafusion_common::{exec_err, Result}; +pub use datafusion_common::{internal_err, ColumnStatistics, Statistics}; use datafusion_execution::TaskContext; +pub use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream}; +pub use datafusion_expr::{Accumulator, ColumnarValue}; +pub use datafusion_physical_expr::window::WindowExpr; +pub use datafusion_physical_expr::{ + expressions, functions, udf, AggregateExpr, Distribution, Partitioning, PhysicalExpr, +}; use datafusion_physical_expr::{ EquivalenceProperties, LexOrdering, PhysicalSortExpr, PhysicalSortRequirement, }; -use futures::stream::{StreamExt, TryStreamExt}; -use log::debug; -use tokio::sync::mpsc::Sender; -use tokio::task::JoinSet; +use crate::coalesce_partitions::CoalescePartitionsExec; +use crate::display::DisplayableExecutionPlan; +pub use crate::display::{DefaultDisplay, DisplayAs, DisplayFormatType, VerboseDisplay}; +pub use crate::metrics::Metric; +use crate::metrics::MetricsSet; +pub use crate::ordering::InputOrderMode; +use crate::repartition::RepartitionExec; +use crate::sorts::sort_preserving_merge::SortPreservingMergeExec; +pub use crate::stream::EmptyRecordBatchStream; +use crate::stream::RecordBatchStreamAdapter; +pub use crate::topk::TopK; +pub use crate::visitor::{accept, visit_execution_plan, ExecutionPlanVisitor}; mod ordering; mod topk; @@ -70,6 +80,7 @@ pub mod projection; pub mod recursive_query; pub mod repartition; pub mod sorts; +pub mod spill; pub mod stream; pub mod streaming; pub mod tree_node; @@ -79,29 +90,6 @@ pub mod values; pub mod windows; pub mod work_table; -pub use crate::display::{DefaultDisplay, DisplayAs, DisplayFormatType, VerboseDisplay}; -pub use crate::metrics::Metric; -pub use crate::ordering::InputOrderMode; -pub use crate::topk::TopK; -pub use crate::visitor::{accept, visit_execution_plan, ExecutionPlanVisitor}; - -pub use datafusion_common::hash_utils; -pub use datafusion_common::utils::project_schema; -pub use datafusion_common::{internal_err, ColumnStatistics, Statistics}; -pub use datafusion_expr::{Accumulator, ColumnarValue}; -pub use datafusion_physical_expr::window::WindowExpr; -pub use datafusion_physical_expr::{ - expressions, functions, udf, AggregateExpr, Distribution, Partitioning, PhysicalExpr, -}; - -// Backwards compatibility -use crate::common::IPCWriter; -pub use crate::stream::EmptyRecordBatchStream; -use crate::stream::{RecordBatchReceiverStream, RecordBatchStreamAdapter}; -use datafusion_execution::disk_manager::RefCountedTempFile; -use datafusion_execution::memory_pool::human_readable_size; -pub use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream}; - pub mod udaf { pub use datafusion_physical_expr_common::aggregate::{ create_aggregate_expr, AggregateFunctionExpr, @@ -903,63 +891,13 @@ pub fn get_plan_string(plan: &Arc) -> Vec { actual.iter().map(|elem| elem.to_string()).collect() } -/// Read spilled batches from the disk -/// -/// `path` - temp file -/// `schema` - batches schema, should be the same across batches -/// `buffer` - internal buffer of capacity batches -pub fn read_spill_as_stream( - path: RefCountedTempFile, - schema: SchemaRef, - buffer: usize, -) -> Result { - let mut builder = RecordBatchReceiverStream::builder(schema, buffer); - let sender = builder.tx(); - - builder.spawn_blocking(move || read_spill(sender, path.path())); - - Ok(builder.build()) -} - -/// Spills in-memory `batches` to disk. -/// -/// Returns total number of the rows spilled to disk. -pub fn spill_record_batches( - batches: Vec, - path: PathBuf, - schema: SchemaRef, -) -> Result { - let mut writer = IPCWriter::new(path.as_ref(), schema.as_ref())?; - for batch in batches { - writer.write(&batch)?; - } - writer.finish()?; - debug!( - "Spilled {} batches of total {} rows to disk, memory released {}", - writer.num_batches, - writer.num_rows, - human_readable_size(writer.num_bytes), - ); - Ok(writer.num_rows) -} - -fn read_spill(sender: Sender>, path: &Path) -> Result<()> { - let file = BufReader::new(File::open(path)?); - let reader = FileReader::try_new(file, None)?; - for batch in reader { - sender - .blocking_send(batch.map_err(Into::into)) - .map_err(|e| exec_datafusion_err!("{e}"))?; - } - Ok(()) -} - #[cfg(test)] mod tests { use std::any::Any; use std::sync::Arc; use arrow_schema::{Schema, SchemaRef}; + use datafusion_common::{Result, Statistics}; use datafusion_execution::{SendableRecordBatchStream, TaskContext}; diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index f347a0f5b6d56..5b99f8bc71617 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -30,13 +30,13 @@ use crate::metrics::{ BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet, }; use crate::sorts::streaming_merge::streaming_merge; +use crate::spill::{read_spill_as_stream, spill_record_batches}; use crate::stream::RecordBatchStreamAdapter; use crate::topk::TopK; use crate::{ - read_spill_as_stream, spill_record_batches, DisplayAs, DisplayFormatType, - Distribution, EmptyRecordBatchStream, ExecutionMode, ExecutionPlan, - ExecutionPlanProperties, Partitioning, PlanProperties, SendableRecordBatchStream, - Statistics, + DisplayAs, DisplayFormatType, Distribution, EmptyRecordBatchStream, ExecutionMode, + ExecutionPlan, ExecutionPlanProperties, Partitioning, PlanProperties, + SendableRecordBatchStream, Statistics, }; use arrow::compute::{concat_batches, lexsort_to_indices, take, SortColumn}; diff --git a/datafusion/physical-plan/src/spill.rs b/datafusion/physical-plan/src/spill.rs new file mode 100644 index 0000000000000..0018a27bd22bb --- /dev/null +++ b/datafusion/physical-plan/src/spill.rs @@ -0,0 +1,87 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Defines the spilling functions + +use std::fs::File; +use std::io::BufReader; +use std::path::{Path, PathBuf}; + +use arrow::datatypes::SchemaRef; +use arrow::ipc::reader::FileReader; +use arrow::record_batch::RecordBatch; +use log::debug; +use tokio::sync::mpsc::Sender; + +use datafusion_common::{exec_datafusion_err, Result}; +use datafusion_execution::disk_manager::RefCountedTempFile; +use datafusion_execution::memory_pool::human_readable_size; +use datafusion_execution::SendableRecordBatchStream; + +use crate::common::IPCWriter; +use crate::stream::RecordBatchReceiverStream; + +/// Read spilled batches from the disk +/// +/// `path` - temp file +/// `schema` - batches schema, should be the same across batches +/// `buffer` - internal buffer of capacity batches +pub fn read_spill_as_stream( + path: RefCountedTempFile, + schema: SchemaRef, + buffer: usize, +) -> Result { + let mut builder = RecordBatchReceiverStream::builder(schema, buffer); + let sender = builder.tx(); + + builder.spawn_blocking(move || read_spill(sender, path.path())); + + Ok(builder.build()) +} + +/// Spills in-memory `batches` to disk. +/// +/// Returns total number of the rows spilled to disk. +pub fn spill_record_batches( + batches: Vec, + path: PathBuf, + schema: SchemaRef, +) -> Result { + let mut writer = IPCWriter::new(path.as_ref(), schema.as_ref())?; + for batch in batches { + writer.write(&batch)?; + } + writer.finish()?; + debug!( + "Spilled {} batches of total {} rows to disk, memory released {}", + writer.num_batches, + writer.num_rows, + human_readable_size(writer.num_bytes), + ); + Ok(writer.num_rows) +} + +fn read_spill(sender: Sender>, path: &Path) -> Result<()> { + let file = BufReader::new(File::open(path)?); + let reader = FileReader::try_new(file, None)?; + for batch in reader { + sender + .blocking_send(batch.map_err(Into::into)) + .map_err(|e| exec_datafusion_err!("{e}"))?; + } + Ok(()) +}