Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add schema registry and migrator components #9710

Merged
merged 37 commits into from
Nov 18, 2024
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
ce8ef43
Implement MirrorNodeState
bilyana-gospodinova Nov 4, 2024
804eae1
feat: Add schema registry component for reusable services
kselveliev Nov 5, 2024
e00abba
feat: Add service migrator similar to services
kselveliev Nov 5, 2024
dda0c80
Implement MirrorNodeState
bilyana-gospodinova Nov 4, 2024
7b1d58d
Add dynamic service configuration
bilyana-gospodinova Nov 5, 2024
b67223f
Merge branch '09259-state-implementation' into 09260-add-schema-regis…
kselveliev Nov 5, 2024
dd72bd0
fix: Uncomment needed lines in schema registry after state changes
kselveliev Nov 5, 2024
c2702b7
fix: Refactor migrator and networkInfo for successful state initializ…
kselveliev Nov 6, 2024
5dded46
fix: Addressing review comments
kselveliev Nov 6, 2024
cceb9e5
fix: Addressing review comments
kselveliev Nov 8, 2024
b38d57d
Implement MirrorNodeState
bilyana-gospodinova Nov 4, 2024
f1e940e
Add dynamic service configuration
bilyana-gospodinova Nov 5, 2024
ea49673
Fix code smell
bilyana-gospodinova Nov 5, 2024
2b598bf
Increase code coverage
bilyana-gospodinova Nov 5, 2024
f0d9d34
sync with services more
bilyana-gospodinova Nov 8, 2024
cd96150
Update the state as in example
bilyana-gospodinova Nov 11, 2024
2fede4f
Fix code smells and improve coverage
bilyana-gospodinova Nov 11, 2024
afcf416
Fix code smells
bilyana-gospodinova Nov 11, 2024
b8ae0f0
Merge branch '09259-state-implementation' into 09260-add-schema-regis…
kselveliev Nov 11, 2024
afbf84a
Implement MirrorNodeState
bilyana-gospodinova Nov 4, 2024
90e20a1
Add dynamic service configuration
bilyana-gospodinova Nov 5, 2024
7917a71
Fix code smell
bilyana-gospodinova Nov 5, 2024
8c730bb
Increase code coverage
bilyana-gospodinova Nov 5, 2024
d7b3da2
sync with services more
bilyana-gospodinova Nov 8, 2024
8fe36b1
Update the state as in example
bilyana-gospodinova Nov 11, 2024
532e291
Fix code smells and improve coverage
bilyana-gospodinova Nov 11, 2024
5116cba
Fix code smells
bilyana-gospodinova Nov 11, 2024
49cdcfb
Fix PR comments
bilyana-gospodinova Nov 11, 2024
bdefa11
Improve coverage
bilyana-gospodinova Nov 11, 2024
fa57b07
Increase coverage
bilyana-gospodinova Nov 12, 2024
28a2912
Increase coverage (again)
bilyana-gospodinova Nov 12, 2024
b724cbb
Rename packages
bilyana-gospodinova Nov 12, 2024
18e6dd0
Merge branch '09259-state-implementation' into 09260-add-schema-regis…
kselveliev Nov 15, 2024
9715335
Merge branch 'main' into 09260-add-schema-registry-component
kselveliev Nov 15, 2024
7f32049
fix: Remove duplicated classes
kselveliev Nov 15, 2024
62d40fa
fix: Fix code smells
kselveliev Nov 15, 2024
7feac7d
fix: Fix code smells
kselveliev Nov 15, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* Copyright (C) 2024 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hedera.mirror.web3.state;

import static java.util.Objects.requireNonNull;

