Skip to content

Commit

Permalink
feat: Avoid inserting an empty leaf in indexed trees on update (#10334)
Browse files Browse the repository at this point in the history
Fix network kind tests
  • Loading branch information
sirasistant authored Dec 3, 2024
1 parent 24abcfe commit 80fad45
Show file tree
Hide file tree
Showing 16 changed files with 491 additions and 251 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -251,18 +251,20 @@ class ContentAddressedIndexedTree : public ContentAddressedAppendOnlyTree<Store,
const InsertionGenerationCallback& completion);

struct InsertionUpdates {
// On insertion, we always update a low leaf. If it's creating a new leaf, we need to update the pointer to
// point to the new one, if it's an update to an existing leaf, we need to change its payload.
LeafUpdate low_leaf_update;
IndexedLeafValueType new_leaf;
index_t new_leaf_index;
// We don't create new leaves on update
std::optional<std::pair<IndexedLeafValueType, index_t>> new_leaf;
};

struct SequentialInsertionGenerationResponse {
std::shared_ptr<std::vector<InsertionUpdates>> updates_to_perform;
std::vector<InsertionUpdates> updates_to_perform;
index_t highest_index;
};

using SequentialInsertionGenerationCallback =
std::function<void(const TypedResponse<SequentialInsertionGenerationResponse>&)>;
std::function<void(TypedResponse<SequentialInsertionGenerationResponse>&)>;
void generate_sequential_insertions(const std::vector<LeafValueType>& values,
const SequentialInsertionGenerationCallback& completion);

