diff --git a/hub/api/tests/test_api_tiling.py b/hub/api/tests/test_api_tiling.py
index e80ad8c31f..6205d05cd4 100644
--- a/hub/api/tests/test_api_tiling.py
+++ b/hub/api/tests/test_api_tiling.py
@@ -27,7 +27,7 @@ def test_mixed_small_large(local_ds_generator, compression):
     arr2 = np.random.randint(0, 255, (500, 500, 3)).astype(np.uint8)
     arr3 = np.random.randint(0, 255, (2503, 2501, 3)).astype(np.uint8)
     with ds:
-        ds.create_tensor("abc", **compression)
+        ds.create_tensor("abc", max_chunk_size=2 ** 21, **compression)
         for i in range(10):
             if i % 5 == 0:
                 ds.abc.append(arr1)
@@ -75,7 +75,7 @@ def test_updates(memory_ds, compression):
     arr3 = np.random.randint(0, 255, (2503, 2501, 3)).astype(np.uint8)
     arr4 = np.random.randint(0, 255, (250, 250, 3)).astype(np.uint8)
     with memory_ds:
-        memory_ds.create_tensor("abc", **compression)
+        memory_ds.create_tensor("abc", max_chunk_size=2 ** 21, **compression)
         for i in range(10):
             if i % 5 == 0:
                 memory_ds.abc.append(arr1)
diff --git a/hub/core/chunk/base_chunk.py b/hub/core/chunk/base_chunk.py
index 0392527155..a3fb44d98a 100644
--- a/hub/core/chunk/base_chunk.py
+++ b/hub/core/chunk/base_chunk.py
@@ -51,9 +51,8 @@ def __init__(
         encoded_byte_positions: Optional[np.ndarray] = None,
         data: Optional[memoryview] = None,
     ):
+        self._data_bytes: Union[bytearray, bytes, memoryview] = data or bytearray()
         self.version = hub.__version__
-
-        self.data_bytes: Union[bytearray, bytes, memoryview] = data or bytearray()
         self.min_chunk_size = min_chunk_size
         self.max_chunk_size = max_chunk_size

@@ -74,8 +73,16 @@ def __init__(
            raise ValueError("Can't use image compression with text data.")

        # These caches are only used for ChunkCompressed chunk.
-        self._decompressed_samples: Optional[List[np.ndarray]] = None
-        self._decompressed_bytes: Optional[bytes] = None
+        self.decompressed_samples: Optional[List[np.ndarray]] = None
+        self.decompressed_bytes: Optional[bytes] = None
+
+    @property
+    def data_bytes(self) -> Union[bytearray, bytes, memoryview]:
+        return self._data_bytes
+
+    @data_bytes.setter
+    def data_bytes(self, value: Union[bytearray, bytes, memoryview]):
+        self._data_bytes = value

     @property
     def num_data_bytes(self) -> int:
@@ -282,12 +289,11 @@ def normalize_shape(self, shape):
             shape = (1,)
         return shape

-    def write_tile(self, sample: SampleTiles, skip_bytes=False):
+    def write_tile(self, sample: SampleTiles):
         data, tile_shape = sample.yield_tile()
-        sample_nbytes = None if skip_bytes else len(data)
         self.data_bytes = data
         update_meta = sample.is_first_write
-        self.register_sample_to_headers(sample_nbytes, tile_shape)
+        self.register_sample_to_headers(None, tile_shape)
         if update_meta:
             self.tensor_meta.length += 1
             self.tensor_meta.update_shape_interval(sample.sample_shape)
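`base_chunk.py` above turns the plain `data_bytes` attribute into a property/setter pair over `_data_bytes`; that indirection is what allows the chunk-compressed subclass further down to rebuild the compressed buffer only when the bytes are actually read. A minimal sketch of the pattern (hypothetical `Base`/`LazyChild` classes, not hub's API; `_recompress` is a stub standing in for hub's compressors):

```python
# Illustrative sketch only. Turning the attribute into a property lets a
# subclass intercept reads of `data_bytes` and recompress lazily, while
# every existing `self.data_bytes` read/write keeps working unchanged.
class Base:
    def __init__(self):
        self._data_bytes = bytearray()

    @property
    def data_bytes(self):
        return self._data_bytes

    @data_bytes.setter
    def data_bytes(self, value):
        self._data_bytes = value


class LazyChild(Base):
    def __init__(self):
        super().__init__()
        self._changed = False  # flipped whenever the decompressed state mutates

    @property
    def data_bytes(self):
        if self._changed:  # recompress only when the bytes are actually read
            self._data_bytes = self._recompress()
            self._changed = False
        return self._data_bytes

    @data_bytes.setter
    def data_bytes(self, value):
        self._data_bytes = value

    def _recompress(self) -> bytes:
        # Stub: stands in for compress_bytes / compress_multiple.
        return bytes(self._data_bytes)


chunk = LazyChild()
chunk.data_bytes = bytearray(b"abc")  # setter still works through the property
assert chunk.data_bytes == bytearray(b"abc")
```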
diff --git a/hub/core/chunk/chunk_compressed_chunk.py b/hub/core/chunk/chunk_compressed_chunk.py
index f2294f3eec..a1ecf1a069 100644
--- a/hub/core/chunk/chunk_compressed_chunk.py
+++ b/hub/core/chunk/chunk_compressed_chunk.py
@@ -6,118 +6,141 @@
     decompress_bytes,
     decompress_multiple,
 )
+from hub.core.sample import Sample  # type: ignore
+from hub.core.fast_forwarding import ffw_chunk
 from hub.core.serialize import bytes_to_text, check_sample_shape
 from hub.core.tiling.sample_tiles import SampleTiles
 from hub.util.casting import intelligent_cast
 from hub.util.compression import get_compression_ratio
 from hub.util.exceptions import SampleDecompressionError
 from .base_chunk import BaseChunk, InputSample
+from hub.core.serialize import infer_chunk_num_bytes
 import hub


 class ChunkCompressedChunk(BaseChunk):
+    def __init__(self, *args, **kwargs):
+        super(ChunkCompressedChunk, self).__init__(*args, **kwargs)
+        if self.is_byte_compression:
+            self.decompressed_bytes = bytearray(
+                decompress_bytes(self._data_bytes, self.compression)
+            )
+        else:
+            shapes = [
+                self.shapes_encoder[i] for i in range(self.shapes_encoder.num_samples)
+            ]
+            self.decompressed_samples = decompress_multiple(self._data_bytes, shapes)
+        self._changed = False
+        self._compression_ratio = 0.5
+
     def extend_if_has_space(self, incoming_samples: List[InputSample]) -> float:
+        self.prepare_for_write()
         if self.is_byte_compression:
             return self.extend_if_has_space_byte_compression(incoming_samples)
-        return self.extend_if_has_space_img_compression(incoming_samples)
+        return self.extend_if_has_space_image_compression(incoming_samples)

     def extend_if_has_space_byte_compression(self, incoming_samples: List[InputSample]):
-        num_samples: float = 0
-        buffer = bytearray(self.decompressed_bytes) if self.data_bytes else bytearray()
+        num_samples = 0
         for i, incoming_sample in enumerate(incoming_samples):
             serialized_sample, shape = self.serialize_sample(
                 incoming_sample,
                 chunk_compression=self.compression,
                 store_uncompressed_tiles=True,
             )
+            self.num_dims = self.num_dims or len(shape)
             check_sample_shape(shape, self.num_dims)
             if isinstance(serialized_sample, SampleTiles):
-                incoming_samples[i] = serialized_sample
+                incoming_samples[i] = serialized_sample  # type: ignore
                 if self.is_empty:
                     self.write_tile(serialized_sample)
-                    num_samples += 0.5
+                    num_samples += 0.5  # type: ignore
                     tile = serialized_sample.yield_uncompressed_tile()
-                    self._decompressed_bytes = tile.tobytes()
+                    self.decompressed_bytes = tile.tobytes()
+                    self._changed = True
                 break
-            else:
-                sample_nbytes = len(serialized_sample)
-                buffer += serialized_sample
-                compressed_bytes = compress_bytes(buffer, self.compression)
-                if self.is_empty or len(compressed_bytes) < self.min_chunk_size:
-                    self.data_bytes = compressed_bytes
-                    self.register_in_meta_and_headers(sample_nbytes, shape)
-                    num_samples += 1
-                    self._decompressed_bytes = buffer
-                else:
+
+            sample_nbytes = len(serialized_sample)
+
+            recompressed = False  # This flag helps avoid double concatenation
+            if (
+                len(self.decompressed_bytes) + sample_nbytes  # type: ignore
+            ) * self._compression_ratio > self.min_chunk_size:
+                new_decompressed = self.decompressed_bytes + serialized_sample  # type: ignore
+                compressed_bytes = compress_bytes(
+                    new_decompressed, compression=self.compression
+                )
+                if len(compressed_bytes) > self.min_chunk_size:
                     break
+                recompressed = True
+                self.decompressed_bytes = new_decompressed
+                self._compression_ratio /= 2
+                self._data_bytes = compressed_bytes
+                self._changed = False
+            if not recompressed:
+                self.decompressed_bytes += serialized_sample  # type: ignore
+                self._changed = True
+            self.register_in_meta_and_headers(sample_nbytes, shape)
+            num_samples += 1
         return num_samples
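`_compression_ratio` above implements an adaptive size estimate: running the compressor on every append is expensive, so the chunk assumes the data compresses to `uncompressed_size * ratio`, only compresses for real once that estimate crosses `min_chunk_size`, and halves the ratio whenever the true compressed size still fits (the exponential back off the test change further down targets). A standalone sketch under those assumptions, with `zlib` standing in for hub's `compress_bytes` and a hypothetical `try_extend` helper:

```python
# Standalone sketch, not hub code. Real compression only runs once the
# estimate crosses the threshold; each time the actual compressed size
# still fits, the ratio is halved so the next check triggers later.
import zlib


def try_extend(buffer: bytearray, sample: bytes, ratio: float, min_size: int):
    """Return (accepted, new_ratio); mutates `buffer` in place when accepted."""
    if (len(buffer) + len(sample)) * ratio > min_size:
        compressed = zlib.compress(bytes(buffer) + sample)
        if len(compressed) > min_size:
            return False, ratio  # chunk is genuinely full; caller starts a new one
        ratio /= 2  # data compresses better than assumed: estimate less often
    buffer += sample
    return True, ratio


buf, ratio = bytearray(), 0.5
for _ in range(100):
    accepted, ratio = try_extend(buf, b"\x00" * 4096, ratio, min_size=16 * 1024)
    if not accepted:
        break
```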

-    def extend_if_has_space_img_compression(self, incoming_samples: List[InputSample]):
-        num_samples: float = 0
-        buffer_list = self.decompressed_samples if self.data_bytes else []
-
+    def extend_if_has_space_image_compression(
+        self, incoming_samples: List[InputSample]
+    ):
+        num_samples = 0
+        num_decompressed_bytes = sum(
+            x.nbytes for x in self.decompressed_samples  # type: ignore
+        )
         for i, incoming_sample in enumerate(incoming_samples):
             incoming_sample, shape = self.process_sample_img_compr(incoming_sample)

             if isinstance(incoming_sample, SampleTiles):
-                incoming_samples[i] = incoming_sample
+                incoming_samples[i] = incoming_sample  # type: ignore
                 if self.is_empty:
-                    self.write_tile(incoming_sample, skip_bytes=True)
-                    num_samples += 0.5
+                    self.write_tile(incoming_sample)
+                    num_samples += 0.5  # type: ignore
                     tile = incoming_sample.yield_uncompressed_tile()
-                    self._decompressed_samples = [tile]
+                    self.decompressed_samples = [tile]
+                    self._changed = True
                 break
+            if (
+                num_decompressed_bytes + incoming_sample.nbytes
+            ) * self._compression_ratio > self.min_chunk_size:
+                compressed_bytes = compress_multiple(
+                    self.decompressed_samples + [incoming_sample],  # type: ignore
+                    compression=self.compression,
+                )
+                if len(compressed_bytes) > self.min_chunk_size:
+                    break
+                self._compression_ratio /= 2
+                self._data_bytes = compressed_bytes
+                self._changed = False

-            buffer_list.append(incoming_sample)
-            compressed_bytes = compress_multiple(buffer_list, self.compression)
-            if self.is_empty or len(compressed_bytes) < self.min_chunk_size:
-                self.data_bytes = compressed_bytes
-                # Byte positions aren't relevant for chunk wise img compressions
-                self.register_in_meta_and_headers(None, shape)
-                num_samples += 1
-                self._decompressed_samples = buffer_list
-            else:
-                buffer_list.pop()
-                break
+            shape = incoming_sample.shape
+            shape = self.normalize_shape(shape)
+            self.num_dims = self.num_dims or len(shape)
+            check_sample_shape(shape, self.num_dims)
+            self.decompressed_samples.append(incoming_sample)  # type: ignore
+            self._changed = True
+            # Byte positions are not relevant for chunk wise image compressions, so incoming_num_bytes=None.
+            self.register_in_meta_and_headers(None, shape)
+            num_samples += 1
         return num_samples

-    @property
-    def decompressed_samples(self) -> List[np.ndarray]:
-        """Applicable only for compressed chunks. Returns samples contained in this chunk as a list of numpy arrays."""
-        if self._decompressed_samples is None:
-            num_samples = self.shapes_encoder.num_samples
-            shapes = [self.shapes_encoder[i] for i in range(num_samples)]
-            self._decompressed_samples = decompress_multiple(self.data_bytes, shapes)
-        return self._decompressed_samples
-
-    @property
-    def decompressed_bytes(self) -> bytes:
-        """Applicable only for chunks compressed using a byte compression. Returns the contents of the chunk as a decompressed buffer."""
-        if self._decompressed_bytes is None:
-            try:
-                data = decompress_bytes(self.data_bytes, self.compression)
-                self._decompressed_bytes = data
-            except SampleDecompressionError:
-                raise ValueError(
-                    "Chunk.decompressed_bytes can not be called on chunks compressed with image compressions. Use Chunk.get_samples() instead."
-                )
-        return self._decompressed_bytes
-
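The lazy `decompressed_samples`/`decompressed_bytes` properties removed here are superseded by the eager decode in `__init__` above: since `extend_*` and `update_*` now mutate the decompressed state in place and merely set `_changed`, the caches must always exist. A self-contained sketch of the round-trip invariant this relies on, using `zlib` and NumPy stand-ins for hub's `compress_multiple`/`decompress_multiple` (hypothetical helpers, uint8 only for brevity):

```python
# Sketch only: a chunk-compressed chunk persists one compressed blob, so the
# per-sample arrays must be rebuilt from the stored shapes on load.
import zlib

import numpy as np


def compress_multiple(arrays):
    return zlib.compress(b"".join(a.tobytes() for a in arrays))


def decompress_multiple(buffer, shapes):
    raw = memoryview(zlib.decompress(buffer))
    arrays, offset = [], 0
    for shape in shapes:
        nbytes = int(np.prod(shape))  # uint8: one byte per element
        arrays.append(
            np.frombuffer(raw[offset : offset + nbytes], np.uint8).reshape(shape)
        )
        offset += nbytes
    return arrays


samples = [np.random.randint(0, 255, (4, 4), dtype=np.uint8) for _ in range(3)]
restored = decompress_multiple(compress_multiple(samples), [s.shape for s in samples])
assert all(np.array_equal(a, b) for a, b in zip(samples, restored))
```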
     def read_sample(self, local_index: int, cast: bool = True, copy: bool = False):
         if self.is_image_compression:
-            return self.decompressed_samples[local_index]
+            return self.decompressed_samples[local_index]  # type: ignore

-        sb, eb = self.byte_positions_encoder[local_index]
         shape = self.shapes_encoder[local_index]
-        decompressed = memoryview(self.decompressed_bytes)
-        buffer = decompressed[sb:eb]
+        decompressed = memoryview(self.decompressed_bytes)  # type: ignore
+        if not self.byte_positions_encoder.is_empty():
+            sb, eb = self.byte_positions_encoder[local_index]
+            decompressed = decompressed[sb:eb]
         if self.is_text_like:
-            return bytes_to_text(buffer, self.htype)
-        return np.frombuffer(decompressed[sb:eb], dtype=self.dtype).reshape(shape)
+            return bytes_to_text(decompressed, self.htype)
+        return np.frombuffer(decompressed, dtype=self.dtype).reshape(shape)

     def update_sample(self, local_index: int, new_sample: InputSample):
         self.prepare_for_write()
@@ -132,16 +155,16 @@ def update_sample_byte_compression(self, local_index: int, new_sample: InputSamp
         )
         self.check_shape_for_update(local_index, shape)

-        new_nb = len(serialized_sample)
         decompressed_buffer = self.decompressed_bytes

         new_data_uncompressed = self.create_updated_data(
             local_index, decompressed_buffer, serialized_sample
         )
-        self.data_bytes = bytearray(
-            compress_bytes(new_data_uncompressed, self.compression)
+        self.decompressed_bytes = new_data_uncompressed
+        self._changed = True
+        new_nb = (
+            None if self.byte_positions_encoder.is_empty() else len(serialized_sample)
         )
-        self._decompressed_bytes = new_data_uncompressed
         self.update_in_meta_and_headers(local_index, new_nb, shape)
@@ -150,8 +173,7 @@ def update_sample_img_compression(self, local_index: int, new_sample: InputSampl
         shape = self.normalize_shape(shape)
         self.check_shape_for_update(local_index, shape)
         decompressed_samples = self.decompressed_samples
-        decompressed_samples[local_index] = new_sample
-        self.data_bytes = bytearray(
-            compress_multiple(decompressed_samples, self.compression)
-        )
+
+        decompressed_samples[local_index] = new_sample  # type: ignore
+        self._changed = True
         self.update_in_meta_and_headers(local_index, None, shape)
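Both update paths now splice the new serialized sample into the decompressed state, flag `_changed`, and pass `None` as the byte count when no byte positions are tracked; recompression is left to the lazy `data_bytes` property. A hedged sketch of what a `create_updated_data`-style splice does (hypothetical standalone helper, not hub's actual implementation):

```python
# Hypothetical helper: replace one sample's byte range inside the
# decompressed buffer. Everything after it shifts, which is why byte
# positions are re-registered and recompression is simply deferred
# until data_bytes is next read.
def splice_sample(buffer: bytes, start: int, end: int, new_bytes: bytes) -> bytes:
    """Return `buffer` with the [start, end) range replaced by `new_bytes`."""
    return buffer[:start] + new_bytes + buffer[end:]


data = b"aaabbbccc"
assert splice_sample(data, 3, 6, b"XY") == b"aaaXYccc"  # sample shrank by one byte
```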
@@ -179,3 +207,41 @@ def process_sample_img_compr(self, sample):
             )

         return sample, shape
+
+    def _compress(self):
+        if self.is_byte_compression:
+            self._data_bytes = compress_bytes(self.decompressed_bytes, self.compression)
+        else:
+            self._data_bytes = compress_multiple(
+                self.decompressed_samples, self.compression
+            )
+
+    @property
+    def data_bytes(self):
+        if self._changed:
+            self._compress()
+            self._changed = False
+        return self._data_bytes
+
+    @data_bytes.setter
+    def data_bytes(self, value):
+        self._data_bytes = value
+
+    @property
+    def num_uncompressed_bytes(self):
+        if self.is_byte_compression:
+            return len(self.decompressed_bytes)
+        return sum(x.nbytes for x in self.decompressed_samples)
+
+    @property
+    def nbytes(self):
+        """Calculates the number of bytes `tobytes` will be without having to call `tobytes`. Used by `LRUCache` to determine if this chunk can be cached."""
+        return infer_chunk_num_bytes(
+            self.version,
+            self.shapes_encoder.array,
+            self.byte_positions_encoder.array,
+            len_data=min(self.num_uncompressed_bytes, self.max_chunk_size),
+        )
+
+    def prepare_for_write(self):
+        ffw_chunk(self)
diff --git a/hub/core/chunk/sample_compressed_chunk.py b/hub/core/chunk/sample_compressed_chunk.py
index 4defaa0902..fc2f558ec8 100644
--- a/hub/core/chunk/sample_compressed_chunk.py
+++ b/hub/core/chunk/sample_compressed_chunk.py
@@ -45,8 +45,10 @@ def extend_if_has_space(self, incoming_samples: List[InputSample]) -> float:
         return num_samples

     def read_sample(self, local_index: int, cast: bool = True, copy: bool = False):
-        sb, eb = self.byte_positions_encoder[local_index]
-        buffer = self.memoryview_data[sb:eb]
+        buffer = self.memoryview_data
+        if not self.byte_positions_encoder.is_empty():
+            sb, eb = self.byte_positions_encoder[local_index]
+            buffer = buffer[sb:eb]
         shape = self.shapes_encoder[local_index]
         if self.is_text_like:
             buffer = decompress_bytes(buffer, compression=self.compression)
@@ -65,11 +67,13 @@ def update_sample(self, local_index: int, sample: InputSample):
         )
         self.check_shape_for_update(local_index, shape)

-        new_nb = len(serialized_sample)
         old_data = self.data_bytes
         self.data_bytes = self.create_updated_data(
             local_index, old_data, serialized_sample
         )

         # update encoders and meta
+        new_nb = (
+            None if self.byte_positions_encoder.is_empty() else len(serialized_sample)
+        )
         self.update_in_meta_and_headers(local_index, new_nb, shape)
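The `byte_positions_encoder.is_empty()` fallback added here (and mirrored in `uncompressed_chunk.py` below) covers chunks holding a single tile written via `write_tile` with `incoming_num_bytes=None`: no byte ranges were registered, so the whole chunk buffer is that one sample. A sketch with a hypothetical standalone `read_sample`:

```python
# Hypothetical standalone analogue of the read path: slice a sample's byte
# range only when positions were registered; otherwise the chunk holds
# exactly one (tiled) sample and the entire buffer belongs to it.
from typing import List, Optional, Tuple

import numpy as np


def read_sample(
    buffer: memoryview,
    positions: Optional[List[Tuple[int, int]]],
    index: int,
    shape: Tuple[int, ...],
) -> np.ndarray:
    if positions:  # normal case: per-sample byte ranges exist
        start, end = positions[index]
        buffer = buffer[start:end]
    return np.frombuffer(buffer, dtype=np.uint8).reshape(shape)


tile = np.arange(12, dtype=np.uint8).reshape(3, 4)
assert np.array_equal(read_sample(memoryview(tile.tobytes()), None, 0, (3, 4)), tile)
```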
diff --git a/hub/core/chunk/test_chunk_compressed.py b/hub/core/chunk/test_chunk_compressed.py
index 09e848c2b5..3a5d433c93 100644
--- a/hub/core/chunk/test_chunk_compressed.py
+++ b/hub/core/chunk/test_chunk_compressed.py
@@ -1,4 +1,4 @@
-from hub.constants import MB, PARTIAL_NUM_SAMPLES
+from hub.constants import MB, KB, PARTIAL_NUM_SAMPLES
 from hub.core.chunk.chunk_compressed_chunk import ChunkCompressedChunk
 import numpy as np
 import pytest
@@ -22,7 +22,7 @@ def create_tensor_meta():
     tensor_meta.dtype = "uint8"
     tensor_meta.max_shape = None
     tensor_meta.min_shape = None
-    tensor_meta.htype = None
+    tensor_meta.htype = "generic"
     tensor_meta.length = 0
     return tensor_meta

@@ -48,7 +48,8 @@ def test_read_write_sequence(compression):


 @compressions_paremetrized
-def test_read_write_sequence_big(cat_path, compression):
+@pytest.mark.parametrize("random", [True, False])
+def test_read_write_sequence_big(cat_path, compression, random):
     tensor_meta = create_tensor_meta()
     common_args["tensor_meta"] = tensor_meta
     common_args["compression"] = compression
@@ -57,20 +58,26 @@ def test_read_write_sequence_big(cat_path, compression, random):
     for i in range(50):
         if i % 10 == 0:
             data_in.append(
-                np.random.randint(0, 255, size=(6001, 3000, 3)).astype(dtype)
+                np.random.randint(0, 255, size=(6001, 3000, 3)).astype(dtype) * random
             )
         elif i % 3 == 0:
-            data_in.append(hub.read(cat_path))
+            data_in.append(
+                hub.read(cat_path) if random else np.zeros((900, 900, 3), dtype=dtype)
+            )
         else:
-            data_in.append(np.random.randint(0, 255, size=(1000, 500, 3)).astype(dtype))
+            data_in.append(
+                np.random.randint(0, 255, size=(1000, 500, 3)).astype(dtype) * random
+            )
     data_in2 = data_in.copy()
     tiles = []
     original_length = len(data_in)
-
+    tiled = False
     while data_in:
         chunk = ChunkCompressedChunk(**common_args)
+        chunk._compression_ratio = 10  # start with a bad compression ratio to hit exponential back off code path
         num_samples = chunk.extend_if_has_space(data_in)
         if num_samples == PARTIAL_NUM_SAMPLES:
+            tiled = True
             tiles.append(chunk.read_sample(0))
             sample = data_in[0]
             assert isinstance(sample, SampleTiles)
@@ -95,6 +102,7 @@ def test_read_write_sequence_big(cat_path, compression, random):
                 item = item.array
             np.testing.assert_array_equal(item, data_in[i])
         data_in = data_in[num_samples:]
+    assert tiled


 @compressions_paremetrized
diff --git a/hub/core/chunk/uncompressed_chunk.py b/hub/core/chunk/uncompressed_chunk.py
index 57e4ab2c8e..3e75c1b8dd 100644
--- a/hub/core/chunk/uncompressed_chunk.py
+++ b/hub/core/chunk/uncompressed_chunk.py
@@ -68,8 +68,10 @@ def _extend_if_has_space_list(self, incoming_samples: List[InputSample]) -> floa
         return num_samples

     def read_sample(self, local_index: int, cast: bool = True, copy: bool = False):
-        sb, eb = self.byte_positions_encoder[local_index]
-        buffer = self.memoryview_data[sb:eb]
+        buffer = self.memoryview_data
+        if not self.byte_positions_encoder.is_empty():
+            sb, eb = self.byte_positions_encoder[local_index]
+            buffer = buffer[sb:eb]
         shape = self.shapes_encoder[local_index]
         if self.is_text_like:
             buffer = bytes(buffer)
@@ -81,7 +83,9 @@ def update_sample(self, local_index: int, sample: InputSample):
         self.prepare_for_write()
         serialized_sample, shape = self.serialize_sample(sample, break_into_tiles=False)
         self.check_shape_for_update(local_index, shape)
-        new_nb = len(serialized_sample)
+        new_nb = (
+            None if self.byte_positions_encoder.is_empty() else len(serialized_sample)
+        )

         old_data = self.data_bytes
         self.data_bytes = self.create_updated_data(
diff --git a/hub/core/compression.py b/hub/core/compression.py
index b71276e57d..0c331d605d 100644
--- a/hub/core/compression.py
+++ b/hub/core/compression.py
@@ -372,6 +372,8 @@ def decompress_multiple(
     compression: Optional[str] = None,
 ) -> List[np.ndarray]:
     """Unpack a compressed buffer into multiple arrays."""
+    if not buffer:
+        return []
     if compression and get_compression_type(compression) == "byte":
         decompressed_buffer = memoryview(decompress_bytes(buffer, compression))
         arrays = []
diff --git a/hub/core/meta/encode/base_encoder.py b/hub/core/meta/encode/base_encoder.py
index 4c8dc9df3e..60199eb9c6 100644
--- a/hub/core/meta/encode/base_encoder.py
+++ b/hub/core/meta/encode/base_encoder.py
@@ -675,3 +675,6 @@ def _try_splitting_middle(
         self._encoded = np.concatenate((start, [new_row], end))

         return True
+
+    def is_empty(self) -> bool:
+        return len(self._encoded) == 0
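The early return added to `decompress_multiple` and the new `Encoder.is_empty` guard the same situation: with compression now deferred, a freshly created chunk can legitimately carry an empty buffer, and byte codecs raise on zero-length input rather than returning nothing. A quick `zlib` illustration (stand-in for hub's byte codecs; `decompress_multiple_guarded` is a hypothetical reduction of the guarded function):

```python
# Sketch only: zero-length input is an error for the codec, so the
# empty-chunk case must return early instead of decompressing.
import zlib


def decompress_multiple_guarded(buffer: bytes):
    if not buffer:  # mirrors the `if not buffer: return []` added above
        return []
    return zlib.decompress(buffer)  # shape handling elided


try:
    zlib.decompress(b"")
except zlib.error:
    pass  # raises "incomplete or truncated stream" on empty input
assert decompress_multiple_guarded(b"") == []
```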