Skip to content

Commit

Permalink
Implement proto serialization for (Bounded)WindowAggExec. (#7557)
Browse files Browse the repository at this point in the history
Signed-off-by: Vaibhav <[email protected]>
  • Loading branch information
vrongmeal authored Sep 22, 2023
1 parent 70b8620 commit 1c847b4
Show file tree
Hide file tree
Showing 11 changed files with 870 additions and 195 deletions.
6 changes: 6 additions & 0 deletions datafusion/physical-expr/src/aggregate/regr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@ pub struct Regr {
expr_x: Arc<dyn PhysicalExpr>,
}

impl Regr {
pub fn get_regr_type(&self) -> RegrType {
self.regr_type.clone()
}
}

#[derive(Debug, Clone)]
#[allow(clippy::upper_case_acronyms)]
pub enum RegrType {
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-expr/src/expressions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ pub use crate::aggregate::grouping::Grouping;
pub use crate::aggregate::median::Median;
pub use crate::aggregate::min_max::{Max, Min};
pub use crate::aggregate::min_max::{MaxAccumulator, MinAccumulator};
pub use crate::aggregate::regr::Regr;
pub use crate::aggregate::regr::{Regr, RegrType};
pub use crate::aggregate::stats::StatsType;
pub use crate::aggregate::stddev::{Stddev, StddevPop};
pub use crate::aggregate::sum::Sum;
Expand Down
5 changes: 5 additions & 0 deletions datafusion/physical-expr/src/window/lead_lag.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ impl WindowShift {
pub fn get_shift_offset(&self) -> i64 {
self.shift_offset
}

/// Get the default_value for window shift expression.
pub fn get_default_value(&self) -> Option<ScalarValue> {
self.default_value.clone()
}
}

/// lead() window function
Expand Down
1 change: 1 addition & 0 deletions datafusion/physical-expr/src/window/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ pub use aggregate::PlainAggregateWindowExpr;
pub use built_in::BuiltInWindowExpr;
pub use built_in_window_function_expr::BuiltInWindowFunctionExpr;
pub use sliding_aggregate::SlidingAggregateWindowExpr;
pub use window_expr::NthValueKind;
pub use window_expr::PartitionBatches;
pub use window_expr::PartitionKey;
pub use window_expr::PartitionWindowAggStates;
Expand Down
4 changes: 4 additions & 0 deletions datafusion/physical-expr/src/window/ntile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ impl Ntile {
pub fn new(name: String, n: u64) -> Self {
Self { name, n }
}

pub fn get_n(&self) -> u64 {
self.n
}
}

impl BuiltInWindowFunctionExpr for Ntile {
Expand Down
20 changes: 17 additions & 3 deletions datafusion/proto/proto/datafusion.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1183,7 +1183,11 @@ message PhysicalWindowExprNode {
BuiltInWindowFunction built_in_function = 2;
// udaf = 3
}
PhysicalExprNode expr = 4;
repeated PhysicalExprNode args = 4;
repeated PhysicalExprNode partition_by = 5;
repeated PhysicalSortExprNode order_by = 6;
WindowFrame window_frame = 7;
string name = 8;
}

message PhysicalIsNull {
Expand Down Expand Up @@ -1388,11 +1392,21 @@ enum AggregateMode {
SINGLE_PARTITIONED = 4;
}

message PartiallySortedPartitionSearchMode {
repeated uint64 columns = 6;
}

message WindowAggExecNode {
PhysicalPlanNode input = 1;
repeated PhysicalExprNode window_expr = 2;
repeated string window_expr_name = 3;
repeated PhysicalWindowExprNode window_expr = 2;
Schema input_schema = 4;
repeated PhysicalExprNode partition_keys = 5;
// Set optional to `None` for `BoundedWindowAggExec`.
oneof partition_search_mode {
EmptyMessage linear = 7;
PartiallySortedPartitionSearchMode partially_sorted = 8;
EmptyMessage sorted = 9;
}
}

message MaybeFilter {
Expand Down
Loading

0 comments on commit 1c847b4

Please sign in to comment.