Skip to content

Commit

Permalink
Add partial encoding support (sync only) (#45)
Browse files Browse the repository at this point in the history
  • Loading branch information
LDeakin authored Nov 1, 2024
1 parent 952f543 commit db79077
Show file tree
Hide file tree
Showing 31 changed files with 1,396 additions and 67 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Add a `makefile` and simplify `BUILD.md`
- Add chunk-by-chunk update example in `Array` docs
- Add `array::copy_fill_value_into()`
- Add experimental partial encoding support (sync only):
- Enables writing shards incrementally
- With `{CodecOptions,Config}::experimental_partial_encoding` enabled, `Array::store_{array,chunk}_subset` will use partial encoding
- This is an experimental feature for now until it has more comprehensively tested and support is added in the async API
- Adds `ArrayPartialEncoderTraits`, `BytesPartialEncoderTraits`, `StoragePartialEncoder`, `ArrayPartialEncoderDefault`, `BytesPartialEncoderDefault`
- **Breaking**: Add `{ArrayToArray,ArrayToBytes,BytesToBytes}CodecTraits::partial_encoder`

### Changed
- Bump `unsafe_cell_slice` to 0.2.0
Expand Down
1 change: 1 addition & 0 deletions zarrs/src/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ pub fn chunk_shape_to_array_shape(chunk_shape: &[std::num::NonZeroU64]) -> Array
/// - [`ReadableWritableStorageTraits`](crate::storage::ReadableWritableStorageTraits): store operations requiring reading *and* writing
/// - [`store_chunk_subset`](Array::store_chunk_subset)
/// - [`store_array_subset`](Array::store_array_subset)
/// - [`partial_encoder`](Array::partial_encoder)
///
/// Many `retrieve` and `store` methods have multiple variants:
/// - Standard variants store or retrieve data represented as [`ArrayBytes`] (representing fixed or variable length bytes).
Expand Down
2 changes: 2 additions & 0 deletions zarrs/src/array/array_async_readable_writable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,8 @@ impl<TStorage: ?Sized + AsyncReadableWritableStorageTraits + 'static> Array<TSto
// let mutex = self.storage.mutex(&key).await?;
// let _lock = mutex.lock();

// TODO: Add async partial encoding

// Decode the entire chunk
let chunk_bytes_old = self
.async_retrieve_chunk_opt(chunk_indices, options)
Expand Down
90 changes: 72 additions & 18 deletions zarrs/src/array/array_sync_readable_writable.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,20 @@
use std::sync::Arc;

use rayon::iter::{IntoParallelIterator, ParallelIterator};

use crate::{array::ArrayBytes, array_subset::ArraySubset, storage::ReadableWritableStorageTraits};
use crate::{
array::ArrayBytes,
array_subset::ArraySubset,
storage::{ReadableWritableStorageTraits, StorageHandle},
};

use super::{
array_bytes::update_array_bytes, codec::options::CodecOptions,
concurrency::concurrency_chunks_and_codec, Array, ArrayError, Element,
codec::{
options::CodecOptions, ArrayPartialEncoderTraits, ArrayToBytesCodecTraits,
StoragePartialDecoder, StoragePartialEncoder,
},
concurrency::concurrency_chunks_and_codec,
update_array_bytes, Array, ArrayError, Element,
};

impl<TStorage: ?Sized + ReadableWritableStorageTraits + 'static> Array<TStorage> {
Expand Down Expand Up @@ -180,23 +190,29 @@ impl<TStorage: ?Sized + ReadableWritableStorageTraits + 'static> Array<TStorage>
// let mutex = self.storage.mutex(&key)?;
// let _lock = mutex.lock();

// Decode the entire chunk
let chunk_bytes_old = self.retrieve_chunk_opt(chunk_indices, options)?;
chunk_bytes_old.validate(chunk_shape.iter().product(), self.data_type().size())?;
if options.experimental_partial_encoding() {
let partial_encoder = self.partial_encoder(chunk_indices, options)?;
Ok(partial_encoder
.partial_encode(&[(chunk_subset, chunk_subset_bytes)], options)?)
} else {
// Decode the entire chunk
let chunk_bytes_old = self.retrieve_chunk_opt(chunk_indices, options)?;
chunk_bytes_old.validate(chunk_shape.iter().product(), self.data_type().size())?;

// Update the chunk
let chunk_bytes_new = unsafe {
update_array_bytes(
chunk_bytes_old,
&chunk_shape,
chunk_subset,
&chunk_subset_bytes,
self.data_type().size(),
)
};
// Update the chunk
let chunk_bytes_new = unsafe {
update_array_bytes(
chunk_bytes_old,
&chunk_shape,
chunk_subset,
&chunk_subset_bytes,
self.data_type().size(),
)
};

// Store the updated chunk
self.store_chunk_opt(chunk_indices, chunk_bytes_new, options)
// Store the updated chunk
self.store_chunk_opt(chunk_indices, chunk_bytes_new, options)
}
}
}

