Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AL-1574] Fast chunk compression #1366

Merged
merged 29 commits into from
Dec 16, 2021
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
e87bc8c
predict len(compressed_bytes)
farizrahman4u Dec 1, 2021
28cd01b
fix
farizrahman4u Dec 1, 2021
a72dbbe
compress on flush
farizrahman4u Dec 1, 2021
8767933
exp back off
farizrahman4u Dec 1, 2021
d70bc24
merge confl fix
farizrahman4u Dec 1, 2021
664868c
merge confl fix
farizrahman4u Dec 1, 2021
5323604
fixes
farizrahman4u Dec 1, 2021
bd60838
Merge branch 'main' of https://www.github.com/activeloopai/hub into f…
farizrahman4u Dec 5, 2021
3899b88
format
farizrahman4u Dec 5, 2021
012fcdf
smol fixes
farizrahman4u Dec 5, 2021
d239eb0
fixes
farizrahman4u Dec 5, 2021
b67e887
smol fix
farizrahman4u Dec 5, 2021
9d8cac4
some fixes
farizrahman4u Dec 6, 2021
e471b1a
typofix
farizrahman4u Dec 7, 2021
3cf4faf
Merge branch 'main' of https://www.github.com/activeloopai/hub into f…
farizrahman4u Dec 13, 2021
bc7177f
cleanup
farizrahman4u Dec 13, 2021
4d8638c
mypy
farizrahman4u Dec 13, 2021
9abfd1c
Merge branch 'main' of https://www.github.com/activeloopai/hub into f…
farizrahman4u Dec 15, 2021
fa4ce15
cov
farizrahman4u Dec 15, 2021
6e7f9c8
fix sample compression too
farizrahman4u Dec 15, 2021
3735d87
cov
farizrahman4u Dec 15, 2021
c85421a
cov
farizrahman4u Dec 15, 2021
85f11ee
cov fix
farizrahman4u Dec 15, 2021
3f9abcb
Merge branch 'main' into fr_fast_chunk_compression
farizrahman4u Dec 15, 2021
129c2f1
updates
farizrahman4u Dec 16, 2021
b895ebd
Merge branch 'fr_fast_chunk_compression' of https://www.github.com/ac…
farizrahman4u Dec 16, 2021
612334f
is_tile
farizrahman4u Dec 16, 2021
1ad913d
rem is_tile
farizrahman4u Dec 16, 2021
831b7a4
typoo
farizrahman4u Dec 16, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions hub/api/tests/test_api_tiling.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def test_mixed_small_large(local_ds_generator, compression):
arr2 = np.random.randint(0, 255, (500, 500, 3)).astype(np.uint8)
arr3 = np.random.randint(0, 255, (2503, 2501, 3)).astype(np.uint8)
with ds:
ds.create_tensor("abc", **compression)
ds.create_tensor("abc", max_chunk_size=2 ** 21, **compression)
for i in range(10):
if i % 5 == 0:
ds.abc.append(arr1)
Expand Down Expand Up @@ -75,7 +75,7 @@ def test_updates(memory_ds, compression):
arr3 = np.random.randint(0, 255, (2503, 2501, 3)).astype(np.uint8)
arr4 = np.random.randint(0, 255, (250, 250, 3)).astype(np.uint8)
with memory_ds:
memory_ds.create_tensor("abc", **compression)
memory_ds.create_tensor("abc", max_chunk_size=2 ** 21, **compression)
for i in range(10):
if i % 5 == 0:
memory_ds.abc.append(arr1)
Expand Down
15 changes: 11 additions & 4 deletions hub/core/chunk/base_chunk.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,8 @@ def __init__(
encoded_byte_positions: Optional[np.ndarray] = None,
data: Optional[memoryview] = None,
):
self._data_bytes: Union[bytearray, bytes, memoryview] = data or bytearray()
self.version = hub.__version__

self.data_bytes: Union[bytearray, bytes, memoryview] = data or bytearray()
self.min_chunk_size = min_chunk_size
self.max_chunk_size = max_chunk_size

