Skip to content

Commit

Permalink
Add partial encoding API
Browse files Browse the repository at this point in the history
  • Loading branch information
LDeakin committed Oct 5, 2024
1 parent 7755c50 commit b428e5e
Show file tree
Hide file tree
Showing 28 changed files with 1,339 additions and 36 deletions.
1 change: 1 addition & 0 deletions zarrs/src/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ pub fn chunk_shape_to_array_shape(chunk_shape: &[std::num::NonZeroU64]) -> Array
/// - [`ReadableWritableStorageTraits`](crate::storage::ReadableWritableStorageTraits): store operations requiring reading *and* writing
/// - [`store_chunk_subset`](Array::store_chunk_subset)
/// - [`store_array_subset`](Array::store_array_subset)
/// - [`partial_encoder`](Array::partial_encoder)
///
/// Many `retrieve` and `store` methods have multiple variants:
/// - Standard variants store or retrieve data represented as [`ArrayBytes`] (representing fixed or variable length bytes).
Expand Down
95 changes: 79 additions & 16 deletions zarrs/src/array/array_async_readable_writable.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,19 @@
use std::sync::Arc;

use futures::{StreamExt, TryStreamExt};
use zarrs_storage::StorageHandle;

use crate::{
array::ArrayBytes, array_subset::ArraySubset, storage::AsyncReadableWritableStorageTraits,
};

use super::{
array_bytes::update_array_bytes, codec::options::CodecOptions,
concurrency::concurrency_chunks_and_codec, Array, ArrayError, Element,
codec::{
options::CodecOptions, ArrayToBytesCodecTraits, AsyncArrayPartialEncoderTraits,
AsyncStoragePartialDecoder, AsyncStoragePartialEncoder,
},
concurrency::concurrency_chunks_and_codec,
Array, ArrayError, Element,
};

impl<TStorage: ?Sized + AsyncReadableWritableStorageTraits + 'static> Array<TStorage> {
Expand Down Expand Up @@ -150,23 +157,30 @@ impl<TStorage: ?Sized + AsyncReadableWritableStorageTraits + 'static> Array<TSto
// let mutex = self.storage.mutex(&key).await?;
// let _lock = mutex.lock();

// Decode the entire chunk
let chunk_bytes_old = self
.async_retrieve_chunk_opt(chunk_indices, options)
let partial_encoder = self
.async_partial_encoder_opt(chunk_indices, options)
.await?;
Ok(partial_encoder
.partial_encode(&[chunk_subset], vec![chunk_subset_bytes], options)
.await?)

// Update the chunk
let chunk_bytes_new = update_array_bytes(
chunk_bytes_old,
chunk_shape,
chunk_subset_bytes,
chunk_subset,
self.data_type().size(),
);
// // Decode the entire chunk
// let chunk_bytes_old = self
// .async_retrieve_chunk_opt(chunk_indices, options)
// .await?;

// Store the updated chunk
self.async_store_chunk_opt(chunk_indices, chunk_bytes_new, options)
.await
// // Update the chunk
// let chunk_bytes_new = super::array_bytes::update_array_bytes(
// chunk_bytes_old,
// chunk_shape,
// chunk_subset_bytes,
// chunk_subset,
// self.data_type().size(),
// );

// // Store the updated chunk
// self.async_store_chunk_opt(chunk_indices, chunk_bytes_new, options)
// .await
}
}

Expand Down Expand Up @@ -342,4 +356,53 @@ impl<TStorage: ?Sized + AsyncReadableWritableStorageTraits + 'static> Array<TSto
self.async_store_array_subset_elements_opt(&subset, &subset_array, options)
.await
}

/// Initialises an asynchronous partial encoder for the chunk at `chunk_indices`.
///
/// # Errors
/// Returns an [`ArrayError`] if initialisation of the partial encoder fails.
pub async fn async_partial_encoder(
&self,
chunk_indices: &[u64],
) -> Result<Arc<dyn AsyncArrayPartialEncoderTraits>, ArrayError> {
self.async_partial_encoder_opt(chunk_indices, &CodecOptions::default())
.await
}

Check warning on line 370 in zarrs/src/array/array_async_readable_writable.rs

View check run for this annotation

Codecov / codecov/patch

zarrs/src/array/array_async_readable_writable.rs#L364-L370

Added lines #L364 - L370 were not covered by tests

/// Explicit options version of [`async_partial_encoder`](Array::async_partial_encoder).
#[allow(clippy::missing_errors_doc)]
pub async fn async_partial_encoder_opt(
&self,
chunk_indices: &[u64],
options: &CodecOptions,
) -> Result<Arc<dyn AsyncArrayPartialEncoderTraits>, ArrayError> {
let storage_handle = Arc::new(StorageHandle::new(self.storage.clone()));

// Input
let storage_transformer_read = self
.storage_transformers()
.create_async_readable_transformer(storage_handle.clone())
.await?;
let input_handle = Arc::new(AsyncStoragePartialDecoder::new(
storage_transformer_read,
self.chunk_key(chunk_indices),
));
let chunk_representation = self.chunk_array_representation(chunk_indices)?;

// Output
let storage_transformer_write = self
.storage_transformers()
.create_async_writable_transformer(storage_handle)
.await?;
let output_handle = Arc::new(AsyncStoragePartialEncoder::new(
storage_transformer_write,
self.chunk_key(chunk_indices),
));

Ok(self
.codecs
.clone()
.async_partial_encoder(input_handle, output_handle, &chunk_representation, options)
.await?)
}
}
96 changes: 80 additions & 16 deletions zarrs/src/array/array_sync_readable_writable.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,20 @@
use std::sync::Arc;

