Skip to content

Commit

Permalink
PoC improving repartition
Browse files Browse the repository at this point in the history
  • Loading branch information
Dandandan committed Dec 9, 2024
1 parent 47569b2 commit a9d4a0e
Showing 1 changed file with 57 additions and 11 deletions.
68 changes: 57 additions & 11 deletions datafusion/physical-plan/src/repartition/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ use crate::sorts::streaming_merge::StreamingMergeBuilder;
use crate::stream::RecordBatchStreamAdapter;
use crate::{DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties, Statistics};

use arrow::compute::kernels::zip::zip;
use arrow::compute::take_arrays;
use arrow::datatypes::{SchemaRef, UInt32Type};
use arrow::record_batch::RecordBatch;
Expand Down Expand Up @@ -779,9 +780,11 @@ impl RepartitionExec {
metrics: RepartitionMetrics,
context: Arc<TaskContext>,
) -> Result<()> {
let is_round_robin = matches!(partitioning, Partitioning::RoundRobinBatch(_));
let num_partitions = partitioning.partition_count();

let mut partitioner =
BatchPartitioner::try_new(partitioning, metrics.repartition_time.clone())?;

// execute the child operator
let timer = metrics.fetch_time.timer();
let mut stream = input.execute(partition, context)?;
Expand All @@ -805,18 +808,61 @@ impl RepartitionExec {
let (partition, batch) = res?;
let size = batch.get_array_memory_size();

let timer = metrics.send_time[partition].timer();
// if there is still a receiver, send to it
if let Some((tx, reservation)) = output_channels.get_mut(&partition) {
reservation.lock().try_grow(size)?;

if tx.send(Some(Ok(batch))).await.is_err() {
// If the other end has hung up, it was an early shutdown (e.g. LIMIT)
reservation.lock().shrink(size);
output_channels.remove(&partition);
if is_round_robin {
// try out the default (round robin) partition, then other partitions
for p in std::iter::once(partition).chain(0..num_partitions) {
let timer = metrics.send_time[partition].timer();

if let Some((tx, reservation)) = output_channels.get_mut(&p) {
reservation.lock().try_grow(size)?;

let sent = tx.send(Some(Ok(batch.clone()))).now_or_never();

match sent {
Some(res) if res.is_err() => {
// If the other end has hung up, it was an early shutdown (e.g. LIMIT)
reservation.lock().shrink(size);
output_channels.remove(&partition);
}
Some(_) => {
timer.done();
continue;
}
None => {
timer.done();
}
}
}
}
// No send has succeeded on any partition yet, so send to the assigned partition and wait for it
let timer: metrics::ScopedTimerGuard<'_> = metrics.send_time[partition].timer();
// if there is still a receiver, send to it
if let Some((tx, reservation)) = output_channels.get_mut(&partition) {
reservation.lock().try_grow(size)?;

if tx.send(Some(Ok(batch))).await.is_err() {
// If the other end has hung up, it was an early shutdown (e.g. LIMIT)
reservation.lock().shrink(size);
output_channels.remove(&partition);
}
}
timer.done();

}
timer.done();
else {
let timer: metrics::ScopedTimerGuard<'_> = metrics.send_time[partition].timer();
// if there is still a receiver, send to it
if let Some((tx, reservation)) = output_channels.get_mut(&partition) {
reservation.lock().try_grow(size)?;

if tx.send(Some(Ok(batch))).await.is_err() {
// If the other end has hung up, it was an early shutdown (e.g. LIMIT)
reservation.lock().shrink(size);
output_channels.remove(&partition);
}
}
timer.done();
}
}

// If the input stream is endless, we may spin forever and
Expand Down

0 comments on commit a9d4a0e

Please sign in to comment.