Expand Down Expand Up @@ -352,4 +368,42 @@ impl<TStorage: ?Sized + ReadableWritableStorageTraits + 'static> Array<TStorage>
let subset_array = super::ndarray_into_vec(subset_array);
self.store_array_subset_elements_opt(&subset, &subset_array, options)
}

/// Initialises a partial encoder for the chunk at `chunk_indices`.
///
/// # Errors
/// Returns an [`ArrayError`] if initialisation of the partial encoder fails.
pub fn partial_encoder(
&self,
chunk_indices: &[u64],
options: &CodecOptions,
) -> Result<Arc<dyn ArrayPartialEncoderTraits>, ArrayError> {
let storage_handle = Arc::new(StorageHandle::new(self.storage.clone()));

// Input
let storage_transformer_read = self
.storage_transformers()
.create_readable_transformer(storage_handle.clone())?;
let input_handle = Arc::new(StoragePartialDecoder::new(
storage_transformer_read,
self.chunk_key(chunk_indices),
));
let chunk_representation = self.chunk_array_representation(chunk_indices)?;

// Output
let storage_transformer_write = self
.storage_transformers()
.create_writable_transformer(storage_handle)?;
let output_handle = Arc::new(StoragePartialEncoder::new(
storage_transformer_write,
self.chunk_key(chunk_indices),
));

Ok(self.codecs.clone().partial_encoder(
input_handle,
output_handle,
&chunk_representation,
options,
)?)
}
}
123 changes: 122 additions & 1 deletion zarrs/src/array/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,19 @@ pub use byte_interval_partial_decoder::ByteIntervalPartialDecoder;
pub use byte_interval_partial_decoder::AsyncByteIntervalPartialDecoder;
use unsafe_cell_slice::UnsafeCellSlice;

mod array_partial_encoder_default;
pub use array_partial_encoder_default::ArrayPartialEncoderDefault;

mod array_to_array_partial_encoder_default;
pub use array_to_array_partial_encoder_default::ArrayToArrayPartialEncoderDefault;

mod bytes_partial_encoder_default;
pub use bytes_partial_encoder_default::BytesPartialEncoderDefault;

use crate::storage::{StoreKeyOffsetValue, WritableStorage};
use crate::{
array_subset::{ArraySubset, IncompatibleArraySubsetAndShapeError},
byte_range::{extract_byte_ranges_read_seek, ByteRange, InvalidByteRangeError},
byte_range::{extract_byte_ranges_read_seek, ByteOffset, ByteRange, InvalidByteRangeError},
metadata::v3::MetadataV3,
plugin::{Plugin, PluginCreateError},
storage::{ReadableStorage, StorageError, StoreKey},
Expand Down Expand Up @@ -380,6 +390,44 @@ pub trait ArrayPartialDecoderTraits: Send + Sync {
}
}