use rayon::iter::{IntoParallelIterator, ParallelIterator};

use crate::{array::ArrayBytes, array_subset::ArraySubset, storage::ReadableWritableStorageTraits};
use crate::{
array::ArrayBytes,
array_subset::ArraySubset,
storage::{ReadableWritableStorageTraits, StorageHandle},
};

use super::{
array_bytes::update_array_bytes, codec::options::CodecOptions,
concurrency::concurrency_chunks_and_codec, Array, ArrayError, Element,
codec::{
options::CodecOptions, ArrayPartialEncoderTraits, ArrayToBytesCodecTraits,
StoragePartialDecoder, StoragePartialEncoder,
},
concurrency::concurrency_chunks_and_codec,
Array, ArrayError, Element,
};

impl<TStorage: ?Sized + ReadableWritableStorageTraits + 'static> Array<TStorage> {
Expand Down Expand Up @@ -180,21 +190,28 @@ impl<TStorage: ?Sized + ReadableWritableStorageTraits + 'static> Array<TStorage>
// let mutex = self.storage.mutex(&key)?;
// let _lock = mutex.lock();

// Decode the entire chunk
let chunk_bytes_old = self.retrieve_chunk_opt(chunk_indices, options)?;
chunk_bytes_old.validate(chunk_shape.iter().product(), self.data_type().size())?;
let partial_encoder = self.partial_encoder_opt(chunk_indices, options)?;
Ok(partial_encoder.partial_encode(
&[chunk_subset],
vec![chunk_subset_bytes],
options,
)?)

// Update the chunk
let chunk_bytes_new = update_array_bytes(
chunk_bytes_old,
chunk_shape,
chunk_subset_bytes,
chunk_subset,
self.data_type().size(),
);
// // Decode the entire chunk
// let chunk_bytes_old = self.retrieve_chunk_opt(chunk_indices, options)?;
// chunk_bytes_old.validate(chunk_shape.iter().product(), self.data_type().size())?;

// Store the updated chunk
self.store_chunk_opt(chunk_indices, chunk_bytes_new, options)
// // Update the chunk
// let chunk_bytes_new = update_array_bytes(
// chunk_bytes_old,
// chunk_shape,
// chunk_subset_bytes,
// chunk_subset,
// self.data_type().size(),
// );

// // Store the updated chunk
// self.store_chunk_opt(chunk_indices, chunk_bytes_new, options)
}
}

Expand Down Expand Up @@ -350,4 +367,51 @@ impl<TStorage: ?Sized + ReadableWritableStorageTraits + 'static> Array<TStorage>
let subset_array = super::ndarray_into_vec(subset_array);
self.store_array_subset_elements_opt(&subset, &subset_array, options)
}

/// Initialises a partial encoder for the chunk at `chunk_indices`.
///
/// # Errors
/// Returns an [`ArrayError`] if initialisation of the partial encoder fails.
pub fn partial_encoder(
&self,
chunk_indices: &[u64],
) -> Result<Arc<dyn ArrayPartialEncoderTraits>, ArrayError> {
self.partial_encoder_opt(chunk_indices, &CodecOptions::default())
}

Check warning on line 380 in zarrs/src/array/array_sync_readable_writable.rs

View check run for this annotation

Codecov / codecov/patch

zarrs/src/array/array_sync_readable_writable.rs#L375-L380

Added lines #L375 - L380 were not covered by tests

/// Explicit options version of [`partial_encoder`](Array::partial_encoder).
#[allow(clippy::missing_errors_doc)]
pub fn partial_encoder_opt(
&self,
chunk_indices: &[u64],
options: &CodecOptions,
) -> Result<Arc<dyn ArrayPartialEncoderTraits>, ArrayError> {
let storage_handle = Arc::new(StorageHandle::new(self.storage.clone()));

// Input
let storage_transformer_read = self
.storage_transformers()
.create_readable_transformer(storage_handle.clone())?;
let input_handle = Arc::new(StoragePartialDecoder::new(
storage_transformer_read,
self.chunk_key(chunk_indices),
));
let chunk_representation = self.chunk_array_representation(chunk_indices)?;

// Output
let storage_transformer_write = self
.storage_transformers()
.create_writable_transformer(storage_handle)?;
let output_handle = Arc::new(StoragePartialEncoder::new(
storage_transformer_write,
self.chunk_key(chunk_indices),
));

Ok(self.codecs.clone().partial_encoder(
input_handle,
output_handle,
&chunk_representation,
options,
)?)
}
}
Loading

0 comments on commit b428e5e

Please sign in to comment.