Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add partial encoding support (sync only) #45

Merged
merged 1 commit into from
Nov 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Add a `makefile` and simplify `BUILD.md`
- Add chunk-by-chunk update example in `Array` docs
- Add `array::copy_fill_value_into()`
- Add experimental partial encoding support (sync only):
- Enables writing shards incrementally
- With `{CodecOptions,Config}::experimental_partial_encoding` enabled, `Array::store_{array,chunk}_subset` will use partial encoding
- This is an experimental feature for now until it has more comprehensively tested and support is added in the async API
- Adds `ArrayPartialEncoderTraits`, `BytesPartialEncoderTraits`, `StoragePartialEncoder`, `ArrayPartialEncoderDefault`, `BytesPartialEncoderDefault`
- **Breaking**: Add `{ArrayToArray,ArrayToBytes,BytesToBytes}CodecTraits::partial_encoder`

### Changed
- Bump `unsafe_cell_slice` to 0.2.0
Expand Down
1 change: 1 addition & 0 deletions zarrs/src/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ pub fn chunk_shape_to_array_shape(chunk_shape: &[std::num::NonZeroU64]) -> Array
/// - [`ReadableWritableStorageTraits`](crate::storage::ReadableWritableStorageTraits): store operations requiring reading *and* writing
/// - [`store_chunk_subset`](Array::store_chunk_subset)
/// - [`store_array_subset`](Array::store_array_subset)
/// - [`partial_encoder`](Array::partial_encoder)
///
/// Many `retrieve` and `store` methods have multiple variants:
/// - Standard variants store or retrieve data represented as [`ArrayBytes`] (representing fixed or variable length bytes).
Expand Down
2 changes: 2 additions & 0 deletions zarrs/src/array/array_async_readable_writable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,8 @@ impl<TStorage: ?Sized + AsyncReadableWritableStorageTraits + 'static> Array<TSto
// let mutex = self.storage.mutex(&key).await?;
// let _lock = mutex.lock();

// TODO: Add async partial encoding

// Decode the entire chunk
let chunk_bytes_old = self
.async_retrieve_chunk_opt(chunk_indices, options)
Expand Down
90 changes: 72 additions & 18 deletions zarrs/src/array/array_sync_readable_writable.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,20 @@
use std::sync::Arc;

use rayon::iter::{IntoParallelIterator, ParallelIterator};

use crate::{array::ArrayBytes, array_subset::ArraySubset, storage::ReadableWritableStorageTraits};
use crate::{
array::ArrayBytes,
array_subset::ArraySubset,
storage::{ReadableWritableStorageTraits, StorageHandle},
};

use super::{
array_bytes::update_array_bytes, codec::options::CodecOptions,
concurrency::concurrency_chunks_and_codec, Array, ArrayError, Element,
codec::{
options::CodecOptions, ArrayPartialEncoderTraits, ArrayToBytesCodecTraits,
StoragePartialDecoder, StoragePartialEncoder,
},
concurrency::concurrency_chunks_and_codec,
update_array_bytes, Array, ArrayError, Element,
};

impl<TStorage: ?Sized + ReadableWritableStorageTraits + 'static> Array<TStorage> {
Expand Down Expand Up @@ -180,23 +190,29 @@ impl<TStorage: ?Sized + ReadableWritableStorageTraits + 'static> Array<TStorage>
// let mutex = self.storage.mutex(&key)?;
// let _lock = mutex.lock();

// Decode the entire chunk
let chunk_bytes_old = self.retrieve_chunk_opt(chunk_indices, options)?;
chunk_bytes_old.validate(chunk_shape.iter().product(), self.data_type().size())?;
if options.experimental_partial_encoding() {
let partial_encoder = self.partial_encoder(chunk_indices, options)?;
Ok(partial_encoder
.partial_encode(&[(chunk_subset, chunk_subset_bytes)], options)?)
} else {
// Decode the entire chunk
let chunk_bytes_old = self.retrieve_chunk_opt(chunk_indices, options)?;
chunk_bytes_old.validate(chunk_shape.iter().product(), self.data_type().size())?;

// Update the chunk
let chunk_bytes_new = unsafe {
update_array_bytes(
chunk_bytes_old,
&chunk_shape,
chunk_subset,
&chunk_subset_bytes,
self.data_type().size(),
)
};
// Update the chunk
let chunk_bytes_new = unsafe {
update_array_bytes(
chunk_bytes_old,
&chunk_shape,
chunk_subset,
&chunk_subset_bytes,
self.data_type().size(),
)
};

// Store the updated chunk
self.store_chunk_opt(chunk_indices, chunk_bytes_new, options)
// Store the updated chunk
self.store_chunk_opt(chunk_indices, chunk_bytes_new, options)
}
}
}