/// Partial array encoder traits.
pub trait ArrayPartialEncoderTraits: Send + Sync {
/// Erase the chunk.
///
/// # Errors
/// Returns an error if there is an underlying store error.
fn erase(&self) -> Result<(), CodecError>;

/// Partially encode a chunk.
///
/// # Errors
/// Returns [`CodecError`] if a codec fails or an array subset is invalid.
fn partial_encode(
&self,
subsets_and_bytes: &[(&ArraySubset, ArrayBytes<'_>)],
options: &CodecOptions,
) -> Result<(), CodecError>;
}

/// Partial bytes encoder traits.
pub trait BytesPartialEncoderTraits: Send + Sync {
/// Erase the chunk.
///
/// # Errors
/// Returns an error if there is an underlying store error.
fn erase(&self) -> Result<(), CodecError>;

/// Partially encode a chunk.
///
/// # Errors
/// Returns [`CodecError`] if a codec fails or an array subset is invalid.
fn partial_encode(
&self,
offsets_and_bytes: &[(ByteOffset, crate::array::RawBytes<'_>)],
options: &CodecOptions,
) -> Result<(), CodecError>;
}

#[cfg(feature = "async")]
/// Asynchronous partial array decoder traits.
#[async_trait::async_trait]
Expand Down Expand Up @@ -495,6 +543,37 @@ impl AsyncBytesPartialDecoderTraits for AsyncStoragePartialDecoder {
}
}

/// A [`WritableStorage`] store value partial encoder.
pub struct StoragePartialEncoder {
storage: WritableStorage,
key: StoreKey,
}

impl StoragePartialEncoder {
/// Create a new storage partial encoder.
pub fn new(storage: WritableStorage, key: StoreKey) -> Self {
Self { storage, key }
}
}

impl BytesPartialEncoderTraits for StoragePartialEncoder {
fn erase(&self) -> Result<(), CodecError> {
Ok(self.storage.erase(&self.key)?)
}

fn partial_encode(
&self,
offsets_and_bytes: &[(ByteOffset, crate::array::RawBytes<'_>)],
_options: &CodecOptions,
) -> Result<(), CodecError> {
let key_offset_values = offsets_and_bytes
.iter()
.map(|(offset, bytes)| StoreKeyOffsetValue::new(self.key.clone(), *offset, bytes))
.collect::<Vec<_>>();
Ok(self.storage.set_partial_values(&key_offset_values)?)
}
}

/// Traits for array to array codecs.
#[cfg_attr(feature = "async", async_trait::async_trait)]
pub trait ArrayToArrayCodecTraits: ArrayCodecTraits + core::fmt::Debug {
Expand Down Expand Up @@ -554,6 +633,18 @@ pub trait ArrayToArrayCodecTraits: ArrayCodecTraits + core::fmt::Debug {
options: &CodecOptions,
) -> Result<Arc<dyn ArrayPartialDecoderTraits>, CodecError>;

/// Initialise a partial encoder.
///
/// # Errors
/// Returns a [`CodecError`] if initialisation fails.
fn partial_encoder(
self: Arc<Self>,
input_handle: Arc<dyn ArrayPartialDecoderTraits>,
output_handle: Arc<dyn ArrayPartialEncoderTraits>,
decoded_representation: &ChunkRepresentation,
options: &CodecOptions,
) -> Result<Arc<dyn ArrayPartialEncoderTraits>, CodecError>;

#[cfg(feature = "async")]
/// Initialise an asynchronous partial decoder.
///
Expand All @@ -565,6 +656,8 @@ pub trait ArrayToArrayCodecTraits: ArrayCodecTraits + core::fmt::Debug {
decoded_representation: &ChunkRepresentation,
options: &CodecOptions,
) -> Result<Arc<dyn AsyncArrayPartialDecoderTraits>, CodecError>;

// TODO: async_partial_encoder
}

/// Traits for array to bytes codecs.
Expand Down Expand Up @@ -654,6 +747,18 @@ pub trait ArrayToBytesCodecTraits: ArrayCodecTraits + core::fmt::Debug {
options: &CodecOptions,
) -> Result<Arc<dyn ArrayPartialDecoderTraits>, CodecError>;

/// Initialise a partial encoder.
///
/// # Errors
/// Returns a [`CodecError`] if initialisation fails.
fn partial_encoder(
self: Arc<Self>,
input_handle: Arc<dyn BytesPartialDecoderTraits>,
output_handle: Arc<dyn BytesPartialEncoderTraits>,
decoded_representation: &ChunkRepresentation,
_options: &CodecOptions,
) -> Result<Arc<dyn ArrayPartialEncoderTraits>, CodecError>;

#[cfg(feature = "async")]
/// Initialise an asynchronous partial decoder.
///
Expand All @@ -665,6 +770,8 @@ pub trait ArrayToBytesCodecTraits: ArrayCodecTraits + core::fmt::Debug {
decoded_representation: &ChunkRepresentation,
options: &CodecOptions,
) -> Result<Arc<dyn AsyncArrayPartialDecoderTraits>, CodecError>;

// TODO: Async partial encoder
}

/// Traits for bytes to bytes codecs.
Expand Down Expand Up @@ -720,6 +827,18 @@ pub trait BytesToBytesCodecTraits: CodecTraits + core::fmt::Debug {
options: &CodecOptions,
) -> Result<Arc<dyn BytesPartialDecoderTraits>, CodecError>;

/// Initialise a partial encoder.
///
/// # Errors
/// Returns a [`CodecError`] if initialisation fails.
fn partial_encoder(
self: Arc<Self>,
input_handle: Arc<dyn BytesPartialDecoderTraits>,
output_handle: Arc<dyn BytesPartialEncoderTraits>,
decoded_representation: &BytesRepresentation,
options: &CodecOptions,
) -> Result<Arc<dyn BytesPartialEncoderTraits>, CodecError>;

#[cfg(feature = "async")]
/// Initialises an asynchronous partial decoder.
///
Expand All @@ -731,6 +850,8 @@ pub trait BytesToBytesCodecTraits: CodecTraits + core::fmt::Debug {
decoded_representation: &BytesRepresentation,
options: &CodecOptions,
) -> Result<Arc<dyn AsyncBytesPartialDecoderTraits>, CodecError>;

// TODO: Async partial encoder
}

impl BytesPartialDecoderTraits for std::io::Cursor<&[u8]> {
Expand Down
Loading

0 comments on commit db79077

Please sign in to comment.