
Commit

Merge branch 'main' into fix-sqlparser-trim
xiangjinwu authored Apr 6, 2023
2 parents 29a9896 + 8038315 commit 266acdc
Showing 34 changed files with 879 additions and 192 deletions.
12 changes: 12 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion ci/workflows/pull-request.yml
@@ -180,7 +180,7 @@ steps:
config: ci/docker-compose.yml
mount-buildkite-agent: true
- ./ci/plugins/upload-failure-logs
- timeout_in_minutes: 6
+ timeout_in_minutes: 7

- label: "regress test"
command: "ci/scripts/regress-test.sh -p ci-dev"
2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-user-dashboard.json

Large diffs are not rendered by default.

49 changes: 47 additions & 2 deletions grafana/risingwave-user-dashboard.dashboard.py
@@ -57,7 +57,48 @@ def section_overview(panels):
],
),
panels.timeseries_count(
"Errors by Type",
"Alerts",
"""Alerts in the system group by type:
- Too Many Barriers: there are too many uncommitted barriers generated. This means the streaming graph is stuck or under heavy load. Check 'Barrier Latency' panel.
- Recovery Triggered: cluster recovery is triggered. Check 'Errors by Type' / 'Node Count' panels.
- Lagging Version: the checkpointed or pinned version id is lagging behind the current version id. Check 'Hummock Manager' section in dev dashboard.
- Lagging Epoch: the pinned or safe epoch is lagging behind the current max committed epoch. Check 'Hummock Manager' section in dev dashboard.
- Lagging Compaction: there are too many files in L0. This can be caused by compactor failure or lag of compactor resource. Check 'Compaction' section in dev dashboard.
- Lagging Vacuum: there are too many stale files waiting to be cleaned. This can be caused by compactor failure or lag of compactor resource. Check 'Compaction' section in dev dashboard.
""",
[
panels.target(
f"{metric('all_barrier_nums')} >= bool 200",
"Too Many Barriers",
),
panels.target(
f"sum(rate({metric('recovery_latency_count')}[$__rate_interval])) > bool 0 + sum({metric('recovery_failure_cnt')}) > bool 0",
"Recovery Triggered",
),
panels.target(
f"(({metric('storage_current_version_id')} - {metric('storage_checkpoint_version_id')}) >= bool 100) + " +
f"(({metric('storage_current_version_id')} - {metric('storage_min_pinned_version_id')}) >= bool 100)",
"Lagging Version",
),
panels.target(
f"(({metric('storage_max_committed_epoch')} - {metric('storage_min_pinned_epoch')}) >= bool 6553600000 unless + {metric('storage_min_pinned_epoch')} == 0) + " +
f"(({metric('storage_max_committed_epoch')} - {metric('storage_safe_epoch')}) >= bool 6553600000 unless + {metric('storage_safe_epoch')} == 0)",
"Lagging Epoch",
),
panels.target(
f"sum(label_replace({metric('storage_level_sst_num')}, 'L0', 'L0', 'level_index', '.*_L0') unless " +
f"{metric('storage_level_sst_num')}) by (L0) >= bool 200",
"Lagging Compaction",
),
panels.target(
f"{metric('storage_stale_object_count')} >= bool 200",
"Lagging Vacuum",
),
],
["last"],
),
panels.timeseries_count(
"Errors",
"Errors in the system group by type",
[
panels.target(
@@ -66,7 +107,11 @@ def section_overview(panels):
),
panels.target(
f"sum({metric('user_source_error_count')}) by (error_type, error_msg, fragment_id, table_id, executor_name)",
"source error {{error_type}}: {{error_msg}} ({{executor_name}}: table_id={{table_id}}, fragment_id={{fragment_id}})",
"parse error {{error_type}}: {{error_msg}} ({{executor_name}}: table_id={{table_id}}, fragment_id={{fragment_id}})",
),
panels.target(
f"{metric('source_status_is_up')} == 0",
"source error: source_id={{source_id}}, source_name={{source_name}} @ {{instance}}",
),
panels.target(
f"sum(rate({metric('object_store_failure_count')}[$__rate_interval])) by (instance, job, type)",
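A note on the Alerts targets above: PromQL's bool modifier makes a comparison such as ">= bool 200" return 0 or 1 per series instead of filtering, so each target plots a 0/1 alert indicator, and indicators can be summed or added together to combine conditions. The Lagging Epoch threshold 6553600000 equals 100000 ms shifted left by 16 bits, which suggests roughly 100 seconds of lag, assuming RisingWave epochs carry a millisecond timestamp in their upper bits.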
2 changes: 1 addition & 1 deletion grafana/risingwave-user-dashboard.json

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions proto/data.proto
@@ -99,6 +99,8 @@ enum ArrayType {
BYTEA = 15;
JSONB = 16;
SERIAL = 17;
+ INT256 = 18;
+ UINT256 = 19;
}

message Array {
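The two new variants correspond to the ArrayImpl::Int256 / ArrayImpl::Uint256 match arms stubbed out with todo!() in src/common/src/array/arrow.rs further down in this diff.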
10 changes: 2 additions & 8 deletions src/batch/src/executor/join/hash_join.rs
@@ -18,7 +18,6 @@ use std::iter::empty;
use std::marker::PhantomData;
use std::sync::Arc;

- use fixedbitset::FixedBitSet;
use futures_async_stream::try_stream;
use itertools::Itertools;
use risingwave_common::array::{Array, DataChunk, RowRef};
@@ -232,13 +231,8 @@ impl<K: HashKey> HashJoinExecutor<K> {
JoinHashMap::with_capacity_and_hasher(build_row_count, PrecomputedBuildHasher);
let mut next_build_row_with_same_key =
ChunkedData::with_chunk_sizes(build_side.iter().map(|c| c.capacity()))?;
- let null_matched = {
- let mut null_matched = FixedBitSet::with_capacity(self.null_matched.len());
- for (idx, col_null_matched) in self.null_matched.into_iter().enumerate() {
- null_matched.set(idx, col_null_matched);
- }
- null_matched
- };
+
+ let null_matched = self.null_matched.into();

// Build hash map
for (build_chunk_id, build_chunk) in build_side.iter().enumerate() {
11 changes: 2 additions & 9 deletions src/batch/src/executor/join/lookup_join_base.rs
@@ -14,14 +14,13 @@

use std::marker::PhantomData;

- use fixedbitset::FixedBitSet;
use futures::StreamExt;
use futures_async_stream::try_stream;
use itertools::Itertools;
use risingwave_common::array::DataChunk;
use risingwave_common::catalog::Schema;
use risingwave_common::error::RwError;
- use risingwave_common::hash::{HashKey, PrecomputedBuildHasher};
+ use risingwave_common::hash::{HashKey, NullBitmap, PrecomputedBuildHasher};
use risingwave_common::row::Row;
use risingwave_common::types::{DataType, ToOwnedDatum};
use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
@@ -67,13 +66,7 @@ impl<K: HashKey> LookupJoinBase<K> {
pub async fn do_execute(mut self: Box<Self>) {
let outer_side_schema = self.outer_side_input.schema().clone();

- let null_matched = {
- let mut null_matched = FixedBitSet::with_capacity(self.null_safe.len());
- for (idx, col_null_matched) in self.null_safe.iter().copied().enumerate() {
- null_matched.set(idx, col_null_matched);
- }
- null_matched
- };
+ let null_matched: NullBitmap = self.null_safe.into();

let mut outer_side_batch_read_stream: BoxedDataChunkListStream =
utils::batch_read(self.outer_side_input.execute(), AT_LEAST_OUTER_SIDE_ROWS);
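Both join executors now build their null-match bitmap with a plain .into() in place of the hand-rolled FixedBitSet loop. Below is a minimal sketch of the kind of From<Vec<bool>> impl such a conversion relies on; the concrete representation of risingwave_common::hash::NullBitmap is an assumption here (presumably backed by the smallbitset crate added to src/common/Cargo.toml just below).

// Hypothetical stand-in for risingwave_common::hash::NullBitmap, shown only to
// illustrate what `self.null_safe.into()` resolves to. The u64 backing store is
// an assumption and caps this sketch at 64 key columns.
pub struct NullBitmap {
    inner: u64,
}

impl From<Vec<bool>> for NullBitmap {
    fn from(value: Vec<bool>) -> Self {
        let mut inner = 0u64;
        // Mirrors the deleted loop: set bit `idx` iff column `idx` is null-safe.
        for (idx, col_null_matched) in value.into_iter().enumerate() {
            if col_null_matched {
                inner |= 1 << idx;
            }
        }
        NullBitmap { inner }
    }
}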
5 changes: 5 additions & 0 deletions src/common/Cargo.toml
@@ -65,6 +65,7 @@ serde = { version = "1", features = ["derive"] }
serde_default = "0.1"
serde_json = "1"
serde_with = "2"
+ smallbitset = "0.6.1"
static_assertions = "1"
strum = "0.24"
strum_macros = "0.24"
@@ -119,6 +120,10 @@ harness = false
name = "bench_hash_key_encoding"
harness = false

+ [[bench]]
+ name = "bench_data_chunk_encoding"
+ harness = false

[[bin]]
name = "example-config"
path = "src/bin/default_config.rs"
58 changes: 58 additions & 0 deletions src/common/benches/bench_data_chunk_encoding.rs
@@ -0,0 +1,58 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use criterion::{criterion_group, criterion_main, Criterion};
use risingwave_common::test_utils::rand_chunk;
use risingwave_common::types::DataType;

static SEED: u64 = 998244353u64;
static CHUNK_SIZES: &[usize] = &[128, 1024];
static NULL_RATIOS: &[f64] = &[0.0, 0.01, 0.1];

struct DataChunkBenchCase {
pub name: String,
pub data_types: Vec<DataType>,
}

impl DataChunkBenchCase {
pub fn new(name: &str, data_types: Vec<DataType>) -> Self {
Self {
name: name.to_string(),
data_types,
}
}
}

fn bench_data_chunk_encoding(c: &mut Criterion) {
let test_cases = vec![
DataChunkBenchCase::new("Int16", vec![DataType::Int16]),
DataChunkBenchCase::new("String", vec![DataType::Varchar]),
DataChunkBenchCase::new("Int16 and String", vec![DataType::Int16, DataType::Varchar]),
];
for case in test_cases {
for null_ratio in NULL_RATIOS {
for chunk_size in CHUNK_SIZES {
let id = format!(
"data chunk encoding: {}, {} rows, Pr[null]={}",
case.name, chunk_size, null_ratio
);
let chunk = rand_chunk::gen_chunk(&case.data_types, *chunk_size, SEED, *null_ratio);
c.bench_function(&id, |b| b.iter(|| chunk.serialize()));
}
}
}
}

criterion_group!(benches, bench_data_chunk_encoding);
criterion_main!(benches);
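Given the matching [[bench]] entry added to src/common/Cargo.toml above, this benchmark should be runnable with something like "cargo bench -p risingwave_common --bench bench_data_chunk_encoding" (the package name is inferred from the crate's risingwave_common import path, so treat it as an assumption).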
51 changes: 8 additions & 43 deletions src/common/benches/bench_hash_key_encoding.rs
@@ -14,14 +14,9 @@

use criterion::{criterion_group, criterion_main, Criterion};
use itertools::Itertools;
- use risingwave_common::array::column::Column;
- use risingwave_common::array::serial_array::SerialArray;
- use risingwave_common::array::{
- ArrayBuilderImpl, BoolArray, DataChunk, DateArray, DecimalArray, F32Array, F64Array, I16Array,
- I32Array, I64Array, IntervalArray, TimeArray, TimestampArray, Utf8Array,
- };
+ use risingwave_common::array::{ArrayBuilderImpl, DataChunk};
use risingwave_common::hash::{calc_hash_key_kind, HashKey, HashKeyDispatcher};
- use risingwave_common::test_utils::rand_array::seed_rand_array_ref;
+ use risingwave_common::test_utils::rand_chunk;
use risingwave_common::types::DataType;

static SEED: u64 = 998244353u64;
@@ -50,13 +45,14 @@ impl HashKeyDispatcher for HashKeyBenchCaseBuilder {
for null_ratio in NULL_RATIOS {
for chunk_size in CHUNK_SIZES {
let id = format!(
"{} {:?}, {} rows, Pr[null]={}",
"{} rows, {} {:?}, Pr[null]={}",
chunk_size,
self.describe,
calc_hash_key_kind(self.data_types()),
chunk_size,
null_ratio
);
let input_chunk = gen_chunk(self.data_types(), *chunk_size, SEED, *null_ratio);
let input_chunk =
rand_chunk::gen_chunk(self.data_types(), *chunk_size, SEED, *null_ratio);
ret.push(Box::new(HashKeyBenchCase::<K>::new(
id,
input_chunk,
@@ -139,37 +135,6 @@ impl<K: HashKey> Case for HashKeyBenchCase<K> {
}
}

- fn gen_chunk(data_types: &[DataType], size: usize, seed: u64, null_ratio: f64) -> DataChunk {
- let mut columns = vec![];
-
- for d in data_types {
- columns.push(Column::new(match d {
- DataType::Boolean => seed_rand_array_ref::<BoolArray>(size, seed, null_ratio),
- DataType::Int16 => seed_rand_array_ref::<I16Array>(size, seed, null_ratio),
- DataType::Int32 => seed_rand_array_ref::<I32Array>(size, seed, null_ratio),
- DataType::Int64 => seed_rand_array_ref::<I64Array>(size, seed, null_ratio),
- DataType::Float32 => seed_rand_array_ref::<F32Array>(size, seed, null_ratio),
- DataType::Float64 => seed_rand_array_ref::<F64Array>(size, seed, null_ratio),
- DataType::Decimal => seed_rand_array_ref::<DecimalArray>(size, seed, null_ratio),
- DataType::Date => seed_rand_array_ref::<DateArray>(size, seed, null_ratio),
- DataType::Varchar => seed_rand_array_ref::<Utf8Array>(size, seed, null_ratio),
- DataType::Time => seed_rand_array_ref::<TimeArray>(size, seed, null_ratio),
- DataType::Serial => seed_rand_array_ref::<SerialArray>(size, seed, null_ratio),
- DataType::Timestamp => seed_rand_array_ref::<TimestampArray>(size, seed, null_ratio),
- DataType::Timestamptz => seed_rand_array_ref::<I64Array>(size, seed, null_ratio),
- DataType::Interval => seed_rand_array_ref::<IntervalArray>(size, seed, null_ratio),
- DataType::Struct(_) | DataType::Bytea | DataType::Jsonb => {
- todo!()
- }
- DataType::List { datatype: _ } => {
- todo!()
- }
- }));
- }
- risingwave_common::util::schema_check::schema_check(data_types, &columns).unwrap();
- DataChunk::new(columns, size)
- }

fn case_builders() -> Vec<HashKeyBenchCaseBuilder> {
vec![
HashKeyBenchCaseBuilder {
@@ -194,11 +159,11 @@
},
HashKeyBenchCaseBuilder {
data_types: vec![DataType::Int32, DataType::Int32, DataType::Int32],
describe: "composite fixed".to_string(),
describe: "composite fixed, case 1".to_string(),
},
HashKeyBenchCaseBuilder {
data_types: vec![DataType::Int32, DataType::Int64, DataType::Int32],
describe: "composite fixed".to_string(),
describe: "composite fixed, case 2".to_string(),
},
HashKeyBenchCaseBuilder {
data_types: vec![DataType::Int32, DataType::Varchar],
3 changes: 3 additions & 0 deletions src/common/src/array/arrow.rs
@@ -438,6 +438,7 @@ impl From<&ListArray> for arrow_array::ListArray {
ArrayImpl::Int64(a) => build(array, a, Int64Builder::with_capacity(a.len()), |b, v| {
b.append_option(v)
}),

ArrayImpl::Float32(a) => {
build(array, a, Float32Builder::with_capacity(a.len()), |b, v| {
b.append_option(v.map(|f| f.0))
Expand All @@ -454,6 +455,8 @@ impl From<&ListArray> for arrow_array::ListArray {
StringBuilder::with_capacity(a.len(), a.data().len()),
|b, v| b.append_option(v),
),
+ ArrayImpl::Int256(_a) => todo!(),
+ ArrayImpl::Uint256(_a) => todo!(),
ArrayImpl::Bool(a) => {
build(array, a, BooleanBuilder::with_capacity(a.len()), |b, v| {
b.append_option(v)