Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added version control diff #1345

Merged
merged 24 commits into from
Nov 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions hub/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@

DATASET_LOCK_FILENAME = "dataset_lock.lock"
TENSOR_COMMIT_CHUNK_SET_FILENAME = "chunk_set"
TENSOR_COMMIT_DIFF_FILENAME = "commit_diff"


DATASET_LOCK_UPDATE_INTERVAL = 120 # seconds
DATASET_LOCK_VALIDITY = 300 # seconds
Expand Down
42 changes: 40 additions & 2 deletions hub/core/chunk_engine.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
import hub
import warnings
import numpy as np
from typing import Any, Dict, Optional, Sequence, Union, List
from hub.core.version_control.commit_diff import CommitDiff, get_sample_indexes_added
from hub.core.version_control.commit_node import CommitNode # type: ignore
from hub.core.version_control.commit_chunk_set import CommitChunkSet # type: ignore
from typing import Any, Dict, List, Optional, Sequence, Union
from hub.util.casting import intelligent_cast
from hub.constants import DEFAULT_MAX_CHUNK_SIZE, FIRST_COMMIT_ID
Expand All @@ -13,11 +17,11 @@
from hub.core.meta.encode.chunk_id import ChunkIdEncoder
from hub.core.meta.tensor_meta import TensorMeta
from hub.core.storage.lru_cache import LRUCache
from hub.core.version_control.commit_chunk_set import CommitChunkSet
from hub.core.version_control.commit_node import CommitNode
from hub.util.casting import get_dtype
from hub.util.keys import (
get_chunk_id_encoder_key,
get_tensor_commit_diff_key,
get_tensor_meta_key,
get_chunk_key,
get_tensor_commit_chunk_set_key,
get_tensor_meta_key,
Expand Down Expand Up @@ -187,6 +191,33 @@ def commit_chunk_set(self) -> Optional[CommitChunkSet]:
def commit_chunk_set_exists(self) -> bool:
return commit_chunk_set_exists(self.version_state, self.meta_cache, self.key)

@property
def commit_diff(self) -> CommitDiff:
"""Gets the commit diff from cache, if one is not found it creates a blank one.

Returns:
CommitDiff: The commit diff keeps track of all the changes in the current commit.
"""
commit_id = self.version_state["commit_id"]
key = get_tensor_commit_diff_key(self.key, commit_id)
if not self.commit_diff_exists:
diff = CommitDiff()
self.meta_cache[key] = diff
return diff

diff = self.meta_cache.get_cachable(key, CommitDiff)
return diff

@property
def commit_diff_exists(self) -> bool:
try:
commit_id = self.version_state["commit_id"]
key = get_tensor_commit_diff_key(self.key, commit_id)
self.meta_cache[key]
return True
except KeyError:
return False

@property
def chunk_id_encoder_exists(self) -> bool:
try:
Expand Down Expand Up @@ -302,6 +333,7 @@ def _sanitize_samples(self, samples):
def extend(self, samples):
self._write_initialization()
samples = self._sanitize_samples(samples)
indexes_added = get_sample_indexes_added(self.num_samples, samples)

tensor_meta = self.tensor_meta
if tensor_meta.dtype is None:
Expand Down Expand Up @@ -331,6 +363,8 @@ def extend(self, samples):
enc.register_samples(num_samples_added)
samples = samples[num_samples_added:]

self.commit_diff.add_data(indexes_added)

for chunk in updated_chunks:
self.cache[chunk.key] = chunk # type: ignore
self._write_finalization()
Expand Down Expand Up @@ -362,6 +396,9 @@ def _synchronize_cache(self, chunk_keys: List[str] = None):
chunk_id_key = get_chunk_id_encoder_key(self.key, commit_id)
self.meta_cache[chunk_id_key] = self.chunk_id_encoder

commit_diff_key = get_tensor_commit_diff_key(self.key, commit_id)
self.meta_cache[commit_diff_key] = self.commit_diff

# first commit doesn't have commit chunk set
if commit_id != FIRST_COMMIT_ID:
# synchronize current chunk set, all older ones are immutable
Expand Down Expand Up @@ -408,6 +445,7 @@ def update(
# tensor_meta.update_shape_interval(shape)
chunk.update_sample(local_sample_index, sample)
updated_chunks.add(chunk)
self.commit_diff.update_data(global_sample_index)

# only care about deltas if it isn't the last chunk
if chunk.key != self.last_chunk_key: # type: ignore
Expand Down
63 changes: 57 additions & 6 deletions hub/core/dataset/dataset.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from collections import defaultdict
import pickle
import posixpath
import warnings
Expand Down Expand Up @@ -39,6 +40,13 @@
)
from hub.util.path import get_path_from_storage
from hub.util.remove_cache import get_base_storage
from hub.util.diff import (
compare_commits,
create_changes_dict,
get_all_changes_string,
filter_data_updated,
get_changes_for_id,
)
from hub.util.version_control import (
auto_checkout,
checkout,
Expand Down Expand Up @@ -309,7 +317,6 @@ def create_tensor(
ffw_dataset_meta(self.version_state["meta"])
self.storage.maybe_flush()
tensor = Tensor(name, self.storage, self.version_state) # type: ignore

self.version_state["full_tensors"][name] = tensor
tensor.info.update(info_kwargs)
return tensor
Expand Down Expand Up @@ -426,19 +433,63 @@ def checkout(self, address: str, create: bool = False) -> str:
def log(self):
"""Displays the details of all the past commits."""
commit_node = self.version_state["commit_node"]
logger.info("---------------\nHub Version Log\n---------------\n")
logger.info(f"Current Branch: {self.version_state['branch']}")
print("---------------\nHub Version Log\n---------------\n")

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we comprehensively changing these emits or cherry picking as we go along?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We only have 2 functions that deliberately print out something, diff and log.
Realized that print was a better choice while implementing diff, changed log too for consistency.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The cluster setup (atm) will capture stdout emits as log. Strongly suggest removing any free form debugging logs, and strictly stick to f"<log-level> <log-msg> - <key-0>: <val-0> - [...] <key-n>: <val-n>" formatting of logs. For debug (check with @absinha18) it is possible that stderr is ignored and can be used for free-form debugging emits. We wants logs that we can systemically grep.

print(f"Current Branch: {self.version_state['branch']}")
if not commit_node.children and commit_has_data(
self.version_state, self.storage
):
logger.info("** There are uncommitted changes on this branch.\n")
print("** There are uncommitted changes on this branch.\n")
else:
logger.info("\n")
print()
while commit_node:
if commit_node.commit_time is not None:
logger.info(f"{commit_node}\n")
print(f"{commit_node}\n")
commit_node = commit_node.parent

def diff(self, id_1: Optional[str] = None, id_2: Optional[str] = None):
"""Displays the differences between commits/branches.

Args:
id_1 (str, optional): The first commit_id or branch name.
id_2 (str, optional): The second commit_id or branch name.

If both id_1 and id_2 are None, the differences between the current commit and the previous commit will be displayed.
If only id_1 is provided, the differences between the current commit and id_1 will be displayed.
If only id_2 is provided, a ValueError will be raised.
If both id_1 and id_2 are provided, the differences between id_1 and id_2 will be displayed.

Raises:
ValueError: If both id_1 is None and id_2 is not None.
"""
version_state, storage = self.version_state, self.storage
message1 = message2 = changes1 = changes2 = None

if id_1 is None and id_2 is None:
changes1 = create_changes_dict()
commit_id = version_state["commit_id"]
get_changes_for_id(commit_id, storage, changes1)
filter_data_updated(changes1)
message1 = f"Diff in {commit_id} (current commit):\n"
else:
if id_1 is None:
raise ValueError("Can't specify id_1 without specifying id_2")
elif id_2 is None:
commit1: str = version_state["commit_id"]
commit2 = id_1
message1 = f"Diff in {commit1} (current commit):\n"
message2 = f"Diff in {commit2} (target id):\n"
else:
commit1 = id_1
commit2 = id_2
message1 = f"Diff in {commit1} (target id 1):\n"
message2 = f"Diff in {commit2} (target id 2):\n"
changes1, changes2 = compare_commits(
commit1, commit2, version_state, storage
)

all_changes = get_all_changes_string(changes1, message1, changes2, message2)
print(all_changes)

def _populate_meta(self):
"""Populates the meta information for the dataset."""

Expand Down
13 changes: 11 additions & 2 deletions hub/core/tensor.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from hub.core.version_control.commit_diff import CommitDiff
from hub.core.chunk.base_chunk import InputSample
from hub.util.version_control import checkout, generate_hash
import numpy as np
from typing import Dict, List, Sequence, Union, Optional, Tuple, Any
from functools import reduce
Expand All @@ -8,13 +8,18 @@
from hub.core.storage import StorageProvider, LRUCache
from hub.core.chunk_engine import ChunkEngine
from hub.api.info import load_info
from hub.util.keys import (
get_tensor_commit_diff_key,
get_tensor_meta_key,
tensor_exists,
get_tensor_info_key,
)
from hub.util.keys import get_tensor_meta_key, tensor_exists, get_tensor_info_key
from hub.util.shape_interval import ShapeInterval
from hub.util.exceptions import (
TensorDoesNotExistError,
InvalidKeyTypeError,
TensorAlreadyExistsError,
TensorDtypeMismatchError,
)


Expand Down Expand Up @@ -55,6 +60,10 @@ def create_tensor(
)
storage[meta_key] = meta # type: ignore

diff_key = get_tensor_commit_diff_key(key, version_state["commit_id"])
diff = CommitDiff(created=True)
storage[diff_key] = diff # type: ignore


def _inplace_op(f):
op = f.__name__
Expand Down
75 changes: 75 additions & 0 deletions hub/core/version_control/commit_diff.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
from typing import Set
from hub.core.storage.cachable import Cachable


class CommitDiff(Cachable):
"""Stores set of diffs stored for a particular tensor in a commit."""

def __init__(self, created=False) -> None:
AbhinavTuli marked this conversation as resolved.
Show resolved Hide resolved
self.created = created
self.data_added: Set[int] = set()
self.data_updated: Set[int] = set()

def tobytes(self) -> bytes:
"""Returns bytes representation of the commit diff

The format stores the following information in order:
1. The first byte is a boolean value indicating whether the tensor was created in the commit or not.
2. The next 8 bytes are the number of elements in the data_added set, let's call this n.
3. The next 8 * n bytes are the elements of the data_added set.
4. The next 8 bytes are the number of elements in the data_updated set, let's call this m.
5. The next 8 * m bytes are the elements of the data_updated set.
"""
return b"".join(
[
self.created.to_bytes(1, "big"),
len(self.data_added).to_bytes(8, "big"),
*[idx.to_bytes(8, "big") for idx in self.data_added],
len(self.data_updated).to_bytes(8, "big"),
*[idx.to_bytes(8, "big") for idx in self.data_updated],
]
)

@classmethod
def frombuffer(cls, data: bytes) -> "CommitDiff":
AbhinavTuli marked this conversation as resolved.
Show resolved Hide resolved
"""Creates a CommitDiff object from bytes"""
commit_diff = cls()

commit_diff.created = bool(int.from_bytes(data[0:1], "big"))

added_ct = int.from_bytes(data[1:9], "big")
commit_diff.data_added = {
int.from_bytes(data[9 + i * 8 : 9 + (i + 1) * 8], "big")
for i in range(added_ct)
}

updated_ct = int.from_bytes(data[9 + added_ct * 8 : 17 + added_ct * 8], "big")
offset = 17 + added_ct * 8
commit_diff.data_updated = {
int.from_bytes(data[offset + i * 8 : offset + (i + 1) * 8], "big")
for i in range(updated_ct)
}

return commit_diff

@property
def nbytes(self):
"""Returns number of bytes required to store the commit diff"""
return 17 + (len(self.data_added) + len(self.data_updated)) * 8

def add_data(self, global_indexes: Set[int]) -> None:
"""Adds new indexes to data added"""
self.data_added.update(global_indexes)

def update_data(self, global_index: int) -> None:
"""Adds new indexes to data updated"""
if global_index not in self.data_added:
self.data_updated.add(global_index)


def get_sample_indexes_added(initial_num_samples: int, samples) -> Set[int]:
"""Returns a set of indexes added to the tensor"""
if initial_num_samples == 0:
return set(range(len(samples)))
else:
return set(range(initial_num_samples, initial_num_samples + len(samples)))
Loading