From 20bc365e2908ee7731bda7a82b0e29da2665b8bf Mon Sep 17 00:00:00 2001 From: wiedld Date: Fri, 13 Oct 2023 02:24:49 -0400 Subject: [PATCH] refactor(7181): move streaming_merge() into separate mod from the merge mod. (#7799) Merge mod has the SortPreservingMergeStream, containing the loser tree. This SortPreservingMergeStream struct will be used repeatedly as part of the cascading merge; in turn, the cascading merge will be implemented for the streaming_merge() method. --- datafusion/physical-plan/src/sorts/merge.rs | 73 +-------------- datafusion/physical-plan/src/sorts/mod.rs | 5 +- datafusion/physical-plan/src/sorts/sort.rs | 2 +- .../src/sorts/streaming_merge.rs | 92 +++++++++++++++++++ 4 files changed, 101 insertions(+), 71 deletions(-) create mode 100644 datafusion/physical-plan/src/sorts/streaming_merge.rs diff --git a/datafusion/physical-plan/src/sorts/merge.rs b/datafusion/physical-plan/src/sorts/merge.rs index 67685509abe5..e60baf2cd806 100644 --- a/datafusion/physical-plan/src/sorts/merge.rs +++ b/datafusion/physical-plan/src/sorts/merge.rs @@ -21,84 +21,21 @@ use crate::metrics::BaselineMetrics; use crate::sorts::builder::BatchBuilder; use crate::sorts::cursor::Cursor; -use crate::sorts::stream::{FieldCursorStream, PartitionedStream, RowCursorStream}; -use crate::{PhysicalSortExpr, RecordBatchStream, SendableRecordBatchStream}; -use arrow::datatypes::{DataType, SchemaRef}; +use crate::sorts::stream::PartitionedStream; +use crate::RecordBatchStream; +use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch; -use arrow_array::*; use datafusion_common::Result; use datafusion_execution::memory_pool::MemoryReservation; use futures::Stream; use std::pin::Pin; use std::task::{ready, Context, Poll}; -macro_rules! primitive_merge_helper { - ($t:ty, $($v:ident),+) => { - merge_helper!(PrimitiveArray<$t>, $($v),+) - }; -} - -macro_rules! 
merge_helper { - ($t:ty, $sort:ident, $streams:ident, $schema:ident, $tracking_metrics:ident, $batch_size:ident, $fetch:ident, $reservation:ident) => {{ - let streams = FieldCursorStream::<$t>::new($sort, $streams); - return Ok(Box::pin(SortPreservingMergeStream::new( - Box::new(streams), - $schema, - $tracking_metrics, - $batch_size, - $fetch, - $reservation, - ))); - }}; -} - -/// Perform a streaming merge of [`SendableRecordBatchStream`] based on provided sort expressions -/// while preserving order. -pub fn streaming_merge( - streams: Vec, - schema: SchemaRef, - expressions: &[PhysicalSortExpr], - metrics: BaselineMetrics, - batch_size: usize, - fetch: Option, - reservation: MemoryReservation, -) -> Result { - // Special case single column comparisons with optimized cursor implementations - if expressions.len() == 1 { - let sort = expressions[0].clone(); - let data_type = sort.expr.data_type(schema.as_ref())?; - downcast_primitive! { - data_type => (primitive_merge_helper, sort, streams, schema, metrics, batch_size, fetch, reservation), - DataType::Utf8 => merge_helper!(StringArray, sort, streams, schema, metrics, batch_size, fetch, reservation) - DataType::LargeUtf8 => merge_helper!(LargeStringArray, sort, streams, schema, metrics, batch_size, fetch, reservation) - DataType::Binary => merge_helper!(BinaryArray, sort, streams, schema, metrics, batch_size, fetch, reservation) - DataType::LargeBinary => merge_helper!(LargeBinaryArray, sort, streams, schema, metrics, batch_size, fetch, reservation) - _ => {} - } - } - - let streams = RowCursorStream::try_new( - schema.as_ref(), - expressions, - streams, - reservation.new_empty(), - )?; - - Ok(Box::pin(SortPreservingMergeStream::new( - Box::new(streams), - schema, - metrics, - batch_size, - fetch, - reservation, - ))) -} - /// A fallible [`PartitionedStream`] of [`Cursor`] and [`RecordBatch`] type CursorStream = Box>>; #[derive(Debug)] -struct SortPreservingMergeStream { +pub(crate) struct SortPreservingMergeStream 
{ in_progress: BatchBuilder, /// The sorted input streams to merge together @@ -162,7 +99,7 @@ struct SortPreservingMergeStream { } impl SortPreservingMergeStream { - fn new( + pub(crate) fn new( streams: CursorStream, schema: SchemaRef, metrics: BaselineMetrics, diff --git a/datafusion/physical-plan/src/sorts/mod.rs b/datafusion/physical-plan/src/sorts/mod.rs index dff39db423f0..8a1184d3c2b5 100644 --- a/datafusion/physical-plan/src/sorts/mod.rs +++ b/datafusion/physical-plan/src/sorts/mod.rs @@ -20,10 +20,11 @@ mod builder; mod cursor; mod index; -pub mod merge; +mod merge; pub mod sort; pub mod sort_preserving_merge; mod stream; +pub mod streaming_merge; pub use index::RowIndex; -pub(crate) use merge::streaming_merge; +pub(crate) use streaming_merge::streaming_merge; diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index 703f80d90d2b..a56f8fec6876 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -24,7 +24,7 @@ use crate::expressions::PhysicalSortExpr; use crate::metrics::{ BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet, }; -use crate::sorts::merge::streaming_merge; +use crate::sorts::streaming_merge::streaming_merge; use crate::stream::{RecordBatchReceiverStream, RecordBatchStreamAdapter}; use crate::topk::TopK; use crate::{ diff --git a/datafusion/physical-plan/src/sorts/streaming_merge.rs b/datafusion/physical-plan/src/sorts/streaming_merge.rs new file mode 100644 index 000000000000..96d180027eee --- /dev/null +++ b/datafusion/physical-plan/src/sorts/streaming_merge.rs @@ -0,0 +1,92 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Merge that deals with an arbitrary size of streaming inputs. +//! This is an order-preserving merge. + +use crate::metrics::BaselineMetrics; +use crate::sorts::{ + merge::SortPreservingMergeStream, + stream::{FieldCursorStream, RowCursorStream}, +}; +use crate::{PhysicalSortExpr, SendableRecordBatchStream}; +use arrow::datatypes::{DataType, SchemaRef}; +use arrow_array::*; +use datafusion_common::Result; +use datafusion_execution::memory_pool::MemoryReservation; + +macro_rules! primitive_merge_helper { + ($t:ty, $($v:ident),+) => { + merge_helper!(PrimitiveArray<$t>, $($v),+) + }; +} + +macro_rules! merge_helper { + ($t:ty, $sort:ident, $streams:ident, $schema:ident, $tracking_metrics:ident, $batch_size:ident, $fetch:ident, $reservation:ident) => {{ + let streams = FieldCursorStream::<$t>::new($sort, $streams); + return Ok(Box::pin(SortPreservingMergeStream::new( + Box::new(streams), + $schema, + $tracking_metrics, + $batch_size, + $fetch, + $reservation, + ))); + }}; +} + +/// Perform a streaming merge of [`SendableRecordBatchStream`] based on provided sort expressions +/// while preserving order. 
+pub fn streaming_merge(
+    streams: Vec<SendableRecordBatchStream>,
+    schema: SchemaRef,
+    expressions: &[PhysicalSortExpr],
+    metrics: BaselineMetrics,
+    batch_size: usize,
+    fetch: Option<usize>,
+    reservation: MemoryReservation,
+) -> Result<SendableRecordBatchStream> {
+    // Special case single column comparisons with optimized cursor implementations
+    if expressions.len() == 1 {
+        let sort = expressions[0].clone();
+        let data_type = sort.expr.data_type(schema.as_ref())?;
+        downcast_primitive! {
+            data_type => (primitive_merge_helper, sort, streams, schema, metrics, batch_size, fetch, reservation),
+            DataType::Utf8 => merge_helper!(StringArray, sort, streams, schema, metrics, batch_size, fetch, reservation)
+            DataType::LargeUtf8 => merge_helper!(LargeStringArray, sort, streams, schema, metrics, batch_size, fetch, reservation)
+            DataType::Binary => merge_helper!(BinaryArray, sort, streams, schema, metrics, batch_size, fetch, reservation)
+            DataType::LargeBinary => merge_helper!(LargeBinaryArray, sort, streams, schema, metrics, batch_size, fetch, reservation)
+            _ => {}
+        }
+    }
+
+    let streams = RowCursorStream::try_new(
+        schema.as_ref(),
+        expressions,
+        streams,
+        reservation.new_empty(),
+    )?;
+
+    Ok(Box::pin(SortPreservingMergeStream::new(
+        Box::new(streams),
+        schema,
+        metrics,
+        batch_size,
+        fetch,
+        reservation,
+    )))
+}