From f98c5db391353897391645a41d5a5f0e2df441eb Mon Sep 17 00:00:00 2001
From: Patrick Huang
Date: Thu, 16 Mar 2023 18:24:55 +0800
Subject: [PATCH 1/4] feat(source): store source split state as jsonb

---
 Cargo.lock                                    |  1 +
 src/common/src/array/jsonb_array.rs           | 10 +++++
 src/connector/src/macros.rs                   | 29 +++++++++++++--
 src/connector/src/source/base.rs              | 37 +++++++++++++++++--
 src/connector/src/source/cdc/split.rs         | 10 ++---
 src/connector/src/source/datagen/split.rs     | 10 ++---
 .../src/source/filesystem/file_common.rs      |  9 +++--
 .../src/source/google_pubsub/split.rs         | 10 ++---
 src/connector/src/source/kafka/split.rs       | 10 ++---
 src/connector/src/source/kinesis/split.rs     | 10 ++---
 src/connector/src/source/nexmark/split.rs     | 10 ++---
 src/connector/src/source/pulsar/split.rs      | 10 ++---
 .../src/optimizer/plan_node/generic/source.rs |  4 +-
 src/meta/src/stream/source_manager.rs         | 10 ++---
 src/storage/hummock_sdk/src/key.rs            |  2 +-
 src/stream/Cargo.toml                         |  1 +
 .../executor/source/state_table_handler.rs    | 26 +++++++------
 17 files changed, 133 insertions(+), 66 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index cfaab8a21fa7..8674b888b0e4 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -6658,6 +6658,7 @@ dependencies = [
  "risingwave_rpc_client",
  "risingwave_source",
  "risingwave_storage",
+ "serde_json",
  "smallvec",
  "static_assertions",
  "task_stats_alloc",
diff --git a/src/common/src/array/jsonb_array.rs b/src/common/src/array/jsonb_array.rs
index 5169fb480f15..ad393bc59d0a 100644
--- a/src/common/src/array/jsonb_array.rs
+++ b/src/common/src/array/jsonb_array.rs
@@ -164,6 +164,16 @@ impl JsonbVal {
         let v = Value::from_sql(&Type::JSONB, buf).ok()?;
         Some(Self(v.into()))
     }
+
+    pub fn take(mut self) -> Value {
+        self.0.take()
+    }
+}
+
+impl From<Value> for JsonbVal {
+    fn from(v: Value) -> Self {
+        Self(v.into())
+    }
 }
 
 impl JsonbRef<'_> {
diff --git a/src/connector/src/macros.rs b/src/connector/src/macros.rs
index 07853507bbe3..cd1e44df821e 100644
--- a/src/connector/src/macros.rs
+++ b/src/connector/src/macros.rs
@@ -76,12 +76,18 @@ macro_rules! impl_split {
                 }
             }
 
-            fn encode_to_bytes(&self) -> Bytes {
-                Bytes::from(ConnectorSplit::from(self).encode_to_vec())
+            fn encode_to_json(&self) -> JsonbVal {
+                use serde_json::json;
+                let inner = self.encode_to_json_inner().take();
+                json!({ SPLIT_TYPE_FIELD: self.get_type(), SPLIT_INFO_FIELD: inner}).into()
             }
 
-            fn restore_from_bytes(bytes: &[u8]) -> Result<Self> {
-                SplitImpl::try_from(&ConnectorSplit::decode(bytes)?)
+            fn restore_from_json(value: JsonbVal) -> Result<Self> {
+                let mut value = value.take();
+                let json_obj = value.as_object_mut().unwrap();
+                let split_type = json_obj.remove(SPLIT_TYPE_FIELD).unwrap().as_str().unwrap().to_string();
+                let inner_value = json_obj.remove(SPLIT_INFO_FIELD).unwrap();
+                Self::restore_from_json_inner(&split_type, inner_value.into())
             }
         }
 
@@ -98,6 +104,21 @@ macro_rules! impl_split {
                     $( Self::$variant_name(inner) => Self::$variant_name(inner.copy_with_offset(start_offset)), )*
                 }
             }
+
+            pub fn encode_to_json_inner(&self) -> JsonbVal {
+                match self {
+                    $( Self::$variant_name(inner) => inner.encode_to_json(), )*
+                }
+            }
+
+            fn restore_from_json_inner(split_type: &str, value: JsonbVal) -> Result<Self> {
+                match split_type.to_lowercase().as_str() {
+                    $( $connector_name => <$split>::restore_from_json(value).map(SplitImpl::$variant_name), )*
+                    other => {
+                        Err(anyhow!("connector '{}' is not supported", other))
+                    }
+                }
+            }
         }
     }
 }
diff --git a/src/connector/src/source/base.rs b/src/connector/src/source/base.rs
index 14f68f386782..725518de9103 100644
--- a/src/connector/src/source/base.rs
+++ b/src/connector/src/source/base.rs
@@ -22,10 +22,10 @@ use enum_as_inner::EnumAsInner;
 use futures::stream::BoxStream;
 use itertools::Itertools;
 use parking_lot::Mutex;
-use prost::Message;
-use risingwave_common::array::StreamChunk;
+use risingwave_common::array::{JsonbVal, StreamChunk};
 use risingwave_common::catalog::TableId;
 use risingwave_common::error::{ErrorCode, ErrorSuppressor, Result as RwResult, RwError};
+use risingwave_common::types::Scalar;
 use risingwave_pb::connector_service::TableSchema;
 use risingwave_pb::source::ConnectorSplit;
 use serde::{Deserialize, Serialize};
@@ -66,6 +66,9 @@ use crate::source::pulsar::{
 };
 use crate::{impl_connector_properties, impl_split, impl_split_enumerator, impl_split_reader};
 
+const SPLIT_TYPE_FIELD: &str = "split_type";
+const SPLIT_INFO_FIELD: &str = "split_info";
+
 /// [`SplitEnumerator`] fetches the split metadata from the external source service.
 /// NOTE: It runs in the meta server, so probably it should be moved to the `meta` crate.
 #[async_trait]
@@ -403,8 +406,18 @@ impl Eq for SourceMessage {}
 /// The metadata of a split.
 pub trait SplitMetaData: Sized {
     fn id(&self) -> SplitId;
-    fn encode_to_bytes(&self) -> Bytes;
-    fn restore_from_bytes(bytes: &[u8]) -> Result<Self>;
+    fn encode_to_bytes(&self) -> Bytes {
+        self.encode_to_json()
+            .as_scalar_ref()
+            .value_serialize()
+            .into()
+    }
+    fn restore_from_bytes(bytes: &[u8]) -> Result<Self> {
+        Self::restore_from_json(JsonbVal::value_deserialize(bytes).unwrap())
+    }
+
+    fn encode_to_json(&self) -> JsonbVal;
+    fn restore_from_json(value: JsonbVal) -> Result<Self>;
 }
 
 /// [`ConnectorState`] maintains the consuming splits' info. In specific split readers,
@@ -427,6 +440,7 @@ mod tests {
         let get_value = split_impl.into_kafka().unwrap();
         println!("{:?}", get_value);
         assert_eq!(split.encode_to_bytes(), get_value.encode_to_bytes());
+        assert_eq!(split.encode_to_json(), get_value.encode_to_json());
 
         Ok(())
     }
@@ -441,6 +455,21 @@ mod tests {
             split_impl.encode_to_bytes(),
             restored_split_impl.encode_to_bytes()
         );
+        assert_eq!(
+            split_impl.encode_to_json(),
+            restored_split_impl.encode_to_json()
+        );
+
+        let encoded_split = split_impl.encode_to_json();
+        let restored_split_impl = SplitImpl::restore_from_json(encoded_split)?;
+        assert_eq!(
+            split_impl.encode_to_bytes(),
+            restored_split_impl.encode_to_bytes()
+        );
+        assert_eq!(
+            split_impl.encode_to_json(),
+            restored_split_impl.encode_to_json()
+        );
 
         Ok(())
     }
diff --git a/src/connector/src/source/cdc/split.rs b/src/connector/src/source/cdc/split.rs
index f92a4bc67497..7f74c9527b4f 100644
--- a/src/connector/src/source/cdc/split.rs
+++ b/src/connector/src/source/cdc/split.rs
@@ -13,7 +13,7 @@
 // limitations under the License.
 
 use anyhow::anyhow;
-use bytes::Bytes;
+use risingwave_common::array::JsonbVal;
 use serde::{Deserialize, Serialize};
 
 use crate::source::{SplitId, SplitMetaData};
@@ -31,12 +31,12 @@ impl SplitMetaData for CdcSplit {
         format!("{}", self.source_id).into()
     }
 
-    fn encode_to_bytes(&self) -> Bytes {
-        Bytes::from(serde_json::to_string(self).unwrap())
+    fn restore_from_json(value: JsonbVal) -> anyhow::Result<Self> {
+        serde_json::from_value(value.take()).map_err(|e| anyhow!(e))
     }
 
-    fn restore_from_bytes(bytes: &[u8]) -> anyhow::Result<Self> {
-        serde_json::from_slice(bytes).map_err(|e| anyhow!(e))
+    fn encode_to_json(&self) -> JsonbVal {
+        serde_json::to_value(self.clone()).unwrap().into()
     }
 }
diff --git a/src/connector/src/source/datagen/split.rs b/src/connector/src/source/datagen/split.rs
index 61b8c266a537..00f5fa8abc2f 100644
--- a/src/connector/src/source/datagen/split.rs
+++ b/src/connector/src/source/datagen/split.rs
@@ -13,7 +13,7 @@
 // limitations under the License.
 
 use anyhow::anyhow;
-use bytes::Bytes;
+use risingwave_common::array::JsonbVal;
 use serde::{Deserialize, Serialize};
 
 use crate::source::base::SplitMetaData;
@@ -32,12 +32,12 @@ impl SplitMetaData for DatagenSplit {
         format!("{}-{}", self.split_num, self.split_index).into()
     }
 
-    fn encode_to_bytes(&self) -> Bytes {
-        Bytes::from(serde_json::to_string(self).unwrap())
+    fn restore_from_json(value: JsonbVal) -> anyhow::Result<Self> {
+        serde_json::from_value(value.take()).map_err(|e| anyhow!(e))
     }
 
-    fn restore_from_bytes(bytes: &[u8]) -> anyhow::Result<Self> {
-        serde_json::from_slice(bytes).map_err(|e| anyhow!(e))
+    fn encode_to_json(&self) -> JsonbVal {
+        serde_json::to_value(self.clone()).unwrap().into()
     }
 }
diff --git a/src/connector/src/source/filesystem/file_common.rs b/src/connector/src/source/filesystem/file_common.rs
index 2647f3497a89..7b5604c5a0c1 100644
--- a/src/connector/src/source/filesystem/file_common.rs
+++ b/src/connector/src/source/filesystem/file_common.rs
@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 use anyhow::anyhow;
+use risingwave_common::array::JsonbVal;
 use serde::{Deserialize, Serialize};
 
 use crate::source::{SplitId, SplitMetaData};
@@ -30,12 +31,12 @@ impl SplitMetaData for FsSplit {
         self.name.as_str().into()
     }
 
-    fn encode_to_bytes(&self) -> bytes::Bytes {
-        bytes::Bytes::from(serde_json::to_string(self).unwrap())
+    fn restore_from_json(value: JsonbVal) -> anyhow::Result<Self> {
+        serde_json::from_value(value.take()).map_err(|e| anyhow!(e))
    }
 
-    fn restore_from_bytes(bytes: &[u8]) -> anyhow::Result<Self> {
-        serde_json::from_slice(bytes).map_err(|e| anyhow!(e))
+    fn encode_to_json(&self) -> JsonbVal {
+        serde_json::to_value(self.clone()).unwrap().into()
     }
 }
diff --git a/src/connector/src/source/google_pubsub/split.rs b/src/connector/src/source/google_pubsub/split.rs
index 359aa8f48f64..3b8ddd07c29f 100644
--- a/src/connector/src/source/google_pubsub/split.rs
+++ b/src/connector/src/source/google_pubsub/split.rs
@@ -13,7 +13,7 @@
 // limitations under the License.
 
 use anyhow::anyhow;
-use bytes::Bytes;
+use risingwave_common::array::JsonbVal;
 use serde::{Deserialize, Serialize};
 
 use crate::source::{SplitId, SplitMetaData};
@@ -47,12 +47,12 @@ impl PubsubSplit {
 }
 
 impl SplitMetaData for PubsubSplit {
-    fn encode_to_bytes(&self) -> Bytes {
-        Bytes::from(serde_json::to_string(self).unwrap())
+    fn restore_from_json(value: JsonbVal) -> anyhow::Result<Self> {
+        serde_json::from_value(value.take()).map_err(|e| anyhow!(e))
     }
 
-    fn restore_from_bytes(bytes: &[u8]) -> anyhow::Result<Self> {
-        serde_json::from_slice(bytes).map_err(|e| anyhow!(e))
+    fn encode_to_json(&self) -> JsonbVal {
+        serde_json::to_value(self.clone()).unwrap().into()
     }
 
     fn id(&self) -> SplitId {
diff --git a/src/connector/src/source/kafka/split.rs b/src/connector/src/source/kafka/split.rs
index 4fdb5a71f04f..9543dce60a41 100644
--- a/src/connector/src/source/kafka/split.rs
+++ b/src/connector/src/source/kafka/split.rs
@@ -13,7 +13,7 @@
 // limitations under the License.
 
 use anyhow::anyhow;
-use bytes::Bytes;
+use risingwave_common::array::JsonbVal;
 use serde::{Deserialize, Serialize};
 
 use crate::source::{SplitId, SplitMetaData};
@@ -32,12 +32,12 @@ impl SplitMetaData for KafkaSplit {
         format!("{}", self.partition).into()
     }
 
-    fn encode_to_bytes(&self) -> Bytes {
-        Bytes::from(serde_json::to_string(self).unwrap())
+    fn restore_from_json(value: JsonbVal) -> anyhow::Result<Self> {
+        serde_json::from_value(value.take()).map_err(|e| anyhow!(e))
     }
 
-    fn restore_from_bytes(bytes: &[u8]) -> anyhow::Result<Self> {
-        serde_json::from_slice(bytes).map_err(|e| anyhow!(e))
+    fn encode_to_json(&self) -> JsonbVal {
+        serde_json::to_value(self.clone()).unwrap().into()
     }
 }
diff --git a/src/connector/src/source/kinesis/split.rs b/src/connector/src/source/kinesis/split.rs
index 941c3edfe903..052f1709de36 100644
--- a/src/connector/src/source/kinesis/split.rs
+++ b/src/connector/src/source/kinesis/split.rs
@@ -13,7 +13,7 @@
 // limitations under the License.
 
 use anyhow::anyhow;
-use bytes::Bytes;
+use risingwave_common::array::JsonbVal;
 use serde::{Deserialize, Serialize};
 
 use crate::source::{SplitId, SplitMetaData};
@@ -39,12 +39,12 @@ impl SplitMetaData for KinesisSplit {
         self.shard_id.clone()
     }
 
-    fn encode_to_bytes(&self) -> Bytes {
-        Bytes::from(serde_json::to_string(self).unwrap())
+    fn restore_from_json(value: JsonbVal) -> anyhow::Result<Self> {
+        serde_json::from_value(value.take()).map_err(|e| anyhow!(e))
     }
 
-    fn restore_from_bytes(bytes: &[u8]) -> anyhow::Result<Self> {
-        serde_json::from_slice(bytes).map_err(|e| anyhow!(e))
+    fn encode_to_json(&self) -> JsonbVal {
+        serde_json::to_value(self.clone()).unwrap().into()
     }
 }
diff --git a/src/connector/src/source/nexmark/split.rs b/src/connector/src/source/nexmark/split.rs
index 9bb765a8acf7..9d6c4059dc5b 100644
--- a/src/connector/src/source/nexmark/split.rs
+++ b/src/connector/src/source/nexmark/split.rs
@@ -13,7 +13,7 @@
 // limitations under the License.
 
 use anyhow::anyhow;
-use bytes::Bytes;
+use risingwave_common::array::JsonbVal;
 use serde::{Deserialize, Serialize};
 
 use crate::source::{SplitId, SplitMetaData};
@@ -31,12 +31,12 @@ impl SplitMetaData for NexmarkSplit {
         format!("{}-{}", self.split_num, self.split_index).into()
     }
 
-    fn encode_to_bytes(&self) -> Bytes {
-        Bytes::from(serde_json::to_string(self).unwrap())
+    fn restore_from_json(value: JsonbVal) -> anyhow::Result<Self> {
+        serde_json::from_value(value.take()).map_err(|e| anyhow!(e))
    }
 
-    fn restore_from_bytes(bytes: &[u8]) -> anyhow::Result<Self> {
-        serde_json::from_slice(bytes).map_err(|e| anyhow!(e))
+    fn encode_to_json(&self) -> JsonbVal {
+        serde_json::to_value(self.clone()).unwrap().into()
     }
 }
diff --git a/src/connector/src/source/pulsar/split.rs b/src/connector/src/source/pulsar/split.rs
index d97508d6c581..62ccb81e08b0 100644
--- a/src/connector/src/source/pulsar/split.rs
+++ b/src/connector/src/source/pulsar/split.rs
@@ -13,7 +13,7 @@
 // limitations under the License.
 
 use anyhow::anyhow;
-use bytes::Bytes;
+use risingwave_common::array::JsonbVal;
 use serde::{Deserialize, Serialize};
 
 use crate::source::pulsar::topic::Topic;
@@ -46,11 +46,11 @@ impl SplitMetaData for PulsarSplit {
         self.topic.to_string().into()
     }
 
-    fn encode_to_bytes(&self) -> Bytes {
-        Bytes::from(serde_json::to_string(self).unwrap())
+    fn restore_from_json(value: JsonbVal) -> anyhow::Result<Self> {
+        serde_json::from_value(value.take()).map_err(|e| anyhow!(e))
    }
 
-    fn restore_from_bytes(bytes: &[u8]) -> anyhow::Result<Self> {
-        serde_json::from_slice(bytes).map_err(|e| anyhow!(e))
+    fn encode_to_json(&self) -> JsonbVal {
+        serde_json::to_value(self.clone()).unwrap().into()
     }
 }
diff --git a/src/frontend/src/optimizer/plan_node/generic/source.rs b/src/frontend/src/optimizer/plan_node/generic/source.rs
index 9cf55c4de13c..b1beb588e833 100644
--- a/src/frontend/src/optimizer/plan_node/generic/source.rs
+++ b/src/frontend/src/optimizer/plan_node/generic/source.rs
@@ -96,8 +96,8 @@ impl Source {
             type_name: "".to_string(),
         };
         let value = Field {
-            data_type: DataType::Bytea,
-            name: "offset".to_string(),
+            data_type: DataType::Jsonb,
+            name: "offset_info".to_string(),
             sub_fields: vec![],
             type_name: "".to_string(),
         };
diff --git a/src/meta/src/stream/source_manager.rs b/src/meta/src/stream/source_manager.rs
index 0469768b5fd1..fc5b5bdacd02 100644
--- a/src/meta/src/stream/source_manager.rs
+++ b/src/meta/src/stream/source_manager.rs
@@ -694,7 +694,7 @@ mod tests {
     use std::collections::{BTreeMap, HashMap, HashSet};
 
     use anyhow::anyhow;
-    use bytes::Bytes;
+    use risingwave_common::array::JsonbVal;
     use risingwave_connector::source::{SplitId, SplitMetaData};
     use serde::{Deserialize, Serialize};
 
@@ -711,12 +711,12 @@ mod tests {
             format!("{}", self.id).into()
         }
 
-        fn encode_to_bytes(&self) -> Bytes {
-            Bytes::from(serde_json::to_string(self).unwrap())
+        fn encode_to_json(&self) -> JsonbVal {
+            serde_json::to_value(*self).unwrap().into()
        }
 
-        fn restore_from_bytes(bytes: &[u8]) -> anyhow::Result<Self> {
-            serde_json::from_slice(bytes).map_err(|e| anyhow!(e))
+        fn restore_from_json(value: JsonbVal) -> anyhow::Result<Self> {
+            serde_json::from_value(value.take()).map_err(|e| anyhow!(e))
         }
     }
diff --git a/src/storage/hummock_sdk/src/key.rs b/src/storage/hummock_sdk/src/key.rs
index bf06dd477277..41086f90222b 100644
--- a/src/storage/hummock_sdk/src/key.rs
+++ b/src/storage/hummock_sdk/src/key.rs
@@ -624,7 +624,7 @@ impl<'a> FullKey<&'a [u8]> {
         }
     }
 
-    /// Construct a [`FullKey`] from a byte slice without `table_id` encoded.
+    /// Construct a [`FullKey`] from a byte slice without `table_id` encoded.
     pub fn from_slice_without_table_id(
         table_id: TableId,
         slice_without_table_id: &'a [u8],
diff --git a/src/stream/Cargo.toml b/src/stream/Cargo.toml
index 1c6d587db484..7bba32e665a0 100644
--- a/src/stream/Cargo.toml
+++ b/src/stream/Cargo.toml
@@ -52,6 +52,7 @@ risingwave_pb = { path = "../prost" }
 risingwave_rpc_client = { path = "../rpc_client" }
 risingwave_source = { path = "../source" }
 risingwave_storage = { path = "../storage" }
+serde_json = "1"
 smallvec = "1"
 static_assertions = "1"
 task_stats_alloc = { path = "../utils/task_stats_alloc" }
diff --git a/src/stream/src/executor/source/state_table_handler.rs b/src/stream/src/executor/source/state_table_handler.rs
index 0ceae9e36a6f..c2c7fdaf7e28 100644
--- a/src/stream/src/executor/source/state_table_handler.rs
+++ b/src/stream/src/executor/source/state_table_handler.rs
@@ -15,13 +15,13 @@
 use std::collections::HashSet;
 use std::ops::{Bound, Deref};
 
-use bytes::Bytes;
 use futures::{pin_mut, StreamExt};
+use risingwave_common::array::JsonbVal;
 use risingwave_common::catalog::{DatabaseId, SchemaId};
 use risingwave_common::constants::hummock::PROPERTIES_RETENTION_SECOND_KEY;
 use risingwave_common::hash::VirtualNode;
 use risingwave_common::row::{OwnedRow, Row};
-use risingwave_common::types::{ScalarImpl, ScalarRefImpl};
+use risingwave_common::types::{ScalarImpl, ScalarRef, ScalarRefImpl};
 use risingwave_common::util::epoch::EpochPair;
 use risingwave_common::{bail, row};
 use risingwave_connector::source::{SplitId, SplitImpl, SplitMetaData};
@@ -96,8 +96,8 @@ impl<S: StateStore> SourceStateTableHandler<S> {
         pin_mut!(iter);
         while let Some(row) = iter.next().await {
             let row = row?;
-            if let Some(ScalarRefImpl::Bytea(bytes)) = row.datum_at(1) {
-                let split = SplitImpl::restore_from_bytes(bytes)?;
+            if let Some(ScalarRefImpl::Jsonb(jsonb_ref)) = row.datum_at(1) {
+                let split = SplitImpl::restore_from_json(jsonb_ref.to_owned_scalar())?;
                 let fs = split
                     .as_fs()
                     .unwrap_or_else(|| panic!("split {:?} is not fs", split));
@@ -110,14 +110,14 @@ impl<S: StateStore> SourceStateTableHandler<S> {
         Ok(set)
     }
 
-    async fn set_complete(&mut self, key: SplitId, value: Bytes) -> StreamExecutorResult<()> {
+    async fn set_complete(&mut self, key: SplitId, value: JsonbVal) -> StreamExecutorResult<()> {
         let row = [
             Some(Self::string_to_scalar(format!(
                 "{}{}",
                 COMPLETE_SPLIT_PREFIX,
                 key.deref()
             ))),
-            Some(ScalarImpl::Bytea(Box::from(value.as_ref()))),
+            Some(ScalarImpl::Jsonb(value)),
         ];
         if let Some(prev_row) = self.get(key).await? {
             self.state_store.delete(prev_row);
@@ -137,17 +137,17 @@ impl<S: StateStore> SourceStateTableHandler<S> {
             bail!("states require not null");
         } else {
             for split in states {
-                self.set_complete(split.id(), split.encode_to_bytes())
+                self.set_complete(split.id(), split.encode_to_json())
                     .await?;
             }
         }
         Ok(())
     }
 
-    async fn set(&mut self, key: SplitId, value: Bytes) -> StreamExecutorResult<()> {
+    async fn set(&mut self, key: SplitId, value: JsonbVal) -> StreamExecutorResult<()> {
         let row = [
             Some(Self::string_to_scalar(key.deref())),
-            Some(ScalarImpl::Bytea(Vec::from(value).into_boxed_slice())),
+            Some(ScalarImpl::Jsonb(value)),
         ];
         match self.get(key).await?
{ Some(prev_row) => { @@ -173,7 +173,7 @@ impl SourceStateTableHandler { bail!("states require not null"); } else { for split_impl in states { - self.set(split_impl.id(), split_impl.encode_to_bytes()) + self.set(split_impl.id(), split_impl.encode_to_json()) .await?; } } @@ -188,7 +188,9 @@ impl SourceStateTableHandler { Ok(match self.get(stream_source_split.id()).await? { None => None, Some(row) => match row.datum_at(1) { - Some(ScalarRefImpl::Bytea(bytes)) => Some(SplitImpl::restore_from_bytes(bytes)?), + Some(ScalarRefImpl::Jsonb(jsonb_ref)) => { + Some(SplitImpl::restore_from_json(jsonb_ref.to_owned_scalar())?) + } _ => unreachable!(), }, }) @@ -280,6 +282,7 @@ pub(crate) mod tests { .await; let split_impl = SplitImpl::Kafka(KafkaSplit::new(0, Some(0), None, "test".into())); let serialized = split_impl.encode_to_bytes(); + let serialized_json = split_impl.encode_to_json(); let epoch_1 = EpochPair::new_test_epoch(1); let epoch_2 = EpochPair::new_test_epoch(2); @@ -299,6 +302,7 @@ pub(crate) mod tests { { Some(s) => { assert_eq!(s.encode_to_bytes(), serialized); + assert_eq!(s.encode_to_json(), serialized_json); } None => unreachable!(), } From c70496dae1b514b54f3d30c21fd04d57f937278d Mon Sep 17 00:00:00 2001 From: Patrick Huang Date: Thu, 16 Mar 2023 19:37:16 +0800 Subject: [PATCH 2/4] fix tests --- .../tests/testdata/nexmark_source.yaml | 68 +++++++++---------- .../tests/testdata/watermark.yaml | 2 +- .../executor/source/state_table_handler.rs | 2 +- 3 files changed, 36 insertions(+), 36 deletions(-) diff --git a/src/frontend/planner_test/tests/testdata/nexmark_source.yaml b/src/frontend/planner_test/tests/testdata/nexmark_source.yaml index 6d35aa31b6d0..9e2b78cf7450 100644 --- a/src/frontend/planner_test/tests/testdata/nexmark_source.yaml +++ b/src/frontend/planner_test/tests/testdata/nexmark_source.yaml @@ -68,7 +68,7 @@ StreamSource { source: "bid", columns: ["auction", "bidder", "price", "channel", "url", "date_time", "extra", "_row_id"] } source state table: 0 - Table 0 { columns: [partition_id, offset], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 1 } + Table 0 { columns: [partition_id, offset_info], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 1 } Table 4294967294 { columns: [auction, bidder, price, date_time, _row_id], primary key: [$4 ASC], value indices: [0, 1, 2, 3, 4], distribution key: [4], read pk prefix len hint: 1 } - id: nexmark_q1 before: @@ -102,7 +102,7 @@ StreamSource { source: "bid", columns: ["auction", "bidder", "price", "channel", "url", "date_time", "extra", "_row_id"] } source state table: 0 - Table 0 { columns: [partition_id, offset], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 1 } + Table 0 { columns: [partition_id, offset_info], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 1 } Table 4294967294 { columns: [auction, bidder, price, date_time, _row_id], primary key: [$4 ASC], value indices: [0, 1, 2, 3, 4], distribution key: [4], read pk prefix len hint: 1 } - id: nexmark_q2 before: @@ -133,7 +133,7 @@ StreamSource { source: "bid", columns: ["auction", "bidder", "price", "channel", "url", "date_time", "extra", "_row_id"] } source state table: 0 - Table 0 { columns: [partition_id, offset], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 1 } + Table 0 { columns: [partition_id, offset_info], primary key: [$0 ASC], value indices: 
[0, 1], distribution key: [], read pk prefix len hint: 1 } Table 4294967294 { columns: [auction, price, _row_id], primary key: [$2 ASC], value indices: [0, 1, 2], distribution key: [2], read pk prefix len hint: 1 } - id: nexmark_q3 before: @@ -196,8 +196,8 @@ Table 1 { columns: [seller, _row_id, _degree], primary key: [$0 ASC, $1 ASC], value indices: [2], distribution key: [0], read pk prefix len hint: 1 } Table 2 { columns: [id, name, city, state, _row_id], primary key: [$0 ASC, $4 ASC], value indices: [0, 1, 2, 3, 4], distribution key: [0], read pk prefix len hint: 1 } Table 3 { columns: [id, _row_id, _degree], primary key: [$0 ASC, $1 ASC], value indices: [2], distribution key: [0], read pk prefix len hint: 1 } - Table 4 { columns: [partition_id, offset], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 1 } - Table 5 { columns: [partition_id, offset], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 1 } + Table 4 { columns: [partition_id, offset_info], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 1 } + Table 5 { columns: [partition_id, offset_info], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 1 } Table 4294967294 { columns: [name, city, state, id, _row_id, seller, _row_id#1], primary key: [$4 ASC, $6 ASC, $5 ASC], value indices: [0, 1, 2, 3, 4, 5, 6], distribution key: [5], read pk prefix len hint: 3 } - id: nexmark_q4 before: @@ -282,8 +282,8 @@ Table 3 { columns: [id, _row_id, _degree], primary key: [$0 ASC, $1 ASC], value indices: [2], distribution key: [0], read pk prefix len hint: 1 } Table 4 { columns: [auction, price, date_time, _row_id], primary key: [$0 ASC, $3 ASC], value indices: [0, 1, 2, 3], distribution key: [0], read pk prefix len hint: 1 } Table 5 { columns: [auction, _row_id, _degree], primary key: [$0 ASC, $1 ASC], value indices: [2], distribution key: [0], read pk prefix len hint: 1 } - Table 6 { columns: [partition_id, offset], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 1 } - Table 7 { columns: [partition_id, offset], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 1 } + Table 6 { columns: [partition_id, offset_info], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 1 } + Table 7 { columns: [partition_id, offset_info], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 1 } Table 4294967294 { columns: [category, avg], primary key: [$0 ASC], value indices: [0, 1], distribution key: [0], read pk prefix len hint: 1 } - id: nexmark_q5 before: @@ -404,7 +404,7 @@ Table 2 { columns: [max(count), window_start], primary key: [$1 ASC], value indices: [0, 1], distribution key: [1], read pk prefix len hint: 1 } Table 3 { columns: [window_start, _degree], primary key: [$0 ASC], value indices: [1], distribution key: [0], read pk prefix len hint: 1 } Table 4 { columns: [auction, window_start, count], primary key: [$0 ASC, $1 ASC], value indices: [2], distribution key: [0, 1], read pk prefix len hint: 2 } - Table 5 { columns: [partition_id, offset], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 1 } + Table 5 { columns: [partition_id, offset_info], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 1 } Table 6 { columns: [window_start, 
count, auction], primary key: [$0 ASC, $1 DESC, $2 ASC], value indices: [0, 1, 2], distribution key: [0], read pk prefix len hint: 1 } Table 7 { columns: [window_start, max(count), count], primary key: [$0 ASC], value indices: [1, 2], distribution key: [0], read pk prefix len hint: 1 } Table 4294967294 { columns: [auction, num, window_start, window_start#1], primary key: [$0 ASC, $2 ASC, $3 ASC], value indices: [0, 1, 2, 3], distribution key: [2], read pk prefix len hint: 3 } @@ -516,7 +516,7 @@ Table 1 { columns: [price, _row_id, _degree], primary key: [$0 ASC, $1 ASC], value indices: [2], distribution key: [0], read pk prefix len hint: 1 } Table 2 { columns: [max(price), $expr1, $expr2], primary key: [$0 ASC, $1 ASC], value indices: [0, 1, 2], distribution key: [0], read pk prefix len hint: 1 } Table 3 { columns: [max(price), $expr1, _degree], primary key: [$0 ASC, $1 ASC], value indices: [2], distribution key: [0], read pk prefix len hint: 1 } - Table 4 { columns: [partition_id, offset], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 1 } + Table 4 { columns: [partition_id, offset_info], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 1 } Table 5 { columns: [$expr1, max(price), count], primary key: [$0 ASC], value indices: [1, 2], distribution key: [0], read pk prefix len hint: 1 } Table 4294967294 { columns: [auction, price, bidder, date_time, _row_id, $expr1], primary key: [$4 ASC, $5 ASC, $1 ASC], value indices: [0, 1, 2, 3, 4, 5], distribution key: [1], read pk prefix len hint: 3 } - id: nexmark_q8 @@ -618,9 +618,9 @@ Table 2 { columns: [seller, $expr3, $expr4], primary key: [$0 ASC, $1 ASC, $2 ASC], value indices: [0, 1, 2], distribution key: [0, 1, 2], read pk prefix len hint: 3 } Table 3 { columns: [seller, $expr3, $expr4, _degree], primary key: [$0 ASC, $1 ASC, $2 ASC], value indices: [3], distribution key: [0, 1, 2], read pk prefix len hint: 3 } Table 4 { columns: [id, name, $expr1, $expr2, count], primary key: [$0 ASC, $1 ASC, $2 ASC, $3 ASC], value indices: [4], distribution key: [0, 1, 2, 3], read pk prefix len hint: 4 } - Table 5 { columns: [partition_id, offset], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 1 } + Table 5 { columns: [partition_id, offset_info], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 1 } Table 6 { columns: [seller, $expr3, $expr4, count], primary key: [$0 ASC, $1 ASC, $2 ASC], value indices: [3], distribution key: [0, 1, 2], read pk prefix len hint: 3 } - Table 7 { columns: [partition_id, offset], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 1 } + Table 7 { columns: [partition_id, offset_info], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 1 } Table 4294967294 { columns: [id, name, starttime, $expr2, seller, $expr3, $expr4], primary key: [$0 ASC, $1 ASC, $2 ASC, $3 ASC, $4 ASC, $5 ASC, $6 ASC], value indices: [0, 1, 2, 3, 4, 5, 6], distribution key: [0, 2, 3], read pk prefix len hint: 7 } - id: nexmark_q9 before: @@ -700,8 +700,8 @@ Table 2 { columns: [id, _row_id, _degree], primary key: [$0 ASC, $1 ASC], value indices: [2], distribution key: [0], read pk prefix len hint: 1 } Table 3 { columns: [auction, bidder, price, channel, url, date_time, extra, _row_id], primary key: [$0 ASC, $7 ASC], value indices: [0, 1, 2, 3, 4, 5, 6, 7], distribution key: [0], read pk prefix len 
hint: 1 } Table 4 { columns: [auction, _row_id, _degree], primary key: [$0 ASC, $1 ASC], value indices: [2], distribution key: [0], read pk prefix len hint: 1 } - Table 5 { columns: [partition_id, offset], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 1 } - Table 6 { columns: [partition_id, offset], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 1 } + Table 5 { columns: [partition_id, offset_info], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 1 } + Table 6 { columns: [partition_id, offset_info], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 1 } Table 4294967294 { columns: [id, item_name, description, initial_bid, reserve, date_time, expires, seller, category, auction, bidder, price, bid_date_time, _row_id, _row_id#1], primary key: [$13 ASC, $14 ASC, $0 ASC], value indices: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14], distribution key: [0], read pk prefix len hint: 3 } - id: nexmark_q10 before: @@ -730,7 +730,7 @@ StreamSource { source: "bid", columns: ["auction", "bidder", "price", "channel", "url", "date_time", "extra", "_row_id"] } source state table: 0 - Table 0 { columns: [partition_id, offset], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 1 } + Table 0 { columns: [partition_id, offset_info], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 1 } Table 4294967294 { columns: [auction, bidder, price, date_time, date, time, _row_id], primary key: [$6 ASC], value indices: [0, 1, 2, 3, 4, 5, 6], distribution key: [6], read pk prefix len hint: 1 } - id: nexmark_q11 before: @@ -826,7 +826,7 @@ StreamSource { source: "bid", columns: ["auction", "bidder", "price", "channel", "url", "date_time", "extra", "_row_id"] } source state table: 0 - Table 0 { columns: [partition_id, offset], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 1 } + Table 0 { columns: [partition_id, offset_info], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 1 } Table 4294967294 { columns: [auction, bidder, price, bidtimetype, date_time, extra, _row_id], primary key: [$6 ASC], value indices: [0, 1, 2, 3, 4, 5, 6], distribution key: [6], read pk prefix len hint: 1 } - id: nexmark_q15 before: @@ -882,7 +882,7 @@ Table 0 { columns: [$expr1, count, count filter((price < 10000:Int32)), count filter((price >= 10000:Int32) AND (price < 1000000:Int32)), count filter((price >= 1000000:Int32)), count(distinct bidder), count(distinct bidder) filter((price < 10000:Int32)), count(distinct bidder) filter((price >= 10000:Int32) AND (price < 1000000:Int32)), count(distinct bidder) filter((price >= 1000000:Int32)), count(distinct auction), count(distinct auction) filter((price < 10000:Int32)), count(distinct auction) filter((price >= 10000:Int32) AND (price < 1000000:Int32)), count(distinct auction) filter((price >= 1000000:Int32))], primary key: [$0 ASC], value indices: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12], distribution key: [0], read pk prefix len hint: 1 } Table 1 { columns: [$expr1, bidder, count_for_agg_call_4, count_for_agg_call_5, count_for_agg_call_6, count_for_agg_call_7], primary key: [$0 ASC, $1 ASC], value indices: [2, 3, 4, 5], distribution key: [0], read pk prefix len hint: 2 } Table 2 { columns: [$expr1, auction, count_for_agg_call_8, 
count_for_agg_call_9, count_for_agg_call_10, count_for_agg_call_11], primary key: [$0 ASC, $1 ASC], value indices: [2, 3, 4, 5], distribution key: [0], read pk prefix len hint: 2 } - Table 3 { columns: [partition_id, offset], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 1 } + Table 3 { columns: [partition_id, offset_info], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 1 } Table 4294967294 { columns: [day, total_bids, rank1_bids, rank2_bids, rank3_bids, total_bidders, rank1_bidders, rank2_bidders, rank3_bidders, total_auctions, rank1_auctions, rank2_auctions, rank3_auctions], primary key: [$0 ASC], value indices: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12], distribution key: [0], read pk prefix len hint: 1 } - id: nexmark_q16 before: @@ -940,7 +940,7 @@ Table 0 { columns: [channel, $expr1, max($expr2), count, count filter((price < 10000:Int32)), count filter((price >= 10000:Int32) AND (price < 1000000:Int32)), count filter((price >= 1000000:Int32)), count(distinct bidder), count(distinct bidder) filter((price < 10000:Int32)), count(distinct bidder) filter((price >= 10000:Int32) AND (price < 1000000:Int32)), count(distinct bidder) filter((price >= 1000000:Int32)), count(distinct auction), count(distinct auction) filter((price < 10000:Int32)), count(distinct auction) filter((price >= 10000:Int32) AND (price < 1000000:Int32)), count(distinct auction) filter((price >= 1000000:Int32))], primary key: [$0 ASC, $1 ASC], value indices: [2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14], distribution key: [0, 1], read pk prefix len hint: 2 } Table 1 { columns: [channel, $expr1, bidder, count_for_agg_call_5, count_for_agg_call_6, count_for_agg_call_7, count_for_agg_call_8], primary key: [$0 ASC, $1 ASC, $2 ASC], value indices: [3, 4, 5, 6], distribution key: [0, 1], read pk prefix len hint: 3 } Table 2 { columns: [channel, $expr1, auction, count_for_agg_call_9, count_for_agg_call_10, count_for_agg_call_11, count_for_agg_call_12], primary key: [$0 ASC, $1 ASC, $2 ASC], value indices: [3, 4, 5, 6], distribution key: [0, 1], read pk prefix len hint: 3 } - Table 3 { columns: [partition_id, offset], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 1 } + Table 3 { columns: [partition_id, offset_info], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 1 } Table 4294967294 { columns: [channel, day, minute, total_bids, rank1_bids, rank2_bids, rank3_bids, total_bidders, rank1_bidders, rank2_bidders, rank3_bidders, total_auctions, rank1_auctions, rank2_auctions, rank3_auctions], primary key: [$0 ASC, $1 ASC], value indices: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14], distribution key: [0, 1], read pk prefix len hint: 2 } - id: nexmark_q17 before: @@ -990,7 +990,7 @@ source state table: 1 Table 0 { columns: [auction, $expr1, count, count filter((price < 10000:Int32)), count filter((price >= 10000:Int32) AND (price < 1000000:Int32)), count filter((price >= 1000000:Int32)), min(price), max(price), sum(price), count(price)], primary key: [$0 ASC, $1 ASC], value indices: [2, 3, 4, 5, 6, 7, 8, 9], distribution key: [0, 1], read pk prefix len hint: 2 } - Table 1 { columns: [partition_id, offset], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 1 } + Table 1 { columns: [partition_id, offset_info], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 1 } 
Table 4294967294 { columns: [auction, day, total_bids, rank1_bids, rank2_bids, rank3_bids, min_price, max_price, avg_price, sum_price], primary key: [$0 ASC, $1 ASC], value indices: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9], distribution key: [0, 1], read pk prefix len hint: 2 } - id: nexmark_q18 before: @@ -1038,7 +1038,7 @@ source state table: 1 Table 0 { columns: [auction, bidder, price, channel, url, date_time, extra, _row_id], primary key: [$1 ASC, $0 ASC, $5 DESC, $7 ASC], value indices: [0, 1, 2, 3, 4, 5, 6, 7], distribution key: [1, 0], read pk prefix len hint: 2 } - Table 1 { columns: [partition_id, offset], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 1 } + Table 1 { columns: [partition_id, offset_info], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 1 } Table 4294967294 { columns: [auction, bidder, price, channel, url, date_time, extra, _row_id], primary key: [$7 ASC], value indices: [0, 1, 2, 3, 4, 5, 6, 7], distribution key: [7], read pk prefix len hint: 1 } - id: nexmark_q19 before: @@ -1117,8 +1117,8 @@ Table 1 { columns: [auction, _row_id, _degree], primary key: [$0 ASC, $1 ASC], value indices: [2], distribution key: [0], read pk prefix len hint: 1 } Table 2 { columns: [id, item_name, description, initial_bid, reserve, date_time, expires, seller, category, _row_id], primary key: [$0 ASC, $9 ASC], value indices: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9], distribution key: [0], read pk prefix len hint: 1 } Table 3 { columns: [id, _row_id, _degree], primary key: [$0 ASC, $1 ASC], value indices: [2], distribution key: [0], read pk prefix len hint: 1 } - Table 4 { columns: [partition_id, offset], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 1 } - Table 5 { columns: [partition_id, offset], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 1 } + Table 4 { columns: [partition_id, offset_info], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 1 } + Table 5 { columns: [partition_id, offset_info], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 1 } Table 4294967294 { columns: [auction, bidder, price, channel, url, date_timeb, item_name, description, initial_bid, reserve, date_timea, expires, seller, category, _row_id, _row_id#1], primary key: [$14 ASC, $15 ASC, $0 ASC], value indices: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15], distribution key: [0], read pk prefix len hint: 3 } - id: nexmark_q21 before: @@ -1170,7 +1170,7 @@ StreamSource { source: "bid", columns: ["auction", "bidder", "price", "channel", "url", "date_time", "extra", "_row_id"] } source state table: 0 - Table 0 { columns: [partition_id, offset], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 1 } + Table 0 { columns: [partition_id, offset_info], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 1 } Table 4294967294 { columns: [auction, bidder, price, channel, dir1, dir2, dir3, _row_id], primary key: [$7 ASC], value indices: [0, 1, 2, 3, 4, 5, 6, 7], distribution key: [7], read pk prefix len hint: 1 } - id: nexmark_q101 before: @@ -1240,9 +1240,9 @@ Table 1 { columns: [id, _row_id, _degree], primary key: [$0 ASC, $1 ASC], value indices: [2], distribution key: [0], read pk prefix len hint: 1 } Table 2 { columns: [auction, max(price)], primary key: [$0 ASC], value indices: 
[0, 1], distribution key: [0], read pk prefix len hint: 1 } Table 3 { columns: [auction, _degree], primary key: [$0 ASC], value indices: [1], distribution key: [0], read pk prefix len hint: 1 } - Table 4 { columns: [partition_id, offset], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 1 } + Table 4 { columns: [partition_id, offset_info], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 1 } Table 5 { columns: [auction, max(price), count], primary key: [$0 ASC], value indices: [1, 2], distribution key: [0], read pk prefix len hint: 1 } - Table 6 { columns: [partition_id, offset], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 1 } + Table 6 { columns: [partition_id, offset_info], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 1 } Table 4294967294 { columns: [auction_id, auction_item_name, current_highest_bid, _row_id, auction], primary key: [$3 ASC, $4 ASC, $0 ASC], value indices: [0, 1, 2, 3, 4], distribution key: [0], read pk prefix len hint: 3 } - id: nexmark_q102 before: @@ -1359,8 +1359,8 @@ Table 4 { columns: [id, _row_id, _degree], primary key: [$0 ASC, $1 ASC], value indices: [2], distribution key: [0], read pk prefix len hint: 1 } Table 5 { columns: [auction, _row_id], primary key: [$0 ASC, $1 ASC], value indices: [0, 1], distribution key: [0], read pk prefix len hint: 1 } Table 6 { columns: [auction, _row_id, _degree], primary key: [$0 ASC, $1 ASC], value indices: [2], distribution key: [0], read pk prefix len hint: 1 } - Table 7 { columns: [partition_id, offset], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 1 } - Table 8 { columns: [partition_id, offset], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 1 } + Table 7 { columns: [partition_id, offset_info], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 1 } + Table 8 { columns: [partition_id, offset_info], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 1 } Table 9 { columns: [sum0(count), count(auction), count], primary key: [], value indices: [0, 1, 2], distribution key: [], read pk prefix len hint: 0 } Table 10 { columns: [auction, count], primary key: [$0 ASC], value indices: [1], distribution key: [0], read pk prefix len hint: 1 } Table 4294967294 { columns: [auction_id, auction_item_name, bid_count], primary key: [$0 ASC, $1 ASC], value indices: [0, 1, 2], distribution key: [0], read pk prefix len hint: 2 } @@ -1432,9 +1432,9 @@ Table 1 { columns: [id, _row_id, _degree], primary key: [$0 ASC, $1 ASC], value indices: [2], distribution key: [0], read pk prefix len hint: 1 } Table 2 { columns: [auction], primary key: [$0 ASC], value indices: [0], distribution key: [0], read pk prefix len hint: 1 } Table 3 { columns: [auction, _degree], primary key: [$0 ASC], value indices: [1], distribution key: [0], read pk prefix len hint: 1 } - Table 4 { columns: [partition_id, offset], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 1 } + Table 4 { columns: [partition_id, offset_info], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 1 } Table 5 { columns: [auction, count], primary key: [$0 ASC], value indices: [1], distribution key: [0], read pk prefix len hint: 1 } - Table 6 { columns: 
[partition_id, offset], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 1 } + Table 6 { columns: [partition_id, offset_info], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 1 } Table 4294967294 { columns: [auction_id, auction_item_name, _row_id], primary key: [$2 ASC, $0 ASC], value indices: [0, 1, 2], distribution key: [0], read pk prefix len hint: 2 } - id: nexmark_q104 before: @@ -1504,9 +1504,9 @@ Table 1 { columns: [id, _row_id, _degree], primary key: [$0 ASC, $1 ASC], value indices: [2], distribution key: [0], read pk prefix len hint: 1 } Table 2 { columns: [auction], primary key: [$0 ASC], value indices: [0], distribution key: [0], read pk prefix len hint: 1 } Table 3 { columns: [auction, _degree], primary key: [$0 ASC], value indices: [1], distribution key: [0], read pk prefix len hint: 1 } - Table 4 { columns: [partition_id, offset], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 1 } + Table 4 { columns: [partition_id, offset_info], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 1 } Table 5 { columns: [auction, count], primary key: [$0 ASC], value indices: [1], distribution key: [0], read pk prefix len hint: 1 } - Table 6 { columns: [partition_id, offset], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 1 } + Table 6 { columns: [partition_id, offset_info], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 1 } Table 4294967294 { columns: [auction_id, auction_item_name, _row_id], primary key: [$2 ASC, $0 ASC], value indices: [0, 1, 2], distribution key: [0], read pk prefix len hint: 2 } - id: nexmark_q105 before: @@ -1594,8 +1594,8 @@ Table 4 { columns: [id, _row_id, _degree], primary key: [$0 ASC, $1 ASC], value indices: [2], distribution key: [0], read pk prefix len hint: 1 } Table 5 { columns: [auction, _row_id], primary key: [$0 ASC, $1 ASC], value indices: [0, 1], distribution key: [0], read pk prefix len hint: 1 } Table 6 { columns: [auction, _row_id, _degree], primary key: [$0 ASC, $1 ASC], value indices: [2], distribution key: [0], read pk prefix len hint: 1 } - Table 7 { columns: [partition_id, offset], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 1 } - Table 8 { columns: [partition_id, offset], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 1 } + Table 7 { columns: [partition_id, offset_info], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 1 } + Table 8 { columns: [partition_id, offset_info], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 1 } Table 4294967294 { columns: [auction_id, auction_item_name, bid_count], primary key: [$2 DESC, $0 ASC, $1 ASC], value indices: [0, 1, 2], distribution key: [], read pk prefix len hint: 2 } - id: nexmark_q106 before: @@ -1696,6 +1696,6 @@ Table 6 { columns: [id, _row_id, _degree], primary key: [$0 ASC, $1 ASC], value indices: [2], distribution key: [0], read pk prefix len hint: 1 } Table 7 { columns: [auction, price, date_time, _row_id], primary key: [$0 ASC, $3 ASC], value indices: [0, 1, 2, 3], distribution key: [0], read pk prefix len hint: 1 } Table 8 { columns: [auction, _row_id, _degree], primary key: [$0 ASC, $1 ASC], value indices: [2], distribution key: [0], read pk prefix len 
hint: 1 }
-    Table 9 { columns: [partition_id, offset], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 1 }
-    Table 10 { columns: [partition_id, offset], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 1 }
+    Table 9 { columns: [partition_id, offset_info], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 1 }
+    Table 10 { columns: [partition_id, offset_info], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 1 }
     Table 4294967294 { columns: [min_final], primary key: [], value indices: [0], distribution key: [], read pk prefix len hint: 0 }
diff --git a/src/frontend/planner_test/tests/testdata/watermark.yaml b/src/frontend/planner_test/tests/testdata/watermark.yaml
index ea49a48c8891..020d8e3a4c0a 100644
--- a/src/frontend/planner_test/tests/testdata/watermark.yaml
+++ b/src/frontend/planner_test/tests/testdata/watermark.yaml
@@ -26,7 +26,7 @@
       StreamSource { source: "t", columns: ["v1", "_row_id"] }
       source state table: 1
 
-    Table 1 { columns: [partition_id, offset], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 1 }
+    Table 1 { columns: [partition_id, offset_info], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 1 }
 
     Table 4294967294 { columns: [v1, _row_id], primary key: [$1 ASC], value indices: [0, 1], distribution key: [1], read pk prefix len hint: 1 }
 - name: watermark on append only table with source
   sql: |
diff --git a/src/stream/src/executor/source/state_table_handler.rs b/src/stream/src/executor/source/state_table_handler.rs
index c2c7fdaf7e28..87f155f80189 100644
--- a/src/stream/src/executor/source/state_table_handler.rs
+++ b/src/stream/src/executor/source/state_table_handler.rs
@@ -216,7 +216,7 @@ pub fn default_source_internal_table(id: u32) -> ProstTable {
 
     let columns = vec![
         make_column(TypeName::Varchar, 0),
-        make_column(TypeName::Bytea, 1),
+        make_column(TypeName::Jsonb, 1),
     ];
     ProstTable {
         id,

From f8d371d84c9ba5132bc09a30f55c96213f003eb2 Mon Sep 17 00:00:00 2001
From: Patrick Huang
Date: Thu, 16 Mar 2023 19:53:34 +0800
Subject: [PATCH 3/4] fix test

---
 src/stream/src/executor/source/state_table_handler.rs | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/src/stream/src/executor/source/state_table_handler.rs b/src/stream/src/executor/source/state_table_handler.rs
index 87f155f80189..e3f2c376126a 100644
--- a/src/stream/src/executor/source/state_table_handler.rs
+++ b/src/stream/src/executor/source/state_table_handler.rs
@@ -245,6 +245,7 @@ pub(crate) mod tests {
     use risingwave_common::util::epoch::EpochPair;
     use risingwave_connector::source::kafka::KafkaSplit;
     use risingwave_storage::memory::MemoryStateStore;
+    use serde_json::Value;
 
     use super::*;
 
@@ -256,8 +257,8 @@ pub(crate) mod tests {
             .await;
         let a: Arc<str> = String::from("a").into();
         let a: Datum = Some(ScalarImpl::Utf8(a.as_ref().into()));
-        let b: Arc<str> = String::from("b").into();
-        let b: Datum = Some(ScalarImpl::Utf8(b.as_ref().into()));
+        let b: JsonbVal = serde_json::from_str::<Value>("{\"k1\": \"v1\", \"k2\": 11}").unwrap().into();
+        let b: Datum = Some(ScalarImpl::Jsonb(b));
 
         let init_epoch_num = 100100;
         let init_epoch = EpochPair::new_test_epoch(init_epoch_num);

From bad0a0829f4e0f06449bb86e221d7fdbf9dbddab Mon Sep 17 00:00:00 2001
From: Patrick Huang
Date: Thu, 16 Mar 2023 19:58:50 +0800
Subject: [PATCH 4/4] fmt

---
 src/stream/src/executor/source/state_table_handler.rs | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/src/stream/src/executor/source/state_table_handler.rs b/src/stream/src/executor/source/state_table_handler.rs
index e3f2c376126a..1377120a075b 100644
--- a/src/stream/src/executor/source/state_table_handler.rs
+++ b/src/stream/src/executor/source/state_table_handler.rs
@@ -257,7 +257,9 @@ pub(crate) mod tests {
             .await;
         let a: Arc<str> = String::from("a").into();
         let a: Datum = Some(ScalarImpl::Utf8(a.as_ref().into()));
-        let b: JsonbVal = serde_json::from_str::<Value>("{\"k1\": \"v1\", \"k2\": 11}").unwrap().into();
+        let b: JsonbVal = serde_json::from_str::<Value>("{\"k1\": \"v1\", \"k2\": 11}")
+            .unwrap()
+            .into();
         let b: Datum = Some(ScalarImpl::Jsonb(b));
 
         let init_epoch_num = 100100;
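---

Editor's note: the heart of this series is the impl_split! change in patch 1 — every split now serializes through a {"split_type": ..., "split_info": ...} JSON envelope instead of protobuf bytes, and restore dispatches on the type tag. Below is a minimal, self-contained sketch of that envelope round-trip, not RisingWave code: DemoSplit is a hypothetical stand-in for a connector split, and only the serde, serde_json, and anyhow crates are assumed.

use serde::{Deserialize, Serialize};
use serde_json::{json, Value};

// Hypothetical stand-in; the real split types live in
// src/connector/src/source/*/split.rs and derive Serialize/Deserialize similarly.
#[derive(Debug, PartialEq, Serialize, Deserialize)]
struct DemoSplit {
    partition: i32,
    start_offset: Option<i64>,
}

const SPLIT_TYPE_FIELD: &str = "split_type";
const SPLIT_INFO_FIELD: &str = "split_info";

// Encode: wrap the split's own JSON in a type-tagged envelope,
// as encode_to_json does in the macro above.
fn encode(split: &DemoSplit) -> Value {
    json!({
        SPLIT_TYPE_FIELD: "demo",
        SPLIT_INFO_FIELD: serde_json::to_value(split).unwrap(),
    })
}

// Restore: peel the envelope apart and dispatch on the type tag,
// mirroring restore_from_json / restore_from_json_inner.
fn restore(mut value: Value) -> anyhow::Result<DemoSplit> {
    let obj = value
        .as_object_mut()
        .ok_or_else(|| anyhow::anyhow!("expected a JSON object"))?;
    let split_type = obj
        .remove(SPLIT_TYPE_FIELD)
        .and_then(|v| v.as_str().map(str::to_string))
        .ok_or_else(|| anyhow::anyhow!("missing {}", SPLIT_TYPE_FIELD))?;
    let inner = obj
        .remove(SPLIT_INFO_FIELD)
        .ok_or_else(|| anyhow::anyhow!("missing {}", SPLIT_INFO_FIELD))?;
    match split_type.as_str() {
        "demo" => Ok(serde_json::from_value(inner)?),
        other => Err(anyhow::anyhow!("connector '{}' is not supported", other)),
    }
}

fn main() -> anyhow::Result<()> {
    let split = DemoSplit { partition: 0, start_offset: Some(42) };
    let encoded = encode(&split);
    assert_eq!(restore(encoded)?, split);
    Ok(())
}

Because the type tag travels with the payload, the jsonb cell in the source state table stays self-describing, which is what lets SplitImpl::restore_from_json pick the right connector without any protobuf schema.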