From 03e03cefaccf0b3d9431e5c9176d7e5ce077d9a5 Mon Sep 17 00:00:00 2001 From: Andrew Duffy Date: Sun, 15 Sep 2024 14:00:11 -0400 Subject: [PATCH] implement FilterFn for ChunkedArray (#794) --- .../src/bitpacking/compute/search_sorted.rs | 2 +- .../fastlanes/src/bitpacking/compute/slice.rs | 69 ++++- .../fastlanes/src/bitpacking/compute/take.rs | 15 +- encodings/fastlanes/src/bitpacking/mod.rs | 13 +- encodings/runend/src/compute.rs | 1 - encodings/runend/src/runend.rs | 13 + vortex-array/src/array/chunked/canonical.rs | 4 - .../src/array/chunked/compute/filter.rs | 239 ++++++++++++++++++ vortex-array/src/array/chunked/compute/mod.rs | 9 +- vortex-array/src/array/sparse/mod.rs | 17 +- 10 files changed, 358 insertions(+), 24 deletions(-) create mode 100644 vortex-array/src/array/chunked/compute/filter.rs diff --git a/encodings/fastlanes/src/bitpacking/compute/search_sorted.rs b/encodings/fastlanes/src/bitpacking/compute/search_sorted.rs index 16b7f34ce5..194599352e 100644 --- a/encodings/fastlanes/src/bitpacking/compute/search_sorted.rs +++ b/encodings/fastlanes/src/bitpacking/compute/search_sorted.rs @@ -66,7 +66,7 @@ impl BitPackedSearch { offset: array.offset(), length: array.len(), bit_width: array.bit_width(), - min_patch_offset: array.patches().map(|p| { + min_patch_offset: array.patches().and_then(|p| { SparseArray::try_from(p) .vortex_expect("Only sparse patches are supported") .min_index() diff --git a/encodings/fastlanes/src/bitpacking/compute/slice.rs b/encodings/fastlanes/src/bitpacking/compute/slice.rs index 90f795b79c..5e7d597008 100644 --- a/encodings/fastlanes/src/bitpacking/compute/slice.rs +++ b/encodings/fastlanes/src/bitpacking/compute/slice.rs @@ -1,8 +1,9 @@ use std::cmp::max; +use vortex::array::SparseArray; use vortex::compute::{slice, SliceFn}; use vortex::{Array, IntoArray}; -use vortex_error::VortexResult; +use vortex_error::{VortexExpect, VortexResult}; use crate::BitPackedArray; @@ -19,9 +20,19 @@ impl SliceFn for BitPackedArray { Self::try_new_from_offset( slice(&self.packed(), encoded_start, encoded_stop)?, self.validity().slice(start, stop)?, - self.patches().map(|p| slice(&p, start, stop)).transpose()?, + self.patches() + .map(|p| slice(&p, start, stop)) + .transpose()? + .filter(|a| { + // If the sliced patch_indices is empty, we should not propagate the patches. + // There may be other logic that depends on Some(patches) indicating non-empty. + !SparseArray::try_from(a) + .vortex_expect("BitPackedArray must encode patches as SparseArray") + .indices() + .is_empty() + }), self.bit_width(), - offset_stop - offset_start, + stop - start, offset, ) .map(|a| a.into_array()) @@ -30,9 +41,10 @@ impl SliceFn for BitPackedArray { #[cfg(test)] mod test { - use vortex::array::PrimitiveArray; - use vortex::compute::slice; + use itertools::Itertools; + use vortex::array::{PrimitiveArray, SparseArray}; use vortex::compute::unary::scalar_at; + use vortex::compute::{slice, take}; use vortex::IntoArray; use crate::BitPackedArray; @@ -140,4 +152,51 @@ mod test { assert_eq!(doubly_sliced.offset(), 639); assert_eq!(doubly_sliced.len(), 784); } + + #[test] + fn slice_empty_patches() { + // We create an array that has 1 element that does not fit in the 6-bit range. + let array = + BitPackedArray::encode(PrimitiveArray::from((0u32..=64).collect_vec()).array(), 6) + .unwrap(); + + assert!(array.patches().is_some()); + + let patch_indices = SparseArray::try_from(array.patches().unwrap()) + .unwrap() + .indices(); + assert_eq!(patch_indices.len(), 1); + + // Slicing drops the empty patches array. + let sliced = slice(&array.into_array(), 0, 64).unwrap(); + let sliced_bp = BitPackedArray::try_from(sliced).unwrap(); + assert!(sliced_bp.patches().is_none()); + } + + #[test] + fn take_after_slice() { + // Check that our take implementation respects the offsets applied after slicing. + + let array = BitPackedArray::encode( + PrimitiveArray::from((63u32..).take(3072).collect_vec()).array(), + 6, + ) + .unwrap(); + + // Slice the array. + // The resulting array will still have 3 1024-element chunks. + let sliced = slice(array.array(), 922, 2061).unwrap(); + + // Take one element from each chunk. + // Chunk 1: physical indices 922-1023, logical indices 0-101 + // Chunk 2: physical indices 1024-2047, logical indices 102-1125 + // Chunk 3: physical indices 2048-2060, logical indices 1126-1138 + + let taken = take( + &sliced, + PrimitiveArray::from(vec![101i64, 1125i64, 1138i64]).array(), + ) + .unwrap(); + assert_eq!(taken.len(), 3); + } } diff --git a/encodings/fastlanes/src/bitpacking/compute/take.rs b/encodings/fastlanes/src/bitpacking/compute/take.rs index 8bf168a0b4..11bf1b5210 100644 --- a/encodings/fastlanes/src/bitpacking/compute/take.rs +++ b/encodings/fastlanes/src/bitpacking/compute/take.rs @@ -83,11 +83,16 @@ fn take_primitive( if !prefer_bulk_patch { if let Some(ref patches) = patches { - let patches_slice = slice( - patches.array(), - chunk * 1024, - min((chunk + 1) * 1024, patches.len()), - )?; + // NOTE: we need to subtract the array offset before slicing into the patches. + // This is because BitPackedArray is rounded to block boundaries, but patches + // is sliced exactly. + let patches_start = if chunk == 0 { + 0 + } else { + (chunk * 1024) - array.offset() + }; + let patches_end = min((chunk + 1) * 1024 - array.offset(), patches.len()); + let patches_slice = slice(patches.array(), patches_start, patches_end)?; let patches_slice = SparseArray::try_from(patches_slice)?; let offsets = PrimitiveArray::from(offsets); do_patch_for_take_primitive(&patches_slice, &offsets, &mut output)?; diff --git a/encodings/fastlanes/src/bitpacking/mod.rs b/encodings/fastlanes/src/bitpacking/mod.rs index f83a6e4e10..d95ab128ef 100644 --- a/encodings/fastlanes/src/bitpacking/mod.rs +++ b/encodings/fastlanes/src/bitpacking/mod.rs @@ -1,6 +1,6 @@ use ::serde::{Deserialize, Serialize}; pub use compress::*; -use vortex::array::{Primitive, PrimitiveArray}; +use vortex::array::{Primitive, PrimitiveArray, SparseArray}; use vortex::stats::{ArrayStatisticsCompute, StatsSet}; use vortex::validity::{ArrayValidity, LogicalValidity, Validity, ValidityMetadata}; use vortex::variants::{ArrayVariants, PrimitiveArrayTrait}; @@ -80,6 +80,10 @@ impl BitPackedArray { parray.len() ) } + + if SparseArray::try_from(parray)?.indices().is_empty() { + vortex_bail!("cannot construct BitPackedArray using patches without indices"); + } } let metadata = BitPackedMetadata { @@ -123,9 +127,14 @@ impl BitPackedArray { self.metadata().bit_width } + /// Access the patches array. + /// + /// If present, patches MUST be a `SparseArray` with equal-length to this array, and whose + /// indices indicate the locations of patches. The indices must have non-zero length. #[inline] pub fn patches(&self) -> Option { - (self.metadata().has_patches) + self.metadata() + .has_patches .then(|| { self.array().child( 1, diff --git a/encodings/runend/src/compute.rs b/encodings/runend/src/compute.rs index bfaaf4a2e8..0f3bca0926 100644 --- a/encodings/runend/src/compute.rs +++ b/encodings/runend/src/compute.rs @@ -50,7 +50,6 @@ impl TakeFn for RunEndArray { } self.find_physical_index(idx).map(|loc| loc as u64) }) - .collect::>>()? }); let physical_indices_array = PrimitiveArray::from(physical_indices).into_array(); diff --git a/encodings/runend/src/runend.rs b/encodings/runend/src/runend.rs index c03fc86ad5..06c86218ba 100644 --- a/encodings/runend/src/runend.rs +++ b/encodings/runend/src/runend.rs @@ -78,11 +78,13 @@ impl RunEndArray { Self::try_from_parts(dtype, length, metadata, children.into(), StatsSet::new()) } + /// Convert the given logical index to an index into the `values` array pub fn find_physical_index(&self, index: usize) -> VortexResult { search_sorted(&self.ends(), index + self.offset(), SearchSortedSide::Right) .map(|s| s.to_ends_index(self.ends().len())) } + /// Run the array through run-end encoding. pub fn encode(array: Array) -> VortexResult { if array.encoding().id() == Primitive::ID { let primitive = PrimitiveArray::try_from(array)?; @@ -99,11 +101,18 @@ impl RunEndArray { .to_validity(self.array().child(2, &Validity::DTYPE, self.len())) } + /// The offset that the `ends` is relative to. + /// + /// This is generally zero for a "new" array, and non-zero after a slicing operation. #[inline] pub fn offset(&self) -> usize { self.metadata().offset } + /// The encoded "ends" of value runs. + /// + /// The `i`-th element indicates that there is a run of the same value, beginning + /// at `ends[i]` (inclusive) and terminating at `ends[i+1]` (exclusive). #[inline] pub fn ends(&self) -> Array { self.array() @@ -111,6 +120,10 @@ impl RunEndArray { .vortex_expect("RunEndArray is missing its run ends") } + /// The scalar values. + /// + /// The `i`-th element is the scalar value for the `i`-th repeated run. The run begins + /// at `ends[i]` (inclusive) and terminates at `ends[i+1]` (exclusive). #[inline] pub fn values(&self) -> Array { self.array() diff --git a/vortex-array/src/array/chunked/canonical.rs b/vortex-array/src/array/chunked/canonical.rs index a4bf4d38d3..0000f6a5d8 100644 --- a/vortex-array/src/array/chunked/canonical.rs +++ b/vortex-array/src/array/chunked/canonical.rs @@ -34,10 +34,6 @@ pub(crate) fn try_canonicalize_chunks( validity: Validity, dtype: &DType, ) -> VortexResult { - if chunks.is_empty() { - vortex_bail!(InvalidArgument: "chunks must be non-empty") - } - let mismatched = chunks .iter() .filter(|chunk| !chunk.dtype().eq(dtype)) diff --git a/vortex-array/src/array/chunked/compute/filter.rs b/vortex-array/src/array/chunked/compute/filter.rs new file mode 100644 index 0000000000..9f057a1b70 --- /dev/null +++ b/vortex-array/src/array/chunked/compute/filter.rs @@ -0,0 +1,239 @@ +use arrow_buffer::BooleanBufferBuilder; +use vortex_error::{VortexExpect, VortexResult}; + +use crate::array::{BoolArray, ChunkedArray, PrimitiveArray}; +use crate::compute::{filter, take, FilterFn, SearchSorted, SearchSortedSide}; +use crate::{Array, ArrayDType, IntoArray, IntoCanonical}; + +// This is modeled after the constant with the equivalent name in arrow-rs. +const FILTER_SLICES_SELECTIVITY_THRESHOLD: f64 = 0.8; + +impl FilterFn for ChunkedArray { + fn filter(&self, predicate: &Array) -> VortexResult { + predicate.with_dyn(move |a| { + // SAFETY: the DType should be checked in the top-level `filter` function. + let bool_array = a.as_bool_array_unchecked(); + let selected = bool_array.true_count(); + + if selected == self.len() { + // Fast path 1: no filtering + Ok(self.clone().into_array()) + } else if selected == 0 { + // Fast path 2: empty array after filter. + Ok(ChunkedArray::try_new(vec![], self.dtype().clone())?.into_array()) + } else { + // General path: perform filtering. + // + // Based on filter selectivity, we take the values between a range of slices, or + // we take individual indices. + let selectivity = selected as f64 / self.len() as f64; + let chunks = if selectivity > FILTER_SLICES_SELECTIVITY_THRESHOLD { + filter_slices(self, bool_array.maybe_null_slices_iter())? + } else { + filter_indices(self, bool_array.maybe_null_indices_iter())? + }; + + Ok(ChunkedArray::try_new(chunks, self.dtype().clone())?.into_array()) + } + }) + } +} + +/// The filter to apply to each chunk. +/// +/// When we rewrite a set of slices in a filter predicate into chunk addresses, we want to account +/// for the fact that some chunks will be wholly skipped. +#[derive(Clone)] +enum ChunkFilter { + All, + None, + Slices(Vec<(usize, usize)>), +} + +/// Given a sequence of slices that indicate ranges of set values, returns a boolean array +/// representing the same thing. +fn slices_to_predicate(slices: &[(usize, usize)], len: usize) -> Array { + let mut buffer = BooleanBufferBuilder::new(len); + + let mut pos = 0; + for (slice_start, slice_end) in slices.iter().copied() { + // write however many trailing `false` between the end of the previous slice and the + // start of this one. + let n_leading_false = slice_start - pos; + buffer.append_n(n_leading_false, false); + buffer.append_n(slice_end - slice_start, true); + pos = slice_end; + } + + // Pad the end of the buffer with false, if necessary. + let n_trailing_false = len - pos; + buffer.append_n(n_trailing_false, false); + + BoolArray::from(buffer.finish()).into_array() +} + +/// Filter the chunks using slice ranges. +fn filter_slices<'a>( + array: &'a ChunkedArray, + set_slices: Box + 'a>, +) -> VortexResult> { + let mut result = Vec::with_capacity(array.nchunks()); + + // Pre-materialize the chunk ends for performance. + // The chunk ends is nchunks+1, which is expected to be in the hundreds or at most thousands. + let chunk_ends = array.chunk_offsets().into_canonical()?.into_primitive()?; + let chunk_ends = chunk_ends.maybe_null_slice::(); + + let mut chunk_filters = vec![ChunkFilter::None; array.nchunks()]; + + for (slice_start, slice_end) in set_slices { + let (start_chunk, start_idx) = find_chunk_idx(slice_start, chunk_ends); + // NOTE: we adjust slice end back by one, in case it ends on a chunk boundary, we do not + // want to index into the unused chunk. + let (end_chunk, end_idx) = find_chunk_idx(slice_end - 1, chunk_ends); + // Adjust back to an exclusive range + let end_idx = end_idx + 1; + + if start_chunk == end_chunk { + // start == end means that the slice lies within a single chunk. + match &mut chunk_filters[start_chunk] { + f @ (ChunkFilter::All | ChunkFilter::None) => { + *f = ChunkFilter::Slices(vec![(start_idx, end_idx)]); + } + ChunkFilter::Slices(slices) => { + slices.push((start_idx, end_idx)); + } + } + } else { + // start != end means that the range is split over at least two chunks: + // start chunk: append a slice from (start_idx, start_chunk_end). + // end chunk: append a slice from (0, end_idx). + // chunks between start and end: append ChunkFilter::All. + let start_chunk_end = chunk_ends[start_chunk + 1]; + let start_slice = (start_idx, start_chunk_end as _); + match &mut chunk_filters[start_chunk] { + f @ (ChunkFilter::All | ChunkFilter::None) => { + *f = ChunkFilter::Slices(vec![start_slice]) + } + ChunkFilter::Slices(slices) => slices.push(start_slice), + } + + let end_slice = (0, end_idx); + match &mut chunk_filters[end_chunk] { + f @ (ChunkFilter::All | ChunkFilter::None) => { + *f = ChunkFilter::Slices(vec![end_slice]); + } + ChunkFilter::Slices(slices) => slices.push(end_slice), + } + + for chunk in &mut chunk_filters[start_chunk + 1..end_chunk] { + *chunk = ChunkFilter::All; + } + } + } + + // Now, apply the chunk filter to every slice. + for (chunk, chunk_filter) in array.chunks().zip(chunk_filters.iter()) { + match chunk_filter { + // All => preserve the entire chunk unfiltered. + ChunkFilter::All => result.push(chunk), + // None => whole chunk is filtered out, skip + ChunkFilter::None => {} + // Slices => turn the slices into a boolean buffer. + ChunkFilter::Slices(slices) => { + result.push(filter(&chunk, &slices_to_predicate(slices, chunk.len()))?); + } + } + } + + Ok(result) +} + +/// Filter the chunks using indices. +fn filter_indices<'a>( + array: &'a ChunkedArray, + set_indices: Box + 'a>, +) -> VortexResult> { + let mut result = Vec::new(); + let mut current_chunk_id = 0; + let mut chunk_indices = Vec::new(); + + // Avoid find_chunk_idx and use our own to avoid the overhead. + // The array should only be some thousands of values in the general case. + let chunk_ends = array.chunk_offsets().into_canonical()?.into_primitive()?; + let chunk_ends = chunk_ends.maybe_null_slice::(); + + for set_index in set_indices { + let (chunk_id, index) = find_chunk_idx(set_index, chunk_ends); + if chunk_id != current_chunk_id { + // Push the chunk we've accumulated. + if !chunk_indices.is_empty() { + let chunk = array + .chunk(current_chunk_id) + .vortex_expect("find_chunk_idx must return valid chunk ID"); + let filtered_chunk = take( + &chunk, + &PrimitiveArray::from(chunk_indices.clone()).into_array(), + )?; + result.push(filtered_chunk); + } + + // Advance the chunk forward, reset the chunk indices buffer. + current_chunk_id = chunk_id; + chunk_indices.clear(); + } + + chunk_indices.push(index as u64); + } + + if !chunk_indices.is_empty() { + let chunk = array + .chunk(current_chunk_id) + .vortex_expect("find_chunk_idx must return valid chunk ID"); + let filtered_chunk = take( + &chunk, + &PrimitiveArray::from(chunk_indices.clone()).into_array(), + )?; + result.push(filtered_chunk); + } + + Ok(result) +} + +// Mirrors the find_chunk_idx method on ChunkedArray, but avoids all of the overhead +// from scalars, dtypes, and metadata cloning. +fn find_chunk_idx(idx: usize, chunk_ends: &[u64]) -> (usize, usize) { + let chunk_id = chunk_ends + .search_sorted(&(idx as u64), SearchSortedSide::Right) + .to_ends_index(chunk_ends.len()) + .saturating_sub(1); + let chunk_offset = idx - (chunk_ends[chunk_id] as usize); + + (chunk_id, chunk_offset) +} + +#[cfg(test)] +mod test { + use itertools::Itertools; + + use crate::array::chunked::compute::filter::slices_to_predicate; + use crate::IntoArrayVariant; + + #[test] + fn test_slices_to_predicate() { + let slices = [(2, 4), (6, 8), (9, 10)]; + let predicate = slices_to_predicate(&slices, 11); + + let bools = predicate + .into_bool() + .unwrap() + .boolean_buffer() + .iter() + .collect_vec(); + + assert_eq!( + bools, + vec![false, false, true, true, false, false, true, true, false, true, false], + ) + } +} diff --git a/vortex-array/src/array/chunked/compute/mod.rs b/vortex-array/src/array/chunked/compute/mod.rs index a2faef7f46..c2b21d5bad 100644 --- a/vortex-array/src/array/chunked/compute/mod.rs +++ b/vortex-array/src/array/chunked/compute/mod.rs @@ -3,9 +3,12 @@ use vortex_error::VortexResult; use crate::array::chunked::ChunkedArray; use crate::compute::unary::{try_cast, CastFn, ScalarAtFn, SubtractScalarFn}; -use crate::compute::{compare, slice, ArrayCompute, CompareFn, Operator, SliceFn, TakeFn}; +use crate::compute::{ + compare, slice, ArrayCompute, CompareFn, FilterFn, Operator, SliceFn, TakeFn, +}; use crate::{Array, IntoArray}; +mod filter; mod scalar_at; mod slice; mod take; @@ -34,6 +37,10 @@ impl ArrayCompute for ChunkedArray { fn take(&self) -> Option<&dyn TakeFn> { Some(self) } + + fn filter(&self) -> Option<&dyn FilterFn> { + Some(self) + } } impl CastFn for ChunkedArray { diff --git a/vortex-array/src/array/sparse/mod.rs b/vortex-array/src/array/sparse/mod.rs index 5b46d82d50..ed6e2d6dfa 100644 --- a/vortex-array/src/array/sparse/mod.rs +++ b/vortex-array/src/array/sparse/mod.rs @@ -137,11 +137,18 @@ impl SparseArray { }) } - pub fn min_index(&self) -> usize { - let min_index: usize = scalar_at(&self.indices(), 0) - .and_then(|s| s.as_ref().try_into()) - .vortex_expect("Failed to get min_index from SparseArray"); - min_index - self.indices_offset() + /// Return the minimum index if indices are present. + /// + /// If this sparse array has no indices (i.e. all elements are equal to fill_value) + /// then it returns None. + pub fn min_index(&self) -> Option { + (!self.indices().is_empty()).then(|| { + let min_index: usize = scalar_at(&self.indices(), 0) + .and_then(|s| s.as_ref().try_into()) + .vortex_expect("SparseArray indices is non-empty"); + + min_index - self.indices_offset() + }) } }