Skip to content

Commit

Permalink
fix(batch): release epoch at the end of the distributed query if it c…
Browse files Browse the repository at this point in the history
…ontains lookup join (#6997)

**This section will be used as the commit message. Please do not leave this empty!**

Please explain **IN DETAIL** what the changes are in this PR and why they are needed:

- As title.


Approved-By: BugenZhao
Approved-By: BowenXiao1999

Co-Authored-By: Dylan Chen <[email protected]>
  • Loading branch information
chenzl25 and chenzl25 authored Dec 21, 2022
1 parent 94d508a commit fd0c81e
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 1 deletion.
5 changes: 4 additions & 1 deletion src/frontend/src/scheduler/distributed/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ impl QueryRunner {
);
}
let mut stages_with_table_scan = self.query.stages_with_table_scan();
let has_lookup_join_stage = self.query.has_lookup_join_stage();
// To convince the compiler that `pinned_snapshot` will only be dropped once.
let mut pinned_snapshot_to_drop = Some(pinned_snapshot);
while let Some(msg_inner) = self.msg_receiver.recv().await {
Expand All @@ -248,7 +249,9 @@ impl QueryRunner {
);
self.scheduled_stages_count += 1;
stages_with_table_scan.remove(&stage_id);
if stages_with_table_scan.is_empty() {
// If query contains lookup join we need to delay epoch unpin util the end of
// the query.
if !has_lookup_join_stage && stages_with_table_scan.is_empty() {
// We can be sure here that all the Hummock iterators have been created,
// thus they all successfully pinned a HummockVersion.
// So we can now unpin their epoch.
Expand Down
21 changes: 21 additions & 0 deletions src/frontend/src/scheduler/plan_fragmenter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,13 @@ impl Query {
})
.collect()
}

pub fn has_lookup_join_stage(&self) -> bool {
self.stage_graph
.stages
.iter()
.any(|(_stage_id, stage_query)| stage_query.has_lookup_join())
}
}

#[derive(Clone, Debug)]
Expand Down Expand Up @@ -272,6 +279,7 @@ pub struct QueryStage {
/// Indicates whether this stage contains a table scan node and the table's information if so.
pub table_scan_info: Option<TableScanInfo>,
pub source_info: Option<SourceScanInfo>,
pub has_lookup_join: bool,
}

impl QueryStage {
Expand All @@ -281,6 +289,12 @@ impl QueryStage {
pub fn has_table_scan(&self) -> bool {
self.table_scan_info.is_some()
}

/// If true, this stage contains lookup join executor.
/// We need to delay epoch unpin util the end of the query.
pub fn has_lookup_join(&self) -> bool {
self.has_lookup_join
}
}

impl Debug for QueryStage {
Expand Down Expand Up @@ -320,6 +334,7 @@ struct QueryStageBuilder {
/// See also [`QueryStage::table_scan_info`].
table_scan_info: Option<TableScanInfo>,
source_info: Option<SourceScanInfo>,
has_lookup_join: bool,
}

impl QueryStageBuilder {
Expand All @@ -330,6 +345,7 @@ impl QueryStageBuilder {
exchange_info: ExchangeInfo,
table_scan_info: Option<TableScanInfo>,
source_info: Option<SourceScanInfo>,
has_lookup_join: bool,
) -> Self {
Self {
query_id,
Expand All @@ -340,6 +356,7 @@ impl QueryStageBuilder {
children_stages: vec![],
table_scan_info,
source_info,
has_lookup_join,
}
}

Expand All @@ -352,6 +369,7 @@ impl QueryStageBuilder {
parallelism: self.parallelism,
table_scan_info: self.table_scan_info,
source_info: self.source_info,
has_lookup_join: self.has_lookup_join,
});

stage_graph_builder.add_node(stage.clone());
Expand Down Expand Up @@ -475,6 +493,7 @@ impl BatchPlanFragmenter {
} else {
None
};
let mut has_lookup_join = false;
let parallelism = match root.distribution() {
Distribution::Single => {
if let Some(info) = &mut table_scan_info {
Expand Down Expand Up @@ -515,6 +534,7 @@ impl BatchPlanFragmenter {
} else if let Some(lookup_join_parallelism) =
self.collect_stage_lookup_join_parallelism(root.clone())?
{
has_lookup_join = true;
lookup_join_parallelism
} else if let Some(source_info) = &source_info {
source_info.split_info().len()
Expand All @@ -531,6 +551,7 @@ impl BatchPlanFragmenter {
exchange_info,
table_scan_info,
source_info,
has_lookup_join,
);

self.visit_node(root, &mut builder, None)?;
Expand Down

0 comments on commit fd0c81e

Please sign in to comment.