import com.hedera.mirror.web3.state.utils.MapReadableKVState;
import com.hedera.mirror.web3.state.utils.MapReadableStates;
import com.hedera.mirror.web3.state.utils.MapWritableKVState;
import com.hedera.mirror.web3.state.utils.MapWritableStates;
import com.swirlds.state.State;
import com.swirlds.state.spi.EmptyReadableStates;
import com.swirlds.state.spi.EmptyWritableStates;
import com.swirlds.state.spi.ReadableStates;
import com.swirlds.state.spi.WritableStates;
import edu.umd.cs.findbugs.annotations.NonNull;
import jakarta.annotation.Nonnull;
import jakarta.inject.Named;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@SuppressWarnings({"rawtypes", "unchecked"})
@Named
public class MirrorNodeState implements State {

private final Map<String, ReadableStates> readableStates = new ConcurrentHashMap<>();
// Key is Service, value is Map of state name to state datasource
private final Map<String, Map<String, Object>> states = new ConcurrentHashMap<>();

public MirrorNodeState addService(@NonNull final String serviceName, @NonNull final Map<String, ?> dataSources) {
final var serviceStates = this.states.computeIfAbsent(serviceName, k -> new ConcurrentHashMap<>());
dataSources.forEach((k, b) -> {
if (!serviceStates.containsKey(k)) {
serviceStates.put(k, b);
}
});

// Purge any readable states whose state definitions are now stale,
// since they don't include the new data sources we just added
readableStates.remove(serviceName);
return this;
}

/**
* Removes the state with the given key for the service with the given name.
*
* @param serviceName the name of the service
* @param stateKey the key of the state
*/
public void removeServiceState(@NonNull final String serviceName, @NonNull final String stateKey) {
requireNonNull(serviceName);
requireNonNull(stateKey);
this.states.computeIfPresent(serviceName, (k, v) -> {
v.remove(stateKey);
readableStates.remove(serviceName); // Remove the service so that its states will be repopulated.
return v;
});
}

@Nonnull
@Override
public ReadableStates getReadableStates(@Nonnull String serviceName) {
return readableStates.computeIfAbsent(serviceName, s -> {
final var serviceStates = this.states.get(s);
if (serviceStates == null) {
return new EmptyReadableStates();
}
final Map<String, Object> states = new ConcurrentHashMap<>();
for (final var entry : serviceStates.entrySet()) {
final var stateName = entry.getKey();
final var state = entry.getValue();
if (state instanceof Map map) {
states.put(stateName, new MapReadableKVState(stateName, map));
}
}
return new MapReadableStates(states);
});
}

@Nonnull
@Override
public WritableStates getWritableStates(@Nonnull String serviceName) {
final var serviceStates = states.get(serviceName);
if (serviceStates == null) {
return new EmptyWritableStates();
}

final Map<String, Object> data = new ConcurrentHashMap<>();
for (final var entry : serviceStates.entrySet()) {
final var stateName = entry.getKey();
final var state = entry.getValue();
if (state instanceof Map) {
final var readableState = getReadableStates(serviceName).get(stateName);
data.put(stateName, new MapWritableKVState<>(stateName, readableState));
}
}
return new MapWritableStates(data);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public SelfNodeInfo selfNodeInfo() {
@Nonnull
@Override
public List<NodeInfo> addressBook() {
throw new UnsupportedOperationException("Address book is not supported.");
return List.of(mockSelfNodeInfo());
steven-sheehy marked this conversation as resolved.
Show resolved Hide resolved
}

@Nullable
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,220 @@
/*
* Copyright (C) 2024 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hedera.mirror.web3.state.components;

import static com.hedera.node.app.state.merkle.SchemaApplicationType.MIGRATION;
import static com.hedera.node.app.state.merkle.SchemaApplicationType.RESTART;
import static com.hedera.node.app.state.merkle.SchemaApplicationType.STATE_DEFINITIONS;

import com.hedera.hapi.node.base.SemanticVersion;
import com.hedera.mirror.web3.state.MirrorNodeState;
import com.hedera.mirror.web3.state.utils.MapWritableStates;
import com.hedera.node.app.spi.state.FilteredReadableStates;
import com.hedera.node.app.spi.state.FilteredWritableStates;
import com.hedera.node.app.state.merkle.SchemaApplications;
import com.swirlds.config.api.Configuration;
import com.swirlds.config.api.ConfigurationBuilder;
import com.swirlds.state.spi.MigrationContext;
import com.swirlds.state.spi.ReadableStates;
import com.swirlds.state.spi.Schema;
import com.swirlds.state.spi.SchemaRegistry;
import com.swirlds.state.spi.WritableStates;
import com.swirlds.state.spi.info.NetworkInfo;
import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import lombok.Getter;
import lombok.RequiredArgsConstructor;

@RequiredArgsConstructor
public class SchemaRegistryImpl implements SchemaRegistry {
steven-sheehy marked this conversation as resolved.
Show resolved Hide resolved

public static final SemanticVersion CURRENT_VERSION = new SemanticVersion(0, 47, 0, "SNAPSHOT", "");

private final SchemaApplications schemaApplications;

/**
* The ordered set of all schemas registered by the service
*/
@Getter
private final SortedSet<Schema> schemas = new TreeSet<>();

@Override
public SchemaRegistry register(@Nonnull Schema schema) {
schemas.add(schema);
steven-sheehy marked this conversation as resolved.
Show resolved Hide resolved
return this;
}

@SuppressWarnings("rawtypes")
public void migrate(
steven-sheehy marked this conversation as resolved.
Show resolved Hide resolved
@Nonnull final String serviceName,
@Nonnull final MirrorNodeState state,
@Nonnull final NetworkInfo networkInfo) {
migrate(
serviceName,
state,
CURRENT_VERSION,
networkInfo,
ConfigurationBuilder.create().build(),
new HashMap<>(),
new AtomicLong());
}

public void migrate(
@Nonnull final String serviceName,
@Nonnull final MirrorNodeState state,
@Nullable final SemanticVersion previousVersion,
@Nonnull final NetworkInfo networkInfo,
@Nonnull final Configuration config,
@Nonnull final Map<String, Object> sharedValues,
@Nonnull final AtomicLong nextEntityNum) {
if (schemas.isEmpty()) {
return;
}

// For each schema, create the underlying raw data sources (maps, or lists) and the writable states that
// will wrap them. Then call the schema's migrate method to populate those states, and commit each of them
// to the underlying data sources. At that point, we have properly migrated the state.
final var latestVersion = schemas.getLast().getVersion();

for (final var schema : schemas) {
final var applications =
schemaApplications.computeApplications(previousVersion, latestVersion, schema, config);
final var readableStates = state.getReadableStates(serviceName);
final var previousStates = new FilteredReadableStates(readableStates, readableStates.stateKeys());
final WritableStates writableStates;
final WritableStates newStates;
if (applications.contains(STATE_DEFINITIONS)) {
final var redefinedWritableStates = applyStateDefinitions(serviceName, schema, config, state);
writableStates = redefinedWritableStates.beforeStates();
newStates = redefinedWritableStates.afterStates();
} else {
newStates = writableStates = state.getWritableStates(serviceName);
}
final var context = newMigrationContext(
previousVersion, previousStates, newStates, config, networkInfo, nextEntityNum, sharedValues);
if (applications.contains(MIGRATION)) {
schema.migrate(context);
}
if (applications.contains(RESTART)) {
schema.restart(context);
}
steven-sheehy marked this conversation as resolved.
Show resolved Hide resolved
if (writableStates instanceof MapWritableStates mws) {
mws.commit();
}

// And finally we can remove any states we need to remove
schema.statesToRemove().forEach(stateKey -> state.removeServiceState(serviceName, stateKey));
}
}

public MigrationContext newMigrationContext(
@Nullable final SemanticVersion previousVersion,
@Nonnull final ReadableStates previousStates,
@Nonnull final WritableStates writableStates,
@Nonnull final Configuration config,
@Nonnull final NetworkInfo networkInfo,
@Nonnull final AtomicLong nextEntityNum,
@Nonnull final Map<String, Object> sharedValues) {
return new MigrationContext() {
@Override
public void copyAndReleaseOnDiskState(String stateKey) {
// No-op
}

@Override
public SemanticVersion previousVersion() {
return previousVersion;
}

@Nonnull
@Override
public ReadableStates previousStates() {
return previousStates;
}

@Nonnull
@Override
public WritableStates newStates() {
return writableStates;
}

@Nonnull
@Override
public Configuration configuration() {
return config;
}

@Override
public NetworkInfo networkInfo() {
return networkInfo;
}

@Override
public long newEntityNum() {
return nextEntityNum.getAndIncrement();
}

@Override
public Map<String, Object> sharedValues() {
return sharedValues;
}
};
}

private RedefinedWritableStates applyStateDefinitions(
@Nonnull final String serviceName,
@Nonnull final Schema schema,
@Nonnull final Configuration configuration,
@Nonnull final MirrorNodeState state) {
final Map<String, Object> stateDataSources = new HashMap<>();
schema.statesToCreate(configuration).forEach(def -> {
if (def.singleton()) {
stateDataSources.put(def.stateKey(), new AtomicReference<>());
} else if (def.queue()) {
stateDataSources.put(def.stateKey(), new ConcurrentLinkedDeque<>());
} else {
stateDataSources.put(def.stateKey(), new ConcurrentHashMap<>());
steven-sheehy marked this conversation as resolved.
Show resolved Hide resolved
}
});

state.addService(serviceName, stateDataSources);

final var statesToRemove = schema.statesToRemove();
final var writableStates = state.getWritableStates(serviceName);
final var remainingStates = new HashSet<>(writableStates.stateKeys());
remainingStates.removeAll(statesToRemove);
final var newStates = new FilteredWritableStates(writableStates, remainingStates);
return new RedefinedWritableStates(writableStates, newStates);
}

/**
* Encapsulates the writable states before and after applying a schema's state definitions.
*
* @param beforeStates the writable states before applying the schema's state definitions
* @param afterStates the writable states after applying the schema's state definitions
*/
private record RedefinedWritableStates(WritableStates beforeStates, WritableStates afterStates) {}
}
Loading