Expand Down Expand Up @@ -1421,6 +1423,14 @@ void ContentAddressedIndexedTree<Store, HashingPolicy>::add_or_update_values_seq
const AddSequentiallyCompletionCallbackWithWitness& completion,
bool capture_witness)
{

// This struct is used to collect some state from the asynchronous operations we are about to perform
struct IntermediateResults {
std::vector<InsertionUpdates> updates_to_perform;
size_t appended_leaves = 0;
};
auto results = std::make_shared<IntermediateResults>();

auto on_error = [=](const std::string& message) {
try {
TypedResponse<AddIndexedDataSequentiallyResponse<LeafValueType>> response;
Expand All @@ -1442,7 +1452,7 @@ void ContentAddressedIndexedTree<Store, HashingPolicy>::add_or_update_values_seq
ReadTransactionPtr tx = store_->create_read_transaction();
store_->get_meta(meta, *tx, true);

index_t new_total_size = values.size() + meta.size;
index_t new_total_size = results->appended_leaves + meta.size;
meta.size = new_total_size;
meta.root = store_->get_current_root(*tx, true);

Expand All @@ -1451,23 +1461,29 @@ void ContentAddressedIndexedTree<Store, HashingPolicy>::add_or_update_values_seq

if (capture_witness) {
// Split results->update_witnesses between low_leaf_witness_data and insertion_witness_data
// Currently we always insert an empty leaf, even if it's an update, so we can split based
// on the index of the witness data
response.inner.insertion_witness_data =
std::make_shared<std::vector<LeafUpdateWitnessData<LeafValueType>>>();
;
response.inner.insertion_witness_data->reserve(results->updates_to_perform.size());

response.inner.low_leaf_witness_data =
std::make_shared<std::vector<LeafUpdateWitnessData<LeafValueType>>>();
;

for (size_t i = 0; i < updates_completion_response.inner.update_witnesses->size(); ++i) {
LeafUpdateWitnessData<LeafValueType>& witness_data =
updates_completion_response.inner.update_witnesses->at(i);
// If even, it's a low leaf, if odd, it's an insertion witness
if (i % 2 == 0) {
response.inner.low_leaf_witness_data->push_back(witness_data);
response.inner.low_leaf_witness_data->reserve(results->updates_to_perform.size());

size_t current_witness_index = 0;
for (size_t i = 0; i < results->updates_to_perform.size(); ++i) {
LeafUpdateWitnessData<LeafValueType> low_leaf_witness =
updates_completion_response.inner.update_witnesses->at(current_witness_index++);
response.inner.low_leaf_witness_data->push_back(low_leaf_witness);

// If this update has an insertion, append the real witness
if (results->updates_to_perform.at(i).new_leaf.has_value()) {
LeafUpdateWitnessData<LeafValueType> insertion_witness =
updates_completion_response.inner.update_witnesses->at(current_witness_index++);
response.inner.insertion_witness_data->push_back(insertion_witness);
} else {
response.inner.insertion_witness_data->push_back(witness_data);
// If it's an update, append an empty witness
response.inner.insertion_witness_data->push_back(LeafUpdateWitnessData<LeafValueType>(
IndexedLeafValueType::empty(), 0, std::vector<fr>(depth_)));
}
}
}
Expand All @@ -1479,23 +1495,33 @@ void ContentAddressedIndexedTree<Store, HashingPolicy>::add_or_update_values_seq
// This signals the completion of the insertion data generation
// Here we'll perform all updates to the tree
SequentialInsertionGenerationCallback insertion_generation_completed =
[=, this](const TypedResponse<SequentialInsertionGenerationResponse>& insertion_response) {
[=, this](TypedResponse<SequentialInsertionGenerationResponse>& insertion_response) {
if (!insertion_response.success) {
on_error(insertion_response.message);
return;
}

std::shared_ptr<std::vector<LeafUpdate>> flat_updates = std::make_shared<std::vector<LeafUpdate>>();
for (size_t i = 0; i < insertion_response.inner.updates_to_perform->size(); ++i) {
InsertionUpdates& insertion_update = insertion_response.inner.updates_to_perform->at(i);
flat_updates->reserve(insertion_response.inner.updates_to_perform.size() * 2);

for (size_t i = 0; i < insertion_response.inner.updates_to_perform.size(); ++i) {
InsertionUpdates& insertion_update = insertion_response.inner.updates_to_perform.at(i);
flat_updates->push_back(insertion_update.low_leaf_update);
flat_updates->push_back(LeafUpdate{
.leaf_index = insertion_update.new_leaf_index,
.updated_leaf = insertion_update.new_leaf,
.original_leaf = IndexedLeafValueType::empty(),
});
if (insertion_update.new_leaf.has_value()) {
results->appended_leaves++;
IndexedLeafValueType new_leaf;
index_t new_leaf_index = 0;
std::tie(new_leaf, new_leaf_index) = insertion_update.new_leaf.value();
flat_updates->push_back(LeafUpdate{
.leaf_index = new_leaf_index,
.updated_leaf = new_leaf,
.original_leaf = IndexedLeafValueType::empty(),
});
}
}

// We won't use anymore updates_to_perform
results->updates_to_perform = std::move(insertion_response.inner.updates_to_perform);
assert(insertion_response.inner.updates_to_perform.size() == 0);
if (capture_witness) {
perform_updates(flat_updates->size(), flat_updates, final_completion);
return;
Expand All @@ -1513,27 +1539,12 @@ void ContentAddressedIndexedTree<Store, HashingPolicy>::generate_sequential_inse
{
execute_and_report<SequentialInsertionGenerationResponse>(
[=, this](TypedResponse<SequentialInsertionGenerationResponse>& response) {
response.inner.highest_index = 0;
response.inner.updates_to_perform = std::make_shared<std::vector<InsertionUpdates>>();

TreeMeta meta;
ReadTransactionPtr tx = store_->create_read_transaction();
store_->get_meta(meta, *tx, true);

RequestContext requestContext;
requestContext.includeUncommitted = true;
// Ensure that the tree is not going to be overfilled
index_t new_total_size = values.size() + meta.size;
if (new_total_size > max_size_) {
throw std::runtime_error(format("Unable to insert values into tree ",
meta.name,
" new size: ",
new_total_size,
" max size: ",
max_size_));
}
// The highest index touched will be the last leaf index, since we append a zero for updates
response.inner.highest_index = new_total_size - 1;
requestContext.root = store_->get_current_root(*tx, true);
// Fetch the frontier (non empty nodes to the right) of the tree. This will ensure that perform_updates or
// perform_updates_without_witness has all the cached nodes it needs to perform the insertions. See comment
Expand All @@ -1542,12 +1553,15 @@ void ContentAddressedIndexedTree<Store, HashingPolicy>::generate_sequential_inse
find_leaf_hash(meta.size - 1, requestContext, *tx, true);
}

index_t current_size = meta.size;

for (size_t i = 0; i < values.size(); ++i) {
const LeafValueType& new_payload = values[i];
// TODO(Alvaro) - Rethink this. I think it's fine for us to interpret empty values as a regular update
// (it'd empty out the payload of the zero leaf)
if (new_payload.is_empty()) {
continue;
}
index_t index_of_new_leaf = i + meta.size;
fr value = new_payload.get_key();

// This gives us the leaf that need updating
Expand Down Expand Up @@ -1594,25 +1608,25 @@ void ContentAddressedIndexedTree<Store, HashingPolicy>::generate_sequential_inse
.updated_leaf = IndexedLeafValueType::empty(),
.original_leaf = low_leaf,
},
.new_leaf = IndexedLeafValueType::empty(),
.new_leaf_index = index_of_new_leaf,
.new_leaf = std::nullopt,
};

if (!is_already_present) {
// Update the current leaf to point it to the new leaf
IndexedLeafValueType new_leaf =
IndexedLeafValueType(new_payload, low_leaf.nextIndex, low_leaf.nextValue);

index_t index_of_new_leaf = current_size;
low_leaf.nextIndex = index_of_new_leaf;
low_leaf.nextValue = value;
current_size++;
// Cache the new leaf
store_->set_leaf_key_at_index(index_of_new_leaf, new_leaf);
store_->put_cached_leaf_by_index(index_of_new_leaf, new_leaf);
// Update cached low leaf
store_->put_cached_leaf_by_index(low_leaf_index, low_leaf);

insertion_update.low_leaf_update.updated_leaf = low_leaf;
insertion_update.new_leaf = new_leaf;
insertion_update.new_leaf = std::pair(new_leaf, index_of_new_leaf);
} else if (IndexedLeafValueType::is_updateable()) {
// Update the current leaf's value, don't change it's link
IndexedLeafValueType replacement_leaf =
Expand All @@ -1630,8 +1644,20 @@ void ContentAddressedIndexedTree<Store, HashingPolicy>::generate_sequential_inse
" is already present"));
}

response.inner.updates_to_perform->push_back(insertion_update);
response.inner.updates_to_perform.push_back(insertion_update);
}

// Ensure that the tree is not going to be overfilled
if (current_size > max_size_) {
throw std::runtime_error(format("Unable to insert values into tree ",
meta.name,
" new size: ",
current_size,
" max size: ",
max_size_));
}
// The highest index touched will be current_size - 1
response.inner.highest_index = current_size - 1;
},
completion);
}
Expand Down
Loading

0 comments on commit 80fad45

Please sign in to comment.