Expand All @@ -73,8 +72,16 @@ def __init__(
raise ValueError("Can't use image compression with text data.")

# These caches are only used for ChunkCompressed chunk.
self._decompressed_samples: Optional[List[np.ndarray]] = None
self._decompressed_bytes: Optional[bytes] = None
self.decompressed_samples: Optional[List[np.ndarray]] = None
self.decompressed_bytes: Optional[bytes] = None

@property
def data_bytes(self) -> Union[bytearray, bytes, memoryview]:
    """Return the raw buffer currently stored in `_data_bytes`."""
    return self._data_bytes

@data_bytes.setter
def data_bytes(self, value: Union[bytearray, bytes, memoryview]):
    """Store `value` as the chunk's raw buffer (`_data_bytes`)."""
    self._data_bytes = value

@property
def num_data_bytes(self) -> int:
Expand Down
220 changes: 179 additions & 41 deletions hub/core/chunk/chunk_compressed_chunk.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,148 @@
import numpy as np
from typing import List
from typing import List, Union, Sequence
from hub.core.compression import (
compress_bytes,
compress_multiple,
decompress_bytes,
decompress_multiple,
)
from hub.core.sample import Sample # type: ignore
from hub.core.fast_forwarding import ffw_chunk
from hub.core.serialize import bytes_to_text, check_sample_shape
from hub.core.tiling.sample_tiles import SampleTiles
from hub.util.casting import intelligent_cast
from hub.util.compression import get_compression_ratio
from hub.util.exceptions import SampleDecompressionError
from .base_chunk import BaseChunk, InputSample
from hub.core.serialize import infer_chunk_num_bytes


class ChunkCompressedChunk(BaseChunk):
def __init__(self, *args, **kwargs):
    """Build the chunk and eagerly decompress whatever data it was given.

    All arguments are forwarded unchanged to the parent constructor. After
    that, the decompressed form of `_data_bytes` is stored on the instance:
    a mutable `bytearray` in `decompressed_bytes` for byte compressions,
    otherwise the list returned by `decompress_multiple` in
    `decompressed_samples`.
    """
    super(ChunkCompressedChunk, self).__init__(*args, **kwargs)
    if not self.is_byte_compression:
        sample_count = self.shapes_encoder.num_samples
        self.decompressed_samples = decompress_multiple(
            self._data_bytes,
            [self.shapes_encoder[idx] for idx in range(sample_count)],
        )
    else:
        raw = decompress_bytes(self._data_bytes, self.compression)
        self.decompressed_bytes = bytearray(raw)
    # `_changed` is the dirty flag consulted by the `data_bytes` getter;
    # `_compression_ratio` is the starting divisor for the size estimate
    # used when extending the chunk.
    self._changed = False
    self._compression_ratio = 2
farizrahman4u marked this conversation as resolved.
Show resolved Hide resolved

def extend_if_has_space(self, incoming_samples: List[InputSample]) -> float:
    """Append as many of `incoming_samples` as this chunk can hold.

    Calls `prepare_for_write()` first, then dispatches on
    `is_byte_compression` to the byte- or image-compression implementation.

    Args:
        incoming_samples: Samples offered to this chunk, in order.

    Returns:
        Whatever the selected implementation returns: the number of samples
        it consumed (those implementations add 0.5 when they write a tile).
    """
    self.prepare_for_write()
    if self.is_byte_compression:
        return self.extend_if_has_space_byte_compression(incoming_samples)
    # Only the `*_image_compression` name is a public method now; the older
    # `*_img_compression` spelling was renamed with a leading underscore.
    return self.extend_if_has_space_image_compression(incoming_samples)

def extend_if_has_space_byte_compression(
    self, incoming_samples: Union[Sequence[InputSample], np.ndarray]
):
    """Append samples to a byte-compressed chunk until it is estimated full.

    Each sample is serialized and appended to `decompressed_bytes`. Actual
    compression is only run when the estimated compressed size
    (uncompressed size divided by `_compression_ratio`) exceeds
    `min_chunk_size`; every such compression that still fits doubles
    `_compression_ratio`, so the check fires progressively less often.

    Args:
        incoming_samples: Samples to add. An entry is overwritten in place
            with its `SampleTiles` object when `serialize_sample` returns one.

    Returns:
        Number of samples consumed. A tile written into an empty chunk
        contributes 0.5 (presumably marking a partially consumed sample —
        confirm against the caller).
    """
    num_samples: float = 0
    for i, incoming_sample in enumerate(incoming_samples):
        serialized_sample, shape = self.serialize_sample(
            incoming_sample,
            chunk_compression=self.compression,
            store_uncompressed_tiles=True,
        )

        self.num_dims = self.num_dims or len(shape)
        check_sample_shape(shape, self.num_dims)

        if isinstance(serialized_sample, SampleTiles):
            # Hand the tiles object back through the input sequence.
            incoming_samples[i] = serialized_sample  # type: ignore
            if self.is_empty:
                self.write_tile(serialized_sample)
                num_samples += 0.5
                tile = serialized_sample.yield_uncompressed_tile()
                self.decompressed_bytes = tile.tobytes()
                self._changed = True
            break

        sample_nbytes = len(serialized_sample)

        recompressed = False  # This flag helps avoid double concatenation
        if (
            len(self.decompressed_bytes) + sample_nbytes  # type: ignore
        ) / self._compression_ratio > self.min_chunk_size:
            new_decompressed = self.decompressed_bytes + serialized_sample  # type: ignore
            compressed_bytes = compress_bytes(
                new_decompressed, compression=self.compression
            )
            if len(compressed_bytes) > self.min_chunk_size:
                # The sample does not fit; leave the chunk untouched.
                break
            recompressed = True
            self.decompressed_bytes = new_decompressed
            self._compression_ratio *= 2
            # The compressed buffer is already up to date, so clear the
            # dirty flag instead of compressing again on the next read.
            self._data_bytes = compressed_bytes
            self._changed = False

        if not recompressed:
            self.decompressed_bytes += serialized_sample  # type: ignore
            self._changed = True
        self.register_in_meta_and_headers(sample_nbytes, shape)
        num_samples += 1
    return num_samples

def extend_if_has_space_image_compression(
    self, incoming_samples: Union[Sequence[InputSample], np.ndarray]
):
    """Append samples to an image-compressed chunk until it is estimated full.

    Samples are kept as arrays in `decompressed_samples`. Actual compression
    is only run when the estimated compressed size (total uncompressed bytes
    divided by `_compression_ratio`) exceeds `min_chunk_size`; every such
    compression that still fits doubles `_compression_ratio`.

    Args:
        incoming_samples: Samples to add. An entry is overwritten in place
            with its `SampleTiles` object when `process_sample_img_compr`
            returns one.

    Returns:
        Number of samples consumed. A tile written into an empty chunk
        contributes 0.5 (presumably marking a partially consumed sample —
        confirm against the caller).

    Raises:
        ValueError: If a sample is a `bytes` object.
    """
    num_samples: float = 0
    num_decompressed_bytes = sum(
        x.nbytes for x in self.decompressed_samples  # type: ignore
    )
    for i, incoming_sample in enumerate(incoming_samples):
        if isinstance(incoming_sample, bytes):
            raise ValueError(
                "Chunkwise image compression is not applicable on bytes."
            )
        incoming_sample, shape = self.process_sample_img_compr(incoming_sample)

        if isinstance(incoming_sample, SampleTiles):
            # Hand the tiles object back through the input sequence.
            incoming_samples[i] = incoming_sample  # type: ignore
            if self.is_empty:
                self.write_tile(incoming_sample, skip_bytes=True)
                num_samples += 0.5
                tile = incoming_sample.yield_uncompressed_tile()
                self.decompressed_samples = [tile]
                self._changed = True
            break
        if isinstance(incoming_sample, Sample):
            incoming_sample = incoming_sample.array

        sample_nbytes = incoming_sample.nbytes
        if (
            num_decompressed_bytes + sample_nbytes
        ) / self._compression_ratio > self.min_chunk_size:
            compressed_bytes = compress_multiple(
                self.decompressed_samples + [incoming_sample],  # type: ignore
                compression=self.compression,
            )
            if len(compressed_bytes) > self.min_chunk_size:
                # The sample does not fit; leave the chunk untouched.
                break
            self._compression_ratio *= 2
            self._data_bytes = compressed_bytes
            self._changed = False

        shape = incoming_sample.shape
        shape = self.normalize_shape(shape)

        self.num_dims = self.num_dims or len(shape)
        check_sample_shape(shape, self.num_dims)
        self.decompressed_samples.append(incoming_sample)  # type: ignore
        self._changed = True
        # Byte positions are not relevant for chunk wise image compressions, so incoming_num_bytes=None.
        self.register_in_meta_and_headers(None, shape)
        num_samples += 1
        # Keep the running total current so the size estimate for the next
        # sample accounts for everything accepted earlier in this call.
        num_decompressed_bytes += sample_nbytes
    return num_samples

def _extend_if_has_space_byte_compression(
farizrahman4u marked this conversation as resolved.
Show resolved Hide resolved
self, incoming_samples: List[InputSample]
):
num_samples: float = 0
buffer = bytearray(self.decompressed_bytes) if self.data_bytes else bytearray()
buffer = bytearray(self.decompressed_bytes) if self.data_bytes else bytearray() # type: ignore
for i, incoming_sample in enumerate(incoming_samples):
serialized_sample, shape = self.serialize_sample(
incoming_sample,
Expand All @@ -46,15 +165,15 @@ def extend_if_has_space_byte_compression(self, incoming_samples: List[InputSampl
buffer += serialized_sample
compressed_bytes = compress_bytes(buffer, self.compression)
if self.is_empty or len(compressed_bytes) < self.min_chunk_size:
self.data_bytes = compressed_bytes
self.data_bytes = compressed_bytes # type: ignore
self.register_in_meta_and_headers(sample_nbytes, shape)
num_samples += 1
self._decompressed_bytes = buffer
else:
break
return num_samples

def extend_if_has_space_img_compression(self, incoming_samples: List[InputSample]):
def _extend_if_has_space_img_compression(self, incoming_samples: List[InputSample]):
farizrahman4u marked this conversation as resolved.
Show resolved Hide resolved
num_samples: float = 0
buffer_list = self.decompressed_samples if self.data_bytes else []

Expand All @@ -70,49 +189,27 @@ def extend_if_has_space_img_compression(self, incoming_samples: List[InputSample
self._decompressed_samples = [tile]
break

buffer_list.append(incoming_sample)
compressed_bytes = compress_multiple(buffer_list, self.compression)
buffer_list.append(incoming_sample) # type: ignore
compressed_bytes = compress_multiple(buffer_list, self.compression) # type: ignore
if self.is_empty or len(compressed_bytes) < self.min_chunk_size:
self.data_bytes = compressed_bytes
self.data_bytes = compressed_bytes # type: ignore
# Byte positions aren't relevant for chunk wise img compressions
self.register_in_meta_and_headers(None, shape)
num_samples += 1
self._decompressed_samples = buffer_list
self._decompressed_samples = buffer_list # type: ignore
else:
buffer_list.pop()
buffer_list.pop() # type: ignore
break

return num_samples

# Lazily decompressing accessor: decodes `data_bytes` on first access and
# caches the result in `self._decompressed_samples`.
# NOTE(review): `ChunkCompressedChunk.__init__` assigns
# `self.decompressed_samples` directly, which a setter-less property would
# reject. This looks like the pre-change form of the accessor — confirm
# whether it should still be present.
@property
def decompressed_samples(self) -> List[np.ndarray]:
"""Applicable only for compressed chunks. Returns samples contained in this chunk as a list of numpy arrays."""
if self._decompressed_samples is None:
# One shape per stored sample, read from the shapes encoder.
num_samples = self.shapes_encoder.num_samples
shapes = [self.shapes_encoder[i] for i in range(num_samples)]
self._decompressed_samples = decompress_multiple(self.data_bytes, shapes)
return self._decompressed_samples

# Lazily decompressing accessor: decompresses `data_bytes` on first access and
# caches the result in `self._decompressed_bytes`.
# NOTE(review): `ChunkCompressedChunk.__init__` assigns
# `self.decompressed_bytes` directly, which a setter-less property would
# reject. This looks like the pre-change form of the accessor — confirm
# whether it should still be present.
@property
def decompressed_bytes(self) -> bytes:
"""Applicable only for chunks compressed using a byte compression. Returns the contents of the chunk as a decompressed buffer."""
if self._decompressed_bytes is None:
try:
data = decompress_bytes(self.data_bytes, self.compression)
self._decompressed_bytes = data
# A decompression failure is re-raised as a ValueError with usage guidance.
except SampleDecompressionError:
raise ValueError(
"Chunk.decompressed_bytes can not be called on chunks compressed with image compressions. Use Chunk.get_samples() instead."
)
return self._decompressed_bytes

def read_sample(self, local_index: int, cast: bool = True, copy: bool = False):
if self.is_image_compression:
return self.decompressed_samples[local_index]

sb, eb = self.byte_positions_encoder[local_index]
shape = self.shapes_encoder[local_index]
decompressed = memoryview(self.decompressed_bytes)
decompressed = memoryview(self.decompressed_bytes) # type: ignore
buffer = decompressed[sb:eb]
if self.is_text_like:
return bytes_to_text(buffer, self.htype)
Expand All @@ -137,10 +234,8 @@ def update_sample_byte_compression(self, local_index: int, new_sample: InputSamp
new_data_uncompressed = self.create_updated_data(
local_index, decompressed_buffer, serialized_sample
)
self.data_bytes = bytearray(
compress_bytes(new_data_uncompressed, self.compression)
)
self._decompressed_bytes = new_data_uncompressed
self.decompressed_bytes = new_data_uncompressed
self._changed = True
self.update_in_meta_and_headers(local_index, new_nb, shape)

def update_sample_img_compression(self, local_index: int, new_sample: InputSample):
Expand All @@ -149,9 +244,14 @@ def update_sample_img_compression(self, local_index: int, new_sample: InputSampl
shape = self.normalize_shape(shape)
self.check_shape_for_update(local_index, shape)
decompressed_samples = self.decompressed_samples
decompressed_samples[local_index] = new_sample
self.data_bytes = bytearray(
compress_multiple(decompressed_samples, self.compression)

decompressed_samples[local_index] = new_sample # type: ignore
self._changed = True
self.update_in_meta_and_headers(local_index, None, shape) # type: ignore

decompressed_samples[local_index] = new_sample # type: ignore
self.data_bytes = bytearray( # type: ignore
compress_multiple(decompressed_samples, self.compression) # type: ignore
)
self.update_in_meta_and_headers(local_index, None, shape)

Expand All @@ -177,3 +277,41 @@ def process_sample_img_compr(self, sample):
)

return sample, shape

def _compress(self):
    """Rebuild `_data_bytes` from the decompressed contents of the chunk.

    Image compressions compress the list in `decompressed_samples`; byte
    compressions compress the buffer in `decompressed_bytes`.
    """
    if not self.is_byte_compression:
        self._data_bytes = compress_multiple(
            self.decompressed_samples, self.compression
        )
        return
    self._data_bytes = compress_bytes(self.decompressed_bytes, self.compression)

@property
def data_bytes(self):
    """Return the compressed buffer, recompressing first if it is stale.

    When `_changed` is set, `_compress()` is called and the flag is cleared
    before `_data_bytes` is returned.
    """
    if not self._changed:
        return self._data_bytes
    self._compress()
    self._changed = False
    return self._data_bytes

@data_bytes.setter
def data_bytes(self, value):
    """Store `value` as the compressed buffer; `_changed` is left untouched."""
    self._data_bytes = value

@property
def num_uncompressed_bytes(self):
    """Return the size in bytes of the chunk's decompressed contents.

    For byte compressions this is the length of `decompressed_bytes`;
    otherwise it is the sum of `nbytes` over `decompressed_samples`.
    """
    if not self.is_byte_compression:
        return sum(sample.nbytes for sample in self.decompressed_samples)
    return len(self.decompressed_bytes)

@property
def nbytes(self):
    """Calculates the number of bytes `tobytes` will be without having to call `tobytes`. Used by `LRUCache` to determine if this chunk can be cached."""
    # NOTE(review): the data length passed on is a quarter of the
    # uncompressed size, capped at 100 bytes. The cap looks very small for
    # a size estimate — confirm it is intentional.
    return infer_chunk_num_bytes(
        self.version,
        self.shapes_encoder.array,
        self.byte_positions_encoder.array,
        len_data=min(self.num_uncompressed_bytes // 4, 100),
    )

def prepare_for_write(self):
    """Pass this chunk through `ffw_chunk` before it is written to.

    NOTE(review): `ffw_chunk` comes from `hub.core.fast_forwarding`;
    presumably it upgrades chunks stored by older versions — confirm there.
    """
    ffw_chunk(self)
2 changes: 2 additions & 0 deletions hub/core/compression.py
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,8 @@ def decompress_multiple(
compression: Optional[str] = None,
) -> List[np.ndarray]:
"""Unpack a compressed buffer into multiple arrays."""
if not buffer:
return []
if compression and get_compression_type(compression) == "byte":
decompressed_buffer = memoryview(decompress_bytes(buffer, compression))
arrays = []
Expand Down