Expand Down Expand Up @@ -352,4 +368,42 @@ impl<TStorage: ?Sized + ReadableWritableStorageTraits + 'static> Array<TStorage>
let subset_array = super::ndarray_into_vec(subset_array);
self.store_array_subset_elements_opt(&subset, &subset_array, options)
}

/// Initialises a partial encoder for the chunk at `chunk_indices`.
///
/// # Errors
/// Returns an [`ArrayError`] if initialisation of the partial encoder fails.
pub fn partial_encoder(
&self,
chunk_indices: &[u64],
options: &CodecOptions,
) -> Result<Arc<dyn ArrayPartialEncoderTraits>, ArrayError> {
let storage_handle = Arc::new(StorageHandle::new(self.storage.clone()));

// Input
let storage_transformer_read = self
.storage_transformers()
.create_readable_transformer(storage_handle.clone())?;
let input_handle = Arc::new(StoragePartialDecoder::new(
storage_transformer_read,
self.chunk_key(chunk_indices),
));
let chunk_representation = self.chunk_array_representation(chunk_indices)?;

// Output
let storage_transformer_write = self
.storage_transformers()
.create_writable_transformer(storage_handle)?;
let output_handle = Arc::new(StoragePartialEncoder::new(
storage_transformer_write,
self.chunk_key(chunk_indices),
));

Ok(self.codecs.clone().partial_encoder(
input_handle,
output_handle,
&chunk_representation,
options,
)?)
}
}
123 changes: 122 additions & 1 deletion zarrs/src/array/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,19 @@ pub use byte_interval_partial_decoder::ByteIntervalPartialDecoder;
pub use byte_interval_partial_decoder::AsyncByteIntervalPartialDecoder;
use unsafe_cell_slice::UnsafeCellSlice;

mod array_partial_encoder_default;
pub use array_partial_encoder_default::ArrayPartialEncoderDefault;

mod array_to_array_partial_encoder_default;
pub use array_to_array_partial_encoder_default::ArrayToArrayPartialEncoderDefault;

mod bytes_partial_encoder_default;
pub use bytes_partial_encoder_default::BytesPartialEncoderDefault;

use crate::storage::{StoreKeyOffsetValue, WritableStorage};
use crate::{
array_subset::{ArraySubset, IncompatibleArraySubsetAndShapeError},
byte_range::{extract_byte_ranges_read_seek, ByteRange, InvalidByteRangeError},
byte_range::{extract_byte_ranges_read_seek, ByteOffset, ByteRange, InvalidByteRangeError},
metadata::v3::MetadataV3,
plugin::{Plugin, PluginCreateError},
storage::{ReadableStorage, StorageError, StoreKey},
Expand Down Expand Up @@ -380,6 +390,44 @@ pub trait ArrayPartialDecoderTraits: Send + Sync {
}
}

