Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(batch): release epoch at the end of the distributed query if it contains lookup join #6997

Merged
merged 4 commits into from
Dec 21, 2022
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
delay epoch unpin util the end of the distributed query
  • Loading branch information
chenzl25 committed Dec 21, 2022
commit 70d4a7e659fcbef3dc630a5f7b8976ae5a021d03
5 changes: 4 additions & 1 deletion src/frontend/src/scheduler/distributed/query.rs
Original file line number Diff line number Diff line change
@@ -236,6 +236,7 @@ impl QueryRunner {
);
}
let mut stages_with_table_scan = self.query.stages_with_table_scan();
let has_lookup_join_stage = self.query.has_lookup_join_stage();
// To convince the compiler that `pinned_snapshot` will only be dropped once.
let mut pinned_snapshot_to_drop = Some(pinned_snapshot);
while let Some(msg_inner) = self.msg_receiver.recv().await {
@@ -248,7 +249,9 @@ impl QueryRunner {
);
self.scheduled_stages_count += 1;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggest to add some comments for why remove the early unpin.

stages_with_table_scan.remove(&stage_id);
if stages_with_table_scan.is_empty() {
// If query contains lookup join we need to delay epoch unpin util the end of
// the query.
if !has_lookup_join_stage && stages_with_table_scan.is_empty() {
// We can be sure here that all the Hummock iterators have been created,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will there be race cases previously?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks fine. I can't tell.

// thus they all successfully pinned a HummockVersion.
// So we can now unpin their epoch.
21 changes: 21 additions & 0 deletions src/frontend/src/scheduler/plan_fragmenter.rs
Original file line number Diff line number Diff line change
@@ -191,6 +191,13 @@ impl Query {
})
.collect()
}

pub fn has_lookup_join_stage(&self) -> bool {
self.stage_graph
.stages
.iter()
.any(|(_stage_id, stage_query)| stage_query.has_lookup_join())
}
}

#[derive(Clone, Debug)]
@@ -272,6 +279,7 @@ pub struct QueryStage {
/// Indicates whether this stage contains a table scan node and the table's information if so.
pub table_scan_info: Option<TableScanInfo>,
pub source_info: Option<SourceScanInfo>,
pub has_lookup_join: bool,
}

impl QueryStage {
@@ -281,6 +289,12 @@ impl QueryStage {
pub fn has_table_scan(&self) -> bool {
self.table_scan_info.is_some()
}

/// If true, this stage contains lookup join executor.
/// We need to delay epoch unpin util the end of the query.
pub fn has_lookup_join(&self) -> bool {
self.has_lookup_join
}
}

impl Debug for QueryStage {
@@ -320,6 +334,7 @@ struct QueryStageBuilder {
/// See also [`QueryStage::table_scan_info`].
table_scan_info: Option<TableScanInfo>,
source_info: Option<SourceScanInfo>,
has_lookup_join: bool,
}

impl QueryStageBuilder {
@@ -330,6 +345,7 @@ impl QueryStageBuilder {
exchange_info: ExchangeInfo,
table_scan_info: Option<TableScanInfo>,
source_info: Option<SourceScanInfo>,
has_lookup_join: bool,
) -> Self {
Self {
query_id,
@@ -340,6 +356,7 @@ impl QueryStageBuilder {
children_stages: vec![],
table_scan_info,
source_info,
has_lookup_join,
}
}

@@ -352,6 +369,7 @@ impl QueryStageBuilder {
parallelism: self.parallelism,
table_scan_info: self.table_scan_info,
source_info: self.source_info,
has_lookup_join: self.has_lookup_join,
});

stage_graph_builder.add_node(stage.clone());
@@ -475,6 +493,7 @@ impl BatchPlanFragmenter {
} else {
None
};
let mut has_lookup_join = false;
let parallelism = match root.distribution() {
Distribution::Single => {
if let Some(info) = &mut table_scan_info {
@@ -515,6 +534,7 @@ impl BatchPlanFragmenter {
} else if let Some(lookup_join_parallelism) =
self.collect_stage_lookup_join_parallelism(root.clone())?
{
has_lookup_join = true;
lookup_join_parallelism
} else if let Some(source_info) = &source_info {
source_info.split_info().len()
@@ -531,6 +551,7 @@ impl BatchPlanFragmenter {
exchange_info,
table_scan_info,
source_info,
has_lookup_join,
);

self.visit_node(root, &mut builder, None)?;