Skip to content

Commit

Permalink
refactor(fragmenter): remove is_singleton workarounds on Chain (r…
Browse files Browse the repository at this point in the history
…isingwavelabs#8536)

Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao authored Mar 15, 2023
1 parent 9f68cef commit 1449439
Show file tree
Hide file tree
Showing 10 changed files with 42 additions and 59 deletions.
26 changes: 10 additions & 16 deletions dashboard/proto/gen/stream_plan.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 4 additions & 9 deletions proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -383,9 +383,6 @@ enum ChainType {
// 1. MergeNode (as a placeholder) for streaming read.
// 2. BatchPlanNode for snapshot read.
message ChainNode {
reserved 5;
reserved "same_worker_node";

uint32 table_id = 1;
// The schema of input stream, which will be used to build a MergeNode
repeated plan_common.Field upstream_fields = 2;
Expand All @@ -400,10 +397,6 @@ message ChainNode {
// large. However, in some cases, e.g., shared state, the barrier cannot be rearranged in ChainNode.
// ChainType is used to decide which implementation for the ChainNode.
ChainType chain_type = 4;
// Whether the upstream materialize is and this chain should be a singleton.
// FIXME: This is a workaround for fragmenter since the distribution info will be lost if there's only one
// fragment in the downstream mview. Remove this when we refactor the fragmenter.
bool is_singleton = 6;

// The upstream materialized view info used by backfill.
plan_common.StorageTableDesc table_desc = 7;
Expand Down Expand Up @@ -653,8 +646,10 @@ message StreamFragmentGraph {
StreamNode node = 2;
// Bitwise-OR of FragmentTypeFlags
uint32 fragment_type_mask = 3;
// mark whether this fragment should only have one actor.
bool is_singleton = 4;
// Mark whether this fragment requires exactly one actor.
// Note: if this is `false`, the fragment may still be a singleton according to the scheduler.
// One should check `meta.Fragment.distribution_type` for the final result.
bool requires_singleton = 4;
// Number of table ids (stateful states) for this fragment.
uint32 table_ids_cnt = 5;
// Mark the upstream table ids of this fragment, Used for fragments with `Chain`s.
Expand Down
9 changes: 5 additions & 4 deletions src/common/src/hash/consistent_hash/mapping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,10 +217,11 @@ impl<T: VnodeMappingItem> VnodeMapping<T> {

/// Transform this vnode mapping to another type of vnode mapping, with the given mapping from
/// items of this mapping to items of the other mapping.
pub fn transform<T2: VnodeMappingItem>(
&self,
to_map: &HashMap<T::Item, T2::Item>,
) -> VnodeMapping<T2> {
pub fn transform<T2, M>(&self, to_map: &M) -> VnodeMapping<T2>
where
T2: VnodeMappingItem,
M: for<'a> Index<&'a T::Item, Output = T2::Item>,
{
VnodeMapping {
original_indices: self.original_indices.clone(),
data: self.data.iter().map(|item| to_map[item]).collect(),
Expand Down
1 change: 0 additions & 1 deletion src/frontend/src/optimizer/plan_node/stream_index_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,6 @@ impl StreamIndexScan {
.map(|&i| i as _)
.collect(),
upstream_column_ids: upstream_column_ids.iter().map(|i| i.get_id()).collect(),
is_singleton: false,
table_desc: Some(self.logical.table_desc().to_protobuf()),
})),
stream_key,
Expand Down
1 change: 0 additions & 1 deletion src/frontend/src/optimizer/plan_node/stream_table_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,6 @@ impl StreamTableScan {
.map(|&i| i as _)
.collect(),
upstream_column_ids: upstream_column_ids.iter().map(|i| i.get_id()).collect(),
is_singleton: *self.distribution() == Distribution::Single,
// The table desc used by backfill executor
table_desc: Some(self.logical.table_desc().to_protobuf()),
})),
Expand Down
9 changes: 4 additions & 5 deletions src/frontend/src/stream_fragmenter/graph/fragment_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ pub struct StreamFragment {
/// Bitwise-OR of type Flags of this fragment.
pub fragment_type_mask: u32,

/// mark whether this fragment should only have one actor.
pub is_singleton: bool,
/// Mark whether this fragment requires exactly one actor.
pub requires_singleton: bool,

/// Number of table ids (stateful states) for this fragment.
pub table_ids_cnt: u32,
Expand All @@ -64,8 +64,7 @@ impl StreamFragment {
Self {
fragment_id,
fragment_type_mask: FragmentTypeFlag::FragmentUnspecified as u32,
// FIXME: is it okay to use `false` as default value?
is_singleton: false,
requires_singleton: false,
node: None,
table_ids_cnt: 0,
upstream_table_ids: vec![],
Expand All @@ -77,7 +76,7 @@ impl StreamFragment {
fragment_id: self.fragment_id,
node: self.node.clone().map(|n| *n),
fragment_type_mask: self.fragment_type_mask,
is_singleton: self.is_singleton,
requires_singleton: self.requires_singleton,
table_ids_cnt: self.table_ids_cnt,
upstream_table_ids: self.upstream_table_ids.clone(),
}
Expand Down
15 changes: 3 additions & 12 deletions src/frontend/src/stream_fragmenter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,9 +223,7 @@ pub(self) fn build_and_add_fragment(
}

/// Build new fragment and link dependencies by visiting children recursively, update
/// `is_singleton` and `fragment_type` properties for current fragment. While traversing the
/// tree, count how many table ids should be allocated in this fragment.
// TODO: Should we store the concurrency in StreamFragment directly?
/// `requires_singleton` and `fragment_type` properties for current fragment.
fn build_fragment(
state: &mut BuildFragmentGraphState,
current_fragment: &mut StreamFragment,
Expand All @@ -248,8 +246,7 @@ fn build_fragment(

NodeBody::Sink(_) => current_fragment.fragment_type_mask |= FragmentTypeFlag::Sink as u32,

// TODO: Force singleton for TopN as a workaround. We should implement two phase TopN.
NodeBody::TopN(_) => current_fragment.is_singleton = true,
NodeBody::TopN(_) => current_fragment.requires_singleton = true,

// FIXME: workaround for single-fragment mview on singleton upstream mview.
NodeBody::Chain(node) => {
Expand All @@ -259,12 +256,11 @@ fn build_fragment(
.dependent_relation_ids
.insert(TableId::new(node.table_id));
current_fragment.upstream_table_ids.push(node.table_id);
current_fragment.is_singleton = node.is_singleton;
}

NodeBody::Now(_) => {
current_fragment.fragment_type_mask |= FragmentTypeFlag::Now as u32;
current_fragment.is_singleton = true;
current_fragment.requires_singleton = true;
}

_ => {}
Expand Down Expand Up @@ -293,8 +289,6 @@ fn build_fragment(
// Exchange node indicates a new child fragment.
NodeBody::Exchange(exchange_node) => {
let exchange_node_strategy = exchange_node.get_strategy()?.clone();
let is_simple_dispatcher =
exchange_node_strategy.get_type()? == DispatcherType::Simple;

// Exchange node should have only one input.
let [input]: [_; 1] = std::mem::take(&mut child_node.input).try_into().unwrap();
Expand All @@ -308,9 +302,6 @@ fn build_fragment(
},
);

if is_simple_dispatcher {
current_fragment.is_singleton = true;
}
Ok(child_node)
}

Expand Down
7 changes: 1 addition & 6 deletions src/meta/src/stream/stream_graph/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::catalog::{generate_internal_table_name_with_type, TableId};
use risingwave_pb::catalog::Table;
use risingwave_pb::meta::table_fragments::fragment::FragmentDistributionType;
use risingwave_pb::meta::table_fragments::Fragment;
use risingwave_pb::stream_plan::stream_fragment_graph::{
Parallelism, StreamFragment, StreamFragmentEdge as StreamFragmentEdgeProto,
Expand Down Expand Up @@ -604,11 +603,7 @@ impl CompleteStreamFragmentGraph {
table_id,
} = self.get_fragment(id).into_building().unwrap();

let distribution_type = if inner.is_singleton {
FragmentDistributionType::Single
} else {
FragmentDistributionType::Hash
} as i32;
let distribution_type = distribution.to_distribution_type() as i32;

let state_table_ids = internal_tables
.iter()
Expand Down
14 changes: 12 additions & 2 deletions src/meta/src/stream/stream_graph/schedule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ use rand::thread_rng;
use risingwave_common::bail;
use risingwave_common::hash::{ParallelUnitId, ParallelUnitMapping};
use risingwave_pb::common::{ActorInfo, ParallelUnit};
use risingwave_pb::meta::table_fragments::fragment::FragmentDistributionType;
use risingwave_pb::meta::table_fragments::fragment::{
FragmentDistributionType, PbFragmentDistributionType,
};
use risingwave_pb::stream_plan::DispatcherType::{self, *};

use crate::manager::{WorkerId, WorkerLocations};
Expand Down Expand Up @@ -170,6 +172,14 @@ impl Distribution {
FragmentDistributionType::Hash => Distribution::Hash(mapping),
}
}

/// Convert the distribution to [`PbFragmentDistributionType`].
pub fn to_distribution_type(&self) -> PbFragmentDistributionType {
match self {
Distribution::Singleton(_) => PbFragmentDistributionType::Single,
Distribution::Hash(_) => PbFragmentDistributionType::Hash,
}
}
}

/// [`Scheduler`] schedules the distribution of fragments in a stream graph.
Expand Down Expand Up @@ -272,7 +282,7 @@ impl Scheduler {
// Building fragments and Singletons
for (&id, fragment) in graph.building_fragments() {
facts.push(Fact::Fragment(id));
if fragment.is_singleton {
if fragment.requires_singleton {
facts.push(Fact::SingletonReq(id));
}
}
Expand Down
6 changes: 3 additions & 3 deletions src/meta/src/stream/test_fragmenter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ fn make_stream_fragments() -> Vec<StreamFragment> {
fragment_id: 2,
node: Some(source_node),
fragment_type_mask: FragmentTypeFlag::Source as u32,
is_singleton: false,
requires_singleton: false,
table_ids_cnt: 0,
upstream_table_ids: vec![],
});
Expand Down Expand Up @@ -280,7 +280,7 @@ fn make_stream_fragments() -> Vec<StreamFragment> {
fragment_id: 1,
node: Some(simple_agg_node),
fragment_type_mask: FragmentTypeFlag::FragmentUnspecified as u32,
is_singleton: false,
requires_singleton: false,
table_ids_cnt: 0,
upstream_table_ids: vec![],
});
Expand Down Expand Up @@ -368,7 +368,7 @@ fn make_stream_fragments() -> Vec<StreamFragment> {
fragment_id: 0,
node: Some(mview_node),
fragment_type_mask: FragmentTypeFlag::Mview as u32,
is_singleton: true,
requires_singleton: true,
table_ids_cnt: 0,
upstream_table_ids: vec![],
});
Expand Down

0 comments on commit 1449439

Please sign in to comment.