Skip to content

Commit

Permalink
perf(encoding): use estimate size in data chunk encoding (#8924)
Browse files Browse the repository at this point in the history
Co-authored-by: stonepage <[email protected]>
Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
  • Loading branch information
3 people authored Apr 11, 2023
1 parent 5b5b2e6 commit bcc00c0
Show file tree
Hide file tree
Showing 3 changed files with 129 additions and 10 deletions.
9 changes: 9 additions & 0 deletions src/common/benches/bench_data_chunk_encoding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,15 @@ fn bench_data_chunk_encoding(c: &mut Criterion) {
DataChunkBenchCase::new("Int16", vec![DataType::Int16]),
DataChunkBenchCase::new("String", vec![DataType::Varchar]),
DataChunkBenchCase::new("Int16 and String", vec![DataType::Int16, DataType::Varchar]),
DataChunkBenchCase::new(
"Int16, Int32, Int64 and String",
vec![
DataType::Int16,
DataType::Int32,
DataType::Int64,
DataType::Varchar,
],
),
];
for case in test_cases {
for null_ratio in NULL_RATIOS {
Expand Down
66 changes: 60 additions & 6 deletions src/common/src/array/data_chunk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::fmt;
use std::hash::BuildHasher;
use std::sync::Arc;
use std::{fmt, usize};

use bytes::{Bytes, BytesMut};
use bytes::Bytes;
use itertools::Itertools;
use risingwave_pb::data::PbDataChunk;

Expand All @@ -34,7 +34,8 @@ use crate::types::{DataType, ToOwnedDatum};
use crate::util::hash_util::finalize_hashers;
use crate::util::iter_util::{ZipEqDebug, ZipEqFast};
use crate::util::value_encoding::{
estimate_serialize_datum_size, serialize_datum_into, ValueRowSerializer,
estimate_serialize_datum_size, serialize_datum_into, try_get_exact_serialize_datum_size,
ValueRowSerializer,
};

/// [`DataChunk`] is a collection of Columns,
Expand Down Expand Up @@ -439,7 +440,36 @@ impl DataChunk {
let buffers = match &self.vis2 {
Vis::Bitmap(vis) => {
let rows_num = vis.len();
let mut buffers = vec![BytesMut::new(); rows_num];
let mut buffers: Vec<Vec<u8>> = vec![];
let mut col_variable: Vec<&Column> = vec![];
let mut row_len_fixed: usize = 0;
for c in &self.columns {
if let Some(field_len) = try_get_exact_serialize_datum_size(&c.array()) {
row_len_fixed += field_len;
} else {
col_variable.push(c);
}
}
for i in 0..rows_num {
// SAFETY(value_at_unchecked): the idx is always in bound.
unsafe {
if vis.is_set_unchecked(i) {
buffers.push(Vec::with_capacity(
row_len_fixed
+ col_variable
.iter()
.map(|col| {
estimate_serialize_datum_size(
col.array_ref().value_at_unchecked(i),
)
})
.sum::<usize>(),
));
} else {
buffers.push(vec![]);
}
}
}
for c in &self.columns {
let c = c.array_ref();
assert_eq!(c.len(), rows_num);
Expand All @@ -455,7 +485,31 @@ impl DataChunk {
buffers
}
Vis::Compact(rows_num) => {
let mut buffers = vec![BytesMut::new(); *rows_num];
let mut buffers: Vec<Vec<u8>> = vec![];
let mut col_variable: Vec<&Column> = vec![];
let mut row_len_fixed: usize = 0;
for c in &self.columns {
if let Some(field_len) = try_get_exact_serialize_datum_size(&c.array()) {
row_len_fixed += field_len;
} else {
col_variable.push(c);
}
}
for i in 0..*rows_num {
unsafe {
buffers.push(Vec::with_capacity(
row_len_fixed
+ col_variable
.iter()
.map(|col| {
estimate_serialize_datum_size(
col.array_ref().value_at_unchecked(i),
)
})
.sum::<usize>(),
));
}
}
for c in &self.columns {
let c = c.array_ref();
assert_eq!(c.len(), *rows_num);
Expand All @@ -470,7 +524,7 @@ impl DataChunk {
}
};

buffers.into_iter().map(BytesMut::freeze).collect_vec()
buffers.into_iter().map(|item| item.into()).collect_vec()
}

/// Serialize each row into bytes with given serializer.
Expand Down
64 changes: 60 additions & 4 deletions src/common/src/util/value_encoding/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use either::{for_both, Either};
use enum_as_inner::EnumAsInner;
use itertools::Itertools;

use crate::array::{serial_array, JsonbVal, ListRef, ListValue, StructRef, StructValue};
use crate::array::{serial_array, ArrayImpl, JsonbVal, ListRef, ListValue, StructRef, StructValue};
use crate::catalog::ColumnId;
use crate::row::{Row, RowDeserializer as BasicDeserializer};
use crate::types::struct_type::StructType;
Expand Down Expand Up @@ -166,6 +166,26 @@ impl ValueRowSerde for BasicSerde {
}
}

/// Returns the exact serialized size in bytes of one datum from the given array,
/// or `None` when the array's type is variable-length and no exact per-datum size
/// exists.
///
/// The returned size includes the extra flag byte that value encoding prepends to
/// every datum (hence the trailing `.map(|x| x + 1)`), so it matches
/// `serialize_datum(&d).len()` for non-null datums of fixed-size types.
///
/// Callers use this to pre-compute per-row buffer capacities when encoding a data
/// chunk, avoiding incremental buffer growth for fixed-size columns.
pub fn try_get_exact_serialize_datum_size(arr: &ArrayImpl) -> Option<usize> {
    match arr {
        ArrayImpl::Int16(_) => Some(2),
        ArrayImpl::Int32(_) => Some(4),
        ArrayImpl::Int64(_) => Some(8),
        ArrayImpl::Serial(_) => Some(8),
        ArrayImpl::Float32(_) => Some(4),
        ArrayImpl::Float64(_) => Some(8),
        ArrayImpl::Bool(_) => Some(1),
        // NOTE(review): these `estimate_serialize_*` helpers are assumed to return
        // a fixed, exact size for their (fixed-width) types — confirm none of them
        // is a loose upper bound before relying on exactness here.
        ArrayImpl::Decimal(_) => Some(estimate_serialize_decimal_size()),
        ArrayImpl::Interval(_) => Some(estimate_serialize_interval_size()),
        ArrayImpl::Date(_) => Some(estimate_serialize_date_size()),
        ArrayImpl::Timestamp(_) => Some(estimate_serialize_timestamp_size()),
        ArrayImpl::Time(_) => Some(estimate_serialize_time_size()),
        // Jsonb is serialized as a variable-length payload, so there is no exact
        // per-datum size; fall through to `None` together with the other
        // variable-length types (Utf8, Bytea, List, Struct). The previous mapping
        // of Jsonb to a fixed 8 bytes misreported the actual serialized size.
        _ => None,
    }
    // +1 for the null/non-null flag byte written before every datum.
    .map(|x| x + 1)
}

/// Serialize a datum into bytes and return (Not order guarantee, used in value encoding).
pub fn serialize_datum(cell: impl ToDatumRef) -> Vec<u8> {
let mut buf: Vec<u8> = vec![];
Expand Down Expand Up @@ -444,15 +464,25 @@ fn deserialize_decimal(data: &mut impl Buf) -> Result<Decimal> {
#[cfg(test)]
mod tests {
use crate::array::serial_array::Serial;
use crate::array::{ListValue, StructValue};
use crate::types::{Date, Datum, Decimal, Interval, ScalarImpl, Time, Timestamp};
use crate::util::value_encoding::{estimate_serialize_datum_size, serialize_datum};
use crate::array::{ArrayImpl, ListValue, StructValue};
use crate::test_utils::rand_chunk;
use crate::types::{DataType, Date, Datum, Decimal, Interval, ScalarImpl, Time, Timestamp};
use crate::util::value_encoding::{
estimate_serialize_datum_size, serialize_datum, try_get_exact_serialize_datum_size,
};

fn test_estimate_serialize_scalar_size(s: ScalarImpl) {
let d = Datum::from(s);
assert_eq!(estimate_serialize_datum_size(&d), serialize_datum(&d).len());
}

fn test_try_get_exact_serialize_datum_size(s: &ArrayImpl) {
let d = s.to_datum();
if let Some(ret) = try_get_exact_serialize_datum_size(s) {
assert_eq!(ret, serialize_datum(&d).len());
}
}

#[test]
fn test_estimate_size() {
let d: Datum = None;
Expand Down Expand Up @@ -493,4 +523,30 @@ mod tests {
ScalarImpl::Int64(2333).into(),
])));
}

#[test]
fn test_try_estimate_size() {
let chunk = rand_chunk::gen_chunk(
&[
DataType::Int16,
DataType::Int32,
DataType::Int64,
DataType::Serial,
DataType::Float32,
DataType::Float64,
DataType::Boolean,
DataType::Decimal,
DataType::Interval,
DataType::Time,
DataType::Timestamp,
DataType::Date,
],
1,
0,
0.0,
);
for column in chunk.columns() {
test_try_get_exact_serialize_datum_size(&column.array());
}
}
}

0 comments on commit bcc00c0

Please sign in to comment.