/// Partial array encoder traits.
pub trait ArrayPartialEncoderTraits: Send + Sync {
/// Erase the chunk.
///
/// # Errors
/// Returns an error if there is an underlying store error.
fn erase(&self) -> Result<(), CodecError>;

/// Partially encode a chunk.
///
/// # Errors
/// Returns [`CodecError`] if a codec fails or an array subset is invalid.
fn partial_encode(
&self,
subsets_and_bytes: &[(&ArraySubset, ArrayBytes<'_>)],
options: &CodecOptions,
) -> Result<(), CodecError>;
}

/// Partial bytes encoder traits.
pub trait BytesPartialEncoderTraits: Send + Sync {
/// Erase the chunk.
///
/// # Errors
/// Returns an error if there is an underlying store error.
fn erase(&self) -> Result<(), CodecError>;

/// Partially encode a chunk.
///
/// # Errors
/// Returns [`CodecError`] if a codec fails or an array subset is invalid.
fn partial_encode(
&self,
offsets_and_bytes: &[(ByteOffset, crate::array::RawBytes<'_>)],
options: &CodecOptions,
) -> Result<(), CodecError>;
}

#[cfg(feature = "async")]
/// Asynchronous partial array decoder traits.
#[async_trait::async_trait]
Expand Down Expand Up @@ -495,6 +543,37 @@ impl AsyncBytesPartialDecoderTraits for AsyncStoragePartialDecoder {
}
}

/// A [`WritableStorage`] store value partial encoder.
pub struct StoragePartialEncoder {
storage: WritableStorage,
key: StoreKey,
}

impl StoragePartialEncoder {
/// Create a new storage partial encoder.
pub fn new(storage: WritableStorage, key: StoreKey) -> Self {
Self { storage, key }
}
}

impl BytesPartialEncoderTraits for StoragePartialEncoder {
fn erase(&self) -> Result<(), CodecError> {
Ok(self.storage.erase(&self.key)?)
}

fn partial_encode(
&self,
offsets_and_bytes: &[(ByteOffset, crate::array::RawBytes<'_>)],
_options: &CodecOptions,
) -> Result<(), CodecError> {
let key_offset_values = offsets_and_bytes
.iter()
.map(|(offset, bytes)| StoreKeyOffsetValue::new(self.key.clone(), *offset, bytes))
.collect::<Vec<_>>();
Ok(self.storage.set_partial_values(&key_offset_values)?)
}
}

/// Traits for array to array codecs.
#[cfg_attr(feature = "async", async_trait::async_trait)]
pub trait ArrayToArrayCodecTraits: ArrayCodecTraits + core::fmt::Debug {
Expand Down Expand Up @@ -554,6 +633,18 @@ pub trait ArrayToArrayCodecTraits: ArrayCodecTraits + core::fmt::Debug {
options: &CodecOptions,
) -> Result<Arc<dyn ArrayPartialDecoderTraits>, CodecError>;

/// Initialise a partial encoder.
///
/// # Errors
/// Returns a [`CodecError`] if initialisation fails.
fn partial_encoder(
self: Arc<Self>,
input_handle: Arc<dyn ArrayPartialDecoderTraits>,
output_handle: Arc<dyn ArrayPartialEncoderTraits>,
decoded_representation: &ChunkRepresentation,
options: &CodecOptions,
) -> Result<Arc<dyn ArrayPartialEncoderTraits>, CodecError>;

#[cfg(feature = "async")]
/// Initialise an asynchronous partial decoder.
///
Expand All @@ -565,6 +656,8 @@ pub trait ArrayToArrayCodecTraits: ArrayCodecTraits + core::fmt::Debug {
decoded_representation: &ChunkRepresentation,
options: &CodecOptions,
) -> Result<Arc<dyn AsyncArrayPartialDecoderTraits>, CodecError>;

// TODO: async_partial_encoder
}

/// Traits for array to bytes codecs.
Expand Down Expand Up @@ -654,6 +747,18 @@ pub trait ArrayToBytesCodecTraits: ArrayCodecTraits + core::fmt::Debug {
options: &CodecOptions,
) -> Result<Arc<dyn ArrayPartialDecoderTraits>, CodecError>;

/// Initialise a partial encoder.
///
/// # Errors
/// Returns a [`CodecError`] if initialisation fails.
fn partial_encoder(
self: Arc<Self>,
input_handle: Arc<dyn BytesPartialDecoderTraits>,
output_handle: Arc<dyn BytesPartialEncoderTraits>,
decoded_representation: &ChunkRepresentation,
_options: &CodecOptions,
) -> Result<Arc<dyn ArrayPartialEncoderTraits>, CodecError>;

#[cfg(feature = "async")]
/// Initialise an asynchronous partial decoder.
///
Expand All @@ -665,6 +770,8 @@ pub trait ArrayToBytesCodecTraits: ArrayCodecTraits + core::fmt::Debug {
decoded_representation: &ChunkRepresentation,
options: &CodecOptions,
) -> Result<Arc<dyn AsyncArrayPartialDecoderTraits>, CodecError>;

// TODO: Async partial encoder
}

/// Traits for bytes to bytes codecs.
Expand Down Expand Up @@ -720,6 +827,18 @@ pub trait BytesToBytesCodecTraits: CodecTraits + core::fmt::Debug {
options: &CodecOptions,
) -> Result<Arc<dyn BytesPartialDecoderTraits>, CodecError>;

/// Initialise a partial encoder.
///
/// # Errors
/// Returns a [`CodecError`] if initialisation fails.
fn partial_encoder(
self: Arc<Self>,
input_handle: Arc<dyn BytesPartialDecoderTraits>,
output_handle: Arc<dyn BytesPartialEncoderTraits>,
decoded_representation: &BytesRepresentation,
options: &CodecOptions,
) -> Result<Arc<dyn BytesPartialEncoderTraits>, CodecError>;

#[cfg(feature = "async")]
/// Initialises an asynchronous partial decoder.
///
Expand All @@ -731,6 +850,8 @@ pub trait BytesToBytesCodecTraits: CodecTraits + core::fmt::Debug {
decoded_representation: &BytesRepresentation,
options: &CodecOptions,
) -> Result<Arc<dyn AsyncBytesPartialDecoderTraits>, CodecError>;

// TODO: Async partial encoder
}

impl BytesPartialDecoderTraits for std::io::Cursor<&[u8]> {
Expand Down
Loading