Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add schema registry and migrator components #9710

Merged
merged 37 commits into from
Nov 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
ce8ef43
Implement MirrorNodeState
bilyana-gospodinova Nov 4, 2024
804eae1
feat: Add schema registry component for reusable services
kselveliev Nov 5, 2024
e00abba
feat: Add service migrator similar to services
kselveliev Nov 5, 2024
dda0c80
Implement MirrorNodeState
bilyana-gospodinova Nov 4, 2024
7b1d58d
Add dynamic service configuration
bilyana-gospodinova Nov 5, 2024
b67223f
Merge branch '09259-state-implementation' into 09260-add-schema-regis…
kselveliev Nov 5, 2024
dd72bd0
fix: Uncomment needed lines in schema registry after state changes
kselveliev Nov 5, 2024
c2702b7
fix: Refactor migrator and networkInfo for successful state initializ…
kselveliev Nov 6, 2024
5dded46
fix: Addressing review comments
kselveliev Nov 6, 2024
cceb9e5
fix: Addressing review comments
kselveliev Nov 8, 2024
b38d57d
Implement MirrorNodeState
bilyana-gospodinova Nov 4, 2024
f1e940e
Add dynamic service configuration
bilyana-gospodinova Nov 5, 2024
ea49673
Fix code smell
bilyana-gospodinova Nov 5, 2024
2b598bf
Increase code coverage
bilyana-gospodinova Nov 5, 2024
f0d9d34
sync with services more
bilyana-gospodinova Nov 8, 2024
cd96150
Update the state as in example
bilyana-gospodinova Nov 11, 2024
2fede4f
Fix code smells and improve coverage
bilyana-gospodinova Nov 11, 2024
afcf416
Fix code smells
bilyana-gospodinova Nov 11, 2024
b8ae0f0
Merge branch '09259-state-implementation' into 09260-add-schema-regis…
kselveliev Nov 11, 2024
afbf84a
Implement MirrorNodeState
bilyana-gospodinova Nov 4, 2024
90e20a1
Add dynamic service configuration
bilyana-gospodinova Nov 5, 2024
7917a71
Fix code smell
bilyana-gospodinova Nov 5, 2024
8c730bb
Increase code coverage
bilyana-gospodinova Nov 5, 2024
d7b3da2
sync with services more
bilyana-gospodinova Nov 8, 2024
8fe36b1
Update the state as in example
bilyana-gospodinova Nov 11, 2024
532e291
Fix code smells and improve coverage
bilyana-gospodinova Nov 11, 2024
5116cba
Fix code smells
bilyana-gospodinova Nov 11, 2024
49cdcfb
Fix PR comments
bilyana-gospodinova Nov 11, 2024
bdefa11
Improve coverage
bilyana-gospodinova Nov 11, 2024
fa57b07
Increase coverage
bilyana-gospodinova Nov 12, 2024
28a2912
Increase coverage (again)
bilyana-gospodinova Nov 12, 2024
b724cbb
Rename packages
bilyana-gospodinova Nov 12, 2024
18e6dd0
Merge branch '09259-state-implementation' into 09260-add-schema-regis…
kselveliev Nov 15, 2024
9715335
Merge branch 'main' into 09260-add-schema-registry-component
kselveliev Nov 15, 2024
7f32049
fix: Remove duplicated classes
kselveliev Nov 15, 2024
62d40fa
fix: Fix code smells
kselveliev Nov 15, 2024
7feac7d
fix: Fix code smells
kselveliev Nov 15, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public SelfNodeInfo selfNodeInfo() {
@Nonnull
@Override
public List<NodeInfo> addressBook() {
throw new UnsupportedOperationException("Address book is not supported.");
return List.of(mockSelfNodeInfo());
steven-sheehy marked this conversation as resolved.
Show resolved Hide resolved
}

@Nullable
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,221 @@
/*
* Copyright (C) 2024 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hedera.mirror.web3.state.components;

import static com.hedera.node.app.state.merkle.SchemaApplicationType.MIGRATION;
import static com.hedera.node.app.state.merkle.SchemaApplicationType.RESTART;
import static com.hedera.node.app.state.merkle.SchemaApplicationType.STATE_DEFINITIONS;

import com.hedera.hapi.node.base.SemanticVersion;
import com.hedera.mirror.web3.state.MirrorNodeState;
import com.hedera.mirror.web3.state.core.MapWritableStates;
import com.hedera.node.app.spi.state.FilteredReadableStates;
import com.hedera.node.app.spi.state.FilteredWritableStates;
import com.hedera.node.app.state.merkle.SchemaApplications;
import com.swirlds.config.api.Configuration;
import com.swirlds.config.api.ConfigurationBuilder;
import com.swirlds.state.spi.MigrationContext;
import com.swirlds.state.spi.ReadableStates;
import com.swirlds.state.spi.Schema;
import com.swirlds.state.spi.SchemaRegistry;
import com.swirlds.state.spi.WritableStates;
import com.swirlds.state.spi.info.NetworkInfo;
import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import lombok.Getter;
import lombok.RequiredArgsConstructor;

@RequiredArgsConstructor
public class SchemaRegistryImpl implements SchemaRegistry {
steven-sheehy marked this conversation as resolved.
Show resolved Hide resolved

public static final SemanticVersion CURRENT_VERSION = new SemanticVersion(0, 47, 0, "SNAPSHOT", "");

private final SchemaApplications schemaApplications;

/**
* The ordered set of all schemas registered by the service
*/
@Getter
private final SortedSet<Schema> schemas = new TreeSet<>();

@Override
public SchemaRegistry register(@Nonnull Schema schema) {
schemas.remove(schema);
schemas.add(schema);
steven-sheehy marked this conversation as resolved.
Show resolved Hide resolved
return this;
}

@SuppressWarnings("rawtypes")
public void migrate(
steven-sheehy marked this conversation as resolved.
Show resolved Hide resolved
@Nonnull final String serviceName,
@Nonnull final MirrorNodeState state,
@Nonnull final NetworkInfo networkInfo) {
migrate(
serviceName,
state,
CURRENT_VERSION,
networkInfo,
ConfigurationBuilder.create().build(),
new HashMap<>(),
new AtomicLong());
}

public void migrate(
@Nonnull final String serviceName,
@Nonnull final MirrorNodeState state,
@Nullable final SemanticVersion previousVersion,
@Nonnull final NetworkInfo networkInfo,
@Nonnull final Configuration config,
@Nonnull final Map<String, Object> sharedValues,
@Nonnull final AtomicLong nextEntityNum) {
if (schemas.isEmpty()) {
return;
}

// For each schema, create the underlying raw data sources (maps, or lists) and the writable states that
// will wrap them. Then call the schema's migrate method to populate those states, and commit each of them
// to the underlying data sources. At that point, we have properly migrated the state.
final var latestVersion = schemas.getLast().getVersion();

for (final var schema : schemas) {
final var applications =
schemaApplications.computeApplications(previousVersion, latestVersion, schema, config);
final var readableStates = state.getReadableStates(serviceName);
final var previousStates = new FilteredReadableStates(readableStates, readableStates.stateKeys());
final WritableStates writableStates;
final WritableStates newStates;
if (applications.contains(STATE_DEFINITIONS)) {
final var redefinedWritableStates = applyStateDefinitions(serviceName, schema, config, state);
writableStates = redefinedWritableStates.beforeStates();
newStates = redefinedWritableStates.afterStates();
} else {
newStates = writableStates = state.getWritableStates(serviceName);
}
final var context = newMigrationContext(
previousVersion, previousStates, newStates, config, networkInfo, nextEntityNum, sharedValues);
if (applications.contains(MIGRATION)) {
schema.migrate(context);
}
if (applications.contains(RESTART)) {
schema.restart(context);
}
steven-sheehy marked this conversation as resolved.
Show resolved Hide resolved
if (writableStates instanceof MapWritableStates mws) {
mws.commit();
}

// And finally we can remove any states we need to remove
schema.statesToRemove().forEach(stateKey -> state.removeServiceState(serviceName, stateKey));
}
}

public MigrationContext newMigrationContext(
@Nullable final SemanticVersion previousVersion,
@Nonnull final ReadableStates previousStates,
@Nonnull final WritableStates writableStates,
@Nonnull final Configuration config,
@Nonnull final NetworkInfo networkInfo,
@Nonnull final AtomicLong nextEntityNum,
@Nonnull final Map<String, Object> sharedValues) {
return new MigrationContext() {
@Override
public void copyAndReleaseOnDiskState(String stateKey) {
// No-op
}

Check warning on line 146 in hedera-mirror-web3/src/main/java/com/hedera/mirror/web3/state/components/SchemaRegistryImpl.java

View check run for this annotation

Codecov / codecov/patch

hedera-mirror-web3/src/main/java/com/hedera/mirror/web3/state/components/SchemaRegistryImpl.java#L146

Added line #L146 was not covered by tests

@Override
public SemanticVersion previousVersion() {
return previousVersion;
}

@Nonnull
@Override
public ReadableStates previousStates() {
return previousStates;
}

@Nonnull
@Override
public WritableStates newStates() {
return writableStates;
}

@Nonnull
@Override
public Configuration configuration() {
return config;
}

@Override
public NetworkInfo networkInfo() {
return networkInfo;
}

@Override
public long newEntityNum() {
return nextEntityNum.getAndIncrement();
}

@Override
public Map<String, Object> sharedValues() {
return sharedValues;
}
};
}

private RedefinedWritableStates applyStateDefinitions(
@Nonnull final String serviceName,
@Nonnull final Schema schema,
@Nonnull final Configuration configuration,
@Nonnull final MirrorNodeState state) {
final Map<String, Object> stateDataSources = new HashMap<>();
schema.statesToCreate(configuration).forEach(def -> {
if (def.singleton()) {
stateDataSources.put(def.stateKey(), new AtomicReference<>());
} else if (def.queue()) {
stateDataSources.put(def.stateKey(), new ConcurrentLinkedDeque<>());
} else {
stateDataSources.put(def.stateKey(), new ConcurrentHashMap<>());
steven-sheehy marked this conversation as resolved.
Show resolved Hide resolved
}
});

state.addService(serviceName, stateDataSources);

final var statesToRemove = schema.statesToRemove();
final var writableStates = state.getWritableStates(serviceName);
final var remainingStates = new HashSet<>(writableStates.stateKeys());
remainingStates.removeAll(statesToRemove);
final var newStates = new FilteredWritableStates(writableStates, remainingStates);
return new RedefinedWritableStates(writableStates, newStates);
}

/**
* Encapsulates the writable states before and after applying a schema's state definitions.
*
* @param beforeStates the writable states before applying the schema's state definitions
* @param afterStates the writable states after applying the schema's state definitions
*/
private record RedefinedWritableStates(WritableStates beforeStates, WritableStates afterStates) {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Copyright (C) 2024 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hedera.mirror.web3.state.components;

import static java.util.Objects.requireNonNull;

import com.hedera.hapi.block.stream.output.StateChanges.Builder;
import com.hedera.hapi.node.base.SemanticVersion;
import com.hedera.mirror.web3.state.MirrorNodeState;
import com.hedera.node.app.services.ServiceMigrator;
import com.hedera.node.app.services.ServicesRegistry;
import com.hedera.node.config.data.HederaConfig;
import com.swirlds.config.api.Configuration;
import com.swirlds.metrics.api.Metrics;
import com.swirlds.platform.system.SoftwareVersion;
import com.swirlds.state.State;
import com.swirlds.state.spi.info.NetworkInfo;
import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable;
import jakarta.inject.Named;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;

@Named
public class ServiceMigratorImpl implements ServiceMigrator {
steven-sheehy marked this conversation as resolved.
Show resolved Hide resolved

@Override
public List<Builder> doMigrations(
@Nonnull State state,
@Nonnull ServicesRegistry servicesRegistry,
@Nullable SoftwareVersion previousVersion,
@Nonnull SoftwareVersion currentVersion,
@Nonnull Configuration config,
@Nonnull NetworkInfo networkInfo,
@Nonnull Metrics metrics) {
requireNonNull(state);
requireNonNull(servicesRegistry);
requireNonNull(currentVersion);
requireNonNull(config);
requireNonNull(networkInfo);
requireNonNull(metrics);

if (!(state instanceof MirrorNodeState mirrorNodeState)) {
throw new IllegalArgumentException("Can only be used with MirrorNodeState instances");
}

if (!(servicesRegistry instanceof ServicesRegistryImpl registry)) {
throw new IllegalArgumentException("Can only be used with ServicesRegistryImpl instances");
}

final AtomicLong prevEntityNum =
new AtomicLong(config.getConfigData(HederaConfig.class).firstUserEntity() - 1);
final Map<String, Object> sharedValues = new HashMap<>();
final var deserializedPbjVersion = Optional.ofNullable(previousVersion)
.map(SoftwareVersion::getPbjSemanticVersion)
.orElse(null);

registry.registrations().stream().forEach(registration -> {
if (!(registration.registry() instanceof SchemaRegistryImpl schemaRegistry)) {
throw new IllegalArgumentException("Can only be used with SchemaRegistryImpl instances");
}
schemaRegistry.migrate(
registration.serviceName(),
mirrorNodeState,
deserializedPbjVersion,
networkInfo,
config,
sharedValues,
prevEntityNum);
});
return List.of();
}

@Nullable
@Override
public SemanticVersion creationVersionOf(@Nonnull State state) {
if (!(state instanceof MirrorNodeState)) {
throw new IllegalArgumentException("Can only be used with MirrorNodeState instances");
}
return null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright (C) 2024 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hedera.mirror.web3.state.components;

import com.hedera.node.app.services.ServicesRegistry;
import com.hedera.node.app.state.merkle.SchemaApplications;
import com.swirlds.state.spi.Service;
import jakarta.annotation.Nonnull;
import jakarta.inject.Named;
import java.util.Collections;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;

@Named
public class ServicesRegistryImpl implements ServicesRegistry {

private final SortedSet<Registration> entries = new TreeSet<>();

@Nonnull
@Override
public Set<Registration> registrations() {
return Collections.unmodifiableSortedSet(entries);
}

@Override
public void register(@Nonnull Service service) {
final var registry = new SchemaRegistryImpl(new SchemaApplications());
service.registerSchemas(registry);
entries.add(new ServicesRegistryImpl.Registration(service, registry));
}

@Nonnull
@Override
public ServicesRegistry subRegistryFor(@Nonnull String... serviceNames) {
final var selections = Set.of(serviceNames);
final var subRegistry = new ServicesRegistryImpl();
subRegistry.entries.addAll(entries.stream()
.filter(registration -> selections.contains(registration.serviceName()))
.toList());
return subRegistry;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,7 @@ void testSelfNodeInfo() {

@Test
void testAddressBook() {
final var exception = assertThrows(UnsupportedOperationException.class, () -> networkInfoImpl.addressBook());
assertThat(exception.getMessage()).isEqualTo("Address book is not supported.");
assertThat(networkInfoImpl.addressBook()).isNotEmpty();
}

@Test
Expand Down
Loading
Loading