
panicked at vnode 151 should not be accessed by this table #7698

Closed
Tracked by #6640
lmatz opened this issue Feb 4, 2023 · 3 comments · Fixed by #8404
Labels: priority/high, type/bug

@lmatz (Contributor)
lmatz commented Feb 4, 2023

Describe the bug

create table t (src int, dst int);

create materialized view cycle_3 as
select t1.src p1, t1.dst p2, t2.dst p3 from
t t1 join t t2 join t t3 where t1.dst = t2.src and t2.src = t3.dst and t3.dst = t1.src;

insert into t values (1, 2), (2, 1);
flush;
 StreamMaterialize { columns: [p1, p2, p3, t._row_id(hidden), t._row_id#1(hidden), t.src(hidden), t._row_id#2(hidden), t.dst(hidden)], pk_columns: [t._row_id, t._row_id#1, p2, t.src, t._row_id#2, t.dst, p1] }
 └─StreamHashJoin { type: Inner, predicate: t.src = t.dst AND t.src = t.dst }
   ├─StreamExchange { dist: HashShard(t.src) }
   | └─StreamHashJoin { type: Inner, predicate: t.dst = t.src }
   |   ├─StreamExchange { dist: HashShard(t.dst) }
   |   | └─StreamTableScan { table: t, columns: [src, dst, _row_id] }
   |   └─StreamExchange { dist: HashShard(t.src) }
   |     └─StreamTableScan { table: t, columns: [src, dst, _row_id] }
   └─StreamExchange { dist: HashShard(t.dst) }
     └─StreamTableScan { table: t, columns: [dst, _row_id] }
(10 rows)

To Reproduce

No response

Expected behavior

No response

Additional context

No response

@lmatz lmatz added the type/bug Something isn't working label Feb 4, 2023
@github-actions github-actions bot added this to the release-0.1.17 milestone Feb 4, 2023
@lmatz (Contributor, Author)
lmatz commented Feb 7, 2023

Note that this query is used to detect cycles.
Although we don't support recursive CTEs to detect cycles of indeterminate length, the workaround is to write N independent queries that detect cycles of length 2 through N+1.

In real-world use cases, N is likely to be bounded.
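The N-query workaround can be sketched as a query generator. This is a hypothetical helper, not part of RisingWave; the table and alias names follow the example above, and the exact join syntax mirrors the hand-written query.

```rust
/// Hypothetical helper (not part of RisingWave): builds the self-join SQL
/// that detects cycles of exactly `n` edges in a table t(src, dst),
/// mirroring the hand-written 3-hop query above.
fn cycle_query(n: usize) -> String {
    assert!(n >= 2, "a cycle needs at least two edges");
    let select: Vec<String> = (1..=n).map(|i| format!("t{i}.src p{i}")).collect();
    let from: Vec<String> = (1..=n).map(|i| format!("t t{i}")).collect();
    // Each hop's dst must equal the next hop's src, wrapping around to t1.
    let preds: Vec<String> = (1..=n)
        .map(|i| {
            let j = if i == n { 1 } else { i + 1 };
            format!("t{i}.dst = t{j}.src")
        })
        .collect();
    format!(
        "select {} from {} where {}",
        select.join(", "),
        from.join(" join "),
        preds.join(" and ")
    )
}

fn main() {
    // One independent materialized view per cycle length, here 2..=4.
    for n in 2..=4 {
        println!("create materialized view cycle_{n} as {};", cycle_query(n));
    }
}
```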

@st1page (Contributor)
st1page commented Feb 9, 2023

dev=> create table t (a int, b int);

create materialized view mv as
select t1.a a, t2.a aa, t3.a aaa, t1.b b, t2.b bb, t3.b bbb from
t t1 join t t2 join t t3 where t1.b = t2.a and t2.a = t3.b and t3.b = t1.a;

This query panics too.
It is very strange; I have done some tests and everything seems right so far. I will continue debugging.

  • Checked the plan, and I think it is right:
 Fragment 0
   StreamMaterialize { columns: [a, aa, aaa, b, bb, bbb, t._row_id(hidden), t._row_id#1(hidden), t._row_id#2(hidden)], pk_columns: [t._row_id, t._row_id#1, b, aa, t._row_id#2, bbb, a] }
       materialized table: 4294967294
       Output: [t.a, t.a, t.a, t.b, t.b, t.b, t._row_id, t._row_id, t._row_id]
       Stream key: [t._row_id, t._row_id, t.b, t.a, t._row_id, t.b, t.a], 
     StreamHashJoin { type: Inner, predicate: t.a$2 = t.b$7 AND t.a$0 = t.b$7, output: [t.a, t.a, t.a, t.b, t.b, t.b, t._row_id, t._row_id, t._row_id] }
         left table: 0, right table 2, left degree table: 1, right degree table: 3,
         Output: [t.a, t.a, t.a, t.b, t.b, t.b, t._row_id, t._row_id, t._row_id]
         Stream key: [t._row_id, t._row_id, t.b, t.a, t._row_id, t.b, t.a], 
       StreamExchange Hash([0]) from 1
           Output: [t.a, t.b, t.a, t.b, t._row_id, t._row_id]
           Stream key: [t._row_id, t._row_id, t.b, t.a], 
       StreamExchange Hash([1]) from 4
           Output: [t.a, t.b, t._row_id]
           Stream key: [t._row_id], 
 
 Fragment 1
   StreamHashJoin { type: Inner, predicate: t.b$1 = t.a$3, output: [t.a, t.b, t.a, t.b, t._row_id, t._row_id] }
       left table: 4, right table 6, left degree table: 5, right degree table: 7,
       Output: [t.a, t.b, t.a, t.b, t._row_id, t._row_id]
       Stream key: [t._row_id, t._row_id, t.b, t.a], 
     StreamExchange Hash([1]) from 2
         Output: [t.a, t.b, t._row_id]
         Stream key: [t._row_id], 
     StreamExchange Hash([0]) from 3
         Output: [t.a, t.b, t._row_id]
         Stream key: [t._row_id], 
 
 Fragment 2
   Chain { table: t, columns: [t.a, t.b, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id$2) }
       Output: [t.a, t.b, t._row_id]
       Stream key: [t._row_id], 
     Upstream
         Output: [a, b, _row_id]
         Stream key: [_row_id], 
     BatchPlanNode
         Output: [t.a, t.b, t._row_id]
         Stream key: [t._row_id], AppendOnly
 
 Fragment 3
   Chain { table: t, columns: [t.a, t.b, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id$2) }
       Output: [t.a, t.b, t._row_id]
       Stream key: [t._row_id], 
     Upstream
         Output: [a, b, _row_id]
         Stream key: [_row_id], 
     BatchPlanNode
         Output: [t.a, t.b, t._row_id]
         Stream key: [t._row_id], AppendOnly
 
 Fragment 4
   Chain { table: t, columns: [t.a, t.b, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id$2) }
       Output: [t.a, t.b, t._row_id]
       Stream key: [t._row_id], 
     Upstream
         Output: [a, b, _row_id]
         Stream key: [_row_id], 
     BatchPlanNode
         Output: [t.a, t.b, t._row_id]
         Stream key: [t._row_id], AppendOnly
 
  Table 0 { columns: [t_a:integer, t_b:integer, t_a_0:integer, t_b_0:integer, t__row_id:bigint, t__row_id_0:bigint], primary key: [$2 ASC, $0 ASC, $4 ASC, $5 ASC, $1 ASC], value indices: [0, 1, 2, 3, 4, 5], distribution key: [0] }
  Table 1 { columns: [t_a:integer, t_a_0:integer, t__row_id:bigint, t__row_id_0:bigint, t_b:integer, _degree:bigint], primary key: [$0 ASC, $1 ASC, $2 ASC, $3 ASC, $4 ASC], value indices: [5], distribution key: [1] }
  Table 2 { columns: [t_a:integer, t_b:integer, t__row_id:bigint], primary key: [$1 ASC, $1 ASC, $2 ASC], value indices: [0, 1, 2], distribution key: [1] }
  Table 3 { columns: [t_b:integer, t_b_0:integer, t__row_id:bigint, _degree:bigint], primary key: [$0 ASC, $1 ASC, $2 ASC], value indices: [3], distribution key: [0] }
  Table 4 { columns: [t_a:integer, t_b:integer, t__row_id:bigint], primary key: [$1 ASC, $2 ASC], value indices: [0, 1, 2], distribution key: [1] }
  Table 5 { columns: [t_b:integer, t__row_id:bigint, _degree:bigint], primary key: [$0 ASC, $1 ASC], value indices: [2], distribution key: [0] }
  Table 6 { columns: [t_a:integer, t_b:integer, t__row_id:bigint], primary key: [$0 ASC, $2 ASC], value indices: [0, 1, 2], distribution key: [0] }
  Table 7 { columns: [t_a:integer, t__row_id:bigint, _degree:bigint], primary key: [$0 ASC, $1 ASC], value indices: [2], distribution key: [0] }
  Table 4294967294 { columns: [a:integer, aa:integer, aaa:integer, b:integer, bb:integer, bbb:integer, t._row_id:bigint, t._row_id#1:bigint, t._row_id#2:bigint], primary key: [$6 ASC, $7 ASC, $3 ASC, $1 ASC, $8 ASC, $5 ASC, $0 ASC], value indices: [0, 1, 2, 3, 4, 5, 6, 7, 8], distribution key: [0] }
(70 rows)
  • The vnode mapping in the state table is right; the panic comes from this assertion:
    assert!(
        is_set,
        "vnode {} should not be accessed by this table, table's vnodes are {:?}, actor is {:#?}",
        vnode,
        vnodes,
        async_stack_trace::current_context()
    );
thread 'risingwave-streaming-actor' panicked at 'vnode 151 should not be accessed by this table, table's vnodes are 0000000000000000000000000000000000000000000000000000000000000000111111111111111111111111111111111111111111111111111111111111111100000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000, actor is Some(
    "Actor 10: `CREATE MATERIALIZED VIEW mv AS SELECT t1.a AS a, t2.a AS aa, t3.a AS aaa, t1.b AS b, t2.b AS bb, t3.b AS bbb FROM t AS t1 JOIN t AS t2 JOIN t AS t3 WHERE t1.b = t2.a AND t2.a = t3.b AND t3.b = t1.a` [83.335397ms]\n  Epoch 3840550190710784 [53.334653ms]\n    MaterializeExecutor A00000077 (actor 10, executor 119) [53.334653ms]\n      HashJoinExecutor A00000076 (actor 10, executor 118) [53.334653ms]  <== current\n[Detached 4]\n  MergeExecutor A0000006B (actor 10, executor 107) [3.333415ms]\n",
)', src/storage/src/table/mod.rs:152:5


(The fragment has actors 9, 10, 11, and 12; the panicking actor is the second one, actor 10, but it receives the third executor's data.)

@BugenZhao (Member)
BugenZhao commented Feb 9, 2023

Here's a more readable plan.

StreamMaterialize { columns: [p1, p2, p3, t1.t._row_id(hidden), t2.t._row_id(hidden), t2.src(hidden), t3.t._row_id(hidden), t3.dst(hidden)], pk_columns: [t1.t._row_id, t2.t._row_id, p2, t2.src, t3.t._row_id, t3.dst, p1] }
 └─StreamHashJoin { type: Inner, predicate: t2.src = t3.dst AND t1.src = t3.dst, output: [t1.src, t1.dst, t2.dst, t1.t._row_id, t2.t._row_id, t2.src, t3.t._row_id, t3.dst] }
   ├─StreamExchange { dist: HashShard(t1.src) }
   | └─StreamHashJoin { type: Inner, predicate: t1.dst = t2.src, output: [t1.src, t1.dst, t2.src, t2.dst, t1.t._row_id, t2.t._row_id] }
   |   ├─StreamExchange { dist: HashShard(t1.dst) }
   |   | └─StreamTableScan { table: t1, columns: [t1.src, t1.dst, t1.t._row_id], pk: [t1.t._row_id], dist: UpstreamHashShard(t1.t._row_id) }
   |   └─StreamExchange { dist: HashShard(t2.src) }
   |     └─StreamTableScan { table: t2, columns: [t2.src, t2.dst, t2.t._row_id], pk: [t2.t._row_id], dist: UpstreamHashShard(t2.t._row_id) }
   └─StreamExchange { dist: HashShard(t3.dst) }
     └─StreamTableScan { table: t3, columns: [t3.dst, t3.t._row_id], pk: [t3.t._row_id], dist: UpstreamHashShard(t3.t._row_id) }
(10 rows)

Let's focus on the most downstream Join.

Its join key is

  • t2.src, t1.src on the left side
  • t3.dst, t3.dst on the right side

Its distribution key is

  • t1.src on the left side / join table
  • t3.dst on the right side / join table

Its primary key is

  • t2.src, t1.src, xx.row_id, xx.row_id, t1.dst on the left table
  • t3.dst, t3.dst (!!), xx.row_id on the right table.

When a row like the following arrives on the left side:

[t1.src, t1.dst, t2.src, t2.dst, t1.t._row_id, t2.t._row_id]
[2     , 1     , 1     , 2     , ..          , ..          ]

We need to look up the join key [t2.src, t1.src] = [1, 2] in the right table, so we need to calculate the vnode based on this pk prefix.

Here comes the problem: there are repeated t3.dst columns in the pk of the right table. So when we want to find out which column in the pk is the dist-key, should we answer the first or the second? 😄

As above, the two join sides are distributed by t1.src and t3.dst, i.e., the second datum, so we should take the second one here as the distribution key. However, the right table assumes that each (valid) access has these two datums equal, so it simply takes the first one by short-circuiting. As a result, we calculate the partition based on the wrong key, which leads to the partition assertion failure.
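A toy model of the failure: the hash function and vnode count below are invented for illustration (RisingWave's real ones differ). The right table's pk prefix is [t2.src, t1.src] = [1, 2]; the side is distributed by the second datum, but the short-circuited lookup hashes the first, so the computed vnode generally differs from the one the table actually owns.

```rust
// Toy model only: an invented multiplicative hash and vnode count,
// not RisingWave's actual hashing scheme.
const VNODE_COUNT: u64 = 256;

fn toy_vnode(datum: u64) -> u64 {
    // Knuth's multiplicative hash, purely for illustration.
    datum.wrapping_mul(2654435761) % VNODE_COUNT
}

fn main() {
    let pk_prefix = [1u64, 2u64]; // [t2.src, t1.src]
    let correct = toy_vnode(pk_prefix[1]); // distribution key: the SECOND datum
    let wrong = toy_vnode(pk_prefix[0]); // what the short-circuit computes
    // The two vnodes differ, so the lookup targets a partition this
    // parallel unit may not own -- hence the assertion failure.
    assert_ne!(correct, wrong);
    println!("correct vnode = {correct}, computed vnode = {wrong}");
}
```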

pub fn get_dist_key_in_pk_indices(dist_key_indices: &[usize], pk_indices: &[usize]) -> Vec<usize> {
    let dist_key_in_pk_indices = dist_key_indices
        .iter()
        .map(|&di| {
            pk_indices
                .iter()
                .position(|&pi| di == pi)
                .unwrap_or_else(|| {
                    panic!(
                        "distribution key {:?} must be a subset of primary key {:?}",
                        dist_key_indices, pk_indices
                    )
                })
        })
        .collect_vec();
    dist_key_in_pk_indices
}
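To see the short-circuit concretely, here is a minimal standalone sketch of the same lookup logic (itertools' `collect_vec` replaced by std `collect`, and the indices are toy values rather than the actual plan's): with a duplicated pk column, `Iterator::position` always returns the first occurrence, even when the exchange distributed the data by the second one.

```rust
// Standalone sketch of the lookup logic above; toy indices, not the plan's.
fn get_dist_key_in_pk_indices(dist_key_indices: &[usize], pk_indices: &[usize]) -> Vec<usize> {
    dist_key_indices
        .iter()
        .map(|&di| {
            pk_indices
                .iter()
                // `position` short-circuits on the FIRST matching pk column.
                .position(|&pi| di == pi)
                .expect("distribution key must be a subset of primary key")
        })
        .collect()
}

fn main() {
    // The right table's pk is [t3.dst, t3.dst, row_id]: table column 0
    // appears twice in the pk, here as positions 0 and 1.
    let pk_indices = [0, 0, 2];
    // The distribution key is column 0 -- but which pk position? `position`
    // always answers 0, never 1, so the vnode is computed from the wrong datum.
    assert_eq!(get_dist_key_in_pk_indices(&[0], &pk_indices), vec![0]);
}
```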
