Skip to content

Commit

Permalink
refactor(7181): move streaming_merge() into separate mod from the mer…
Browse files Browse the repository at this point in the history
…ge mod. (#7799)

The merge mod contains the SortPreservingMergeStream, which holds the loser tree. This SortPreservingMergeStream struct will be used repeatedly as part of the cascading merge; in turn, the cascading merge will be implemented for the streaming_merge() method.
  • Loading branch information
wiedld authored Oct 13, 2023
1 parent 485b80e commit 20bc365
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 71 deletions.
73 changes: 5 additions & 68 deletions datafusion/physical-plan/src/sorts/merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,84 +21,21 @@
use crate::metrics::BaselineMetrics;
use crate::sorts::builder::BatchBuilder;
use crate::sorts::cursor::Cursor;
use crate::sorts::stream::{FieldCursorStream, PartitionedStream, RowCursorStream};
use crate::{PhysicalSortExpr, RecordBatchStream, SendableRecordBatchStream};
use arrow::datatypes::{DataType, SchemaRef};
use crate::sorts::stream::PartitionedStream;
use crate::RecordBatchStream;
use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use arrow_array::*;
use datafusion_common::Result;
use datafusion_execution::memory_pool::MemoryReservation;
use futures::Stream;
use std::pin::Pin;
use std::task::{ready, Context, Poll};

/// Forwards to `merge_helper!` with `PrimitiveArray<$t>` as the array type.
///
/// `$t` is the primitive type; every following identifier (`$v`) is passed
/// through to `merge_helper!` unchanged and in the same order.
macro_rules! primitive_merge_helper {
($t:ty, $($v:ident),+) => {
merge_helper!(PrimitiveArray<$t>, $($v),+)
};
}

/// Builds a `FieldCursorStream` over array type `$t` from `$sort` and
/// `$streams`, wraps it in a boxed and pinned `SortPreservingMergeStream`, and
/// returns that as `Ok(..)`.
///
/// The expansion contains a `return`, so invoking this macro exits the
/// *enclosing function*; it can only be used where that function's return
/// type accepts the produced value.
macro_rules! merge_helper {
($t:ty, $sort:ident, $streams:ident, $schema:ident, $tracking_metrics:ident, $batch_size:ident, $fetch:ident, $reservation:ident) => {{
// Cursor stream specialised for the concrete array type `$t`.
let streams = FieldCursorStream::<$t>::new($sort, $streams);
// Early return from the calling function with the merged stream.
return Ok(Box::pin(SortPreservingMergeStream::new(
Box::new(streams),
$schema,
$tracking_metrics,
$batch_size,
$fetch,
$reservation,
)));
}};
}

/// Perform a streaming merge of [`SendableRecordBatchStream`] based on provided sort expressions
/// while preserving order.
///
/// When exactly one sort expression is given and its data type is a primitive
/// type, `Utf8`, `LargeUtf8`, `Binary` or `LargeBinary`, the merge is built on a
/// typed `FieldCursorStream`; otherwise it is built on a `RowCursorStream`.
///
/// `schema`, `metrics`, `batch_size`, `fetch` and `reservation` are forwarded
/// unchanged to `SortPreservingMergeStream::new`.
///
/// # Errors
///
/// Returns an error if the data type of the single sort expression cannot be
/// resolved against `schema`, or if `RowCursorStream::try_new` fails.
pub fn streaming_merge(
streams: Vec<SendableRecordBatchStream>,
schema: SchemaRef,
expressions: &[PhysicalSortExpr],
metrics: BaselineMetrics,
batch_size: usize,
fetch: Option<usize>,
reservation: MemoryReservation,
) -> Result<SendableRecordBatchStream> {
// Special case single column comparisons with optimized cursor implementations
if expressions.len() == 1 {
let sort = expressions[0].clone();
let data_type = sort.expr.data_type(schema.as_ref())?;
// Every matching arm expands to `merge_helper!`, which `return`s from this
// function; only the `_` arm falls through to the row-based path below.
downcast_primitive! {
data_type => (primitive_merge_helper, sort, streams, schema, metrics, batch_size, fetch, reservation),
DataType::Utf8 => merge_helper!(StringArray, sort, streams, schema, metrics, batch_size, fetch, reservation)
DataType::LargeUtf8 => merge_helper!(LargeStringArray, sort, streams, schema, metrics, batch_size, fetch, reservation)
DataType::Binary => merge_helper!(BinaryArray, sort, streams, schema, metrics, batch_size, fetch, reservation)
DataType::LargeBinary => merge_helper!(LargeBinaryArray, sort, streams, schema, metrics, batch_size, fetch, reservation)
_ => {}
}
}

// General path: multiple sort expressions, or a single expression of a type
// without a specialised cursor. The cursor stream receives the value of
// `reservation.new_empty()`, while `reservation` itself goes to the merge stream.
let streams = RowCursorStream::try_new(
schema.as_ref(),
expressions,
streams,
reservation.new_empty(),
)?;

Ok(Box::pin(SortPreservingMergeStream::new(
Box::new(streams),
schema,
metrics,
batch_size,
fetch,
reservation,
)))
}

/// A fallible [`PartitionedStream`] of [`Cursor`] and [`RecordBatch`]
///
/// Boxed trait object whose output is a [`Result`] wrapping a
/// `(C, RecordBatch)` pair, where `C` is the cursor type.
type CursorStream<C> = Box<dyn PartitionedStream<Output = Result<(C, RecordBatch)>>>;

#[derive(Debug)]
struct SortPreservingMergeStream<C> {
pub(crate) struct SortPreservingMergeStream<C> {
in_progress: BatchBuilder,

/// The sorted input streams to merge together
Expand Down Expand Up @@ -162,7 +99,7 @@ struct SortPreservingMergeStream<C> {
}

impl<C: Cursor> SortPreservingMergeStream<C> {
fn new(
pub(crate) fn new(
streams: CursorStream<C>,
schema: SchemaRef,
metrics: BaselineMetrics,
Expand Down
5 changes: 3 additions & 2 deletions datafusion/physical-plan/src/sorts/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@
mod builder;
mod cursor;
mod index;
pub mod merge;
mod merge;
pub mod sort;
pub mod sort_preserving_merge;
mod stream;
pub mod streaming_merge;

pub use index::RowIndex;
pub(crate) use merge::streaming_merge;
pub(crate) use streaming_merge::streaming_merge;
2 changes: 1 addition & 1 deletion datafusion/physical-plan/src/sorts/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use crate::expressions::PhysicalSortExpr;
use crate::metrics::{
BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet,
};
use crate::sorts::merge::streaming_merge;
use crate::sorts::streaming_merge::streaming_merge;
use crate::stream::{RecordBatchReceiverStream, RecordBatchStreamAdapter};
use crate::topk::TopK;
use crate::{
Expand Down
92 changes: 92 additions & 0 deletions datafusion/physical-plan/src/sorts/streaming_merge.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Merge that deals with an arbitrary number of streaming inputs.
//! This is an order-preserving merge.
use crate::metrics::BaselineMetrics;
use crate::sorts::{
merge::SortPreservingMergeStream,
stream::{FieldCursorStream, RowCursorStream},
};
use crate::{PhysicalSortExpr, SendableRecordBatchStream};
use arrow::datatypes::{DataType, SchemaRef};
use arrow_array::*;
use datafusion_common::Result;
use datafusion_execution::memory_pool::MemoryReservation;

/// Adapter that lets a primitive native type be handed to `merge_helper!`.
///
/// Wraps `$native` as `PrimitiveArray<$native>` and forwards every remaining
/// identifier to `merge_helper!` unchanged and in the same order.
macro_rules! primitive_merge_helper {
    ($native:ty, $($arg:ident),+) => {
        merge_helper!(PrimitiveArray<$native>, $($arg),+)
    };
}

/// Builds a single-column merge specialised for array type `$array` and
/// returns it from the enclosing function.
///
/// A `FieldCursorStream::<$array>` is created from `$sort` and `$streams`,
/// boxed, and handed to `SortPreservingMergeStream::new` together with the
/// remaining arguments. The expansion ends in a `return Ok(..)`, so invoking
/// this macro exits the *calling function*, not just the macro block.
macro_rules! merge_helper {
    (
        $array:ty,
        $sort:ident,
        $streams:ident,
        $schema:ident,
        $metrics:ident,
        $batch_size:ident,
        $fetch:ident,
        $reservation:ident
    ) => {{
        // Typed cursor stream for the single sort column.
        let field_cursors = FieldCursorStream::<$array>::new($sort, $streams);
        let merged = SortPreservingMergeStream::new(
            Box::new(field_cursors),
            $schema,
            $metrics,
            $batch_size,
            $fetch,
            $reservation,
        );
        // Leaves the function that invoked the macro.
        return Ok(Box::pin(merged));
    }};
}

/// Merge several [`SendableRecordBatchStream`]s into a single stream, ordered
/// according to the given sort `expressions`.
///
/// Two strategies are used:
///
/// * If there is exactly one sort expression and its data type is a primitive
///   type, `Utf8`, `LargeUtf8`, `Binary` or `LargeBinary`, the merge runs on a
///   typed [`FieldCursorStream`].
/// * In every other case the merge runs on a [`RowCursorStream`] built from
///   all expressions.
///
/// `schema`, `metrics`, `batch_size`, `fetch` and `reservation` are passed on
/// unchanged to `SortPreservingMergeStream::new`.
///
/// # Errors
///
/// Fails if the data type of the single sort expression cannot be resolved
/// against `schema`, or if `RowCursorStream::try_new` returns an error.
pub fn streaming_merge(
    streams: Vec<SendableRecordBatchStream>,
    schema: SchemaRef,
    expressions: &[PhysicalSortExpr],
    metrics: BaselineMetrics,
    batch_size: usize,
    fetch: Option<usize>,
    reservation: MemoryReservation,
) -> Result<SendableRecordBatchStream> {
    // Fast path: a lone sort column can use a cursor specialised for its type.
    if let [only_expr] = expressions {
        let sort_expr = only_expr.clone();
        let sort_type = sort_expr.expr.data_type(schema.as_ref())?;
        // Each matching arm expands to `merge_helper!`, which returns from this
        // function. Only the `_` arm falls through to the generic path below.
        downcast_primitive! {
            sort_type => (primitive_merge_helper, sort_expr, streams, schema, metrics, batch_size, fetch, reservation),
            DataType::Utf8 => merge_helper!(StringArray, sort_expr, streams, schema, metrics, batch_size, fetch, reservation)
            DataType::LargeUtf8 => merge_helper!(LargeStringArray, sort_expr, streams, schema, metrics, batch_size, fetch, reservation)
            DataType::Binary => merge_helper!(BinaryArray, sort_expr, streams, schema, metrics, batch_size, fetch, reservation)
            DataType::LargeBinary => merge_helper!(LargeBinaryArray, sort_expr, streams, schema, metrics, batch_size, fetch, reservation)
            _ => {}
        }
    }

    // Generic path: the cursor stream gets the value of
    // `reservation.new_empty()`, while `reservation` itself is handed to the
    // merge stream.
    let cursor_reservation = reservation.new_empty();
    let row_cursors =
        RowCursorStream::try_new(schema.as_ref(), expressions, streams, cursor_reservation)?;

    let merged = SortPreservingMergeStream::new(
        Box::new(row_cursors),
        schema,
        metrics,
        batch_size,
        fetch,
        reservation,
    );
    Ok(Box::pin(merged))
}

0 comments on commit 20bc365

Please sign in to comment.