From 7c5e0248ba16ceaf0b7235f3a94f0388cfe5997a Mon Sep 17 00:00:00 2001 From: Kamal Chandraprakash Date: Thu, 6 Jun 2024 21:26:08 +0530 Subject: [PATCH] MINOR: Cleanup the storage module unit tests (#16202) - Use SystemTime instead of MockTime when time is not mocked - Use static assertions to reduce the line length - Fold the lines if it exceeds the limit - rename tp0 to tpId0 when it refers to TopicIdPartition Reviewers: Kuan-Po (Cooper) Tseng , Chia-Ping Tsai --- .../RemoteLogMetadataFormatterTest.java | 18 ++-- .../storage/RemoteLogMetadataSerdeTest.java | 38 ++++----- .../RemoteLogMetadataTransformTest.java | 40 ++++----- ...sedRemoteLogMetadataManagerConfigTest.java | 53 ++++-------- ...adataManagerMultipleSubscriptionsTest.java | 41 ++++----- ...edRemoteLogMetadataManagerRestartTest.java | 25 +++--- ...opicBasedRemoteLogMetadataManagerTest.java | 56 +++++++------ .../storage/RemoteLogMetadataManagerTest.java | 83 ++++++++++--------- 8 files changed, 169 insertions(+), 185 deletions(-) diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataFormatterTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataFormatterTest.java index e3d1a2aee0cd3..1380a735fba89 100644 --- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataFormatterTest.java +++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataFormatterTest.java @@ -24,7 +24,6 @@ import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId; import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata; -import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState; import org.junit.jupiter.api.Test; import java.io.ByteArrayOutputStream; @@ -35,6 +34,7 @@ import java.util.Optional; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState.COPY_SEGMENT_STARTED; public class RemoteLogMetadataFormatterTest { private static final Uuid TOPIC_ID = Uuid.randomUuid(); @@ -51,12 +51,12 @@ public void testFormat() throws IOException { RemoteLogSegmentId remoteLogSegmentId = new RemoteLogSegmentId(TP0, SEGMENT_ID); Optional customMetadata = Optional.of(new CustomMetadata(new byte[10])); RemoteLogSegmentMetadata remoteLogMetadata = new RemoteLogSegmentMetadata( - remoteLogSegmentId, 0L, 100L, -1L, 1, - 123L, 1024, customMetadata, - RemoteLogSegmentState.COPY_SEGMENT_STARTED, segLeaderEpochs); + remoteLogSegmentId, 0L, 100L, -1L, 1, 123L, 1024, customMetadata, COPY_SEGMENT_STARTED, + segLeaderEpochs); byte[] metadataBytes = new RemoteLogMetadataSerde().serialize(remoteLogMetadata); - ConsumerRecord metadataRecord = new ConsumerRecord<>("__remote_log_metadata", 0, 0, null, metadataBytes); + ConsumerRecord metadataRecord = new ConsumerRecord<>( + "__remote_log_metadata", 0, 0, null, metadataBytes); String expected = String.format( "partition: 0, offset: 0, value: " + @@ -68,9 +68,11 @@ public void testFormat() throws IOException { TOPIC_ID, SEGMENT_ID); try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); PrintStream ps = new PrintStream(baos)) { - RemoteLogMetadataSerde.RemoteLogMetadataFormatter formatter = new RemoteLogMetadataSerde.RemoteLogMetadataFormatter(); - formatter.writeTo(metadataRecord, ps); - assertEquals(expected, baos.toString()); + try (RemoteLogMetadataSerde.RemoteLogMetadataFormatter formatter = + new RemoteLogMetadataSerde.RemoteLogMetadataFormatter()) { + formatter.writeTo(metadataRecord, ps); + assertEquals(expected, baos.toString()); + } } } } diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSerdeTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSerdeTest.java index 5b48790c7fdc9..b1b91dacf2379 100644 --- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSerdeTest.java +++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSerdeTest.java @@ -19,48 +19,48 @@ import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; -import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.SystemTime; import org.apache.kafka.common.utils.Time; import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde; import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId; import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata; import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate; -import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState; import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata; -import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteState; import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata; -import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import java.util.HashMap; import java.util.Map; import java.util.Optional; +import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState.COPY_SEGMENT_STARTED; +import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState.COPY_SEGMENT_FINISHED; +import static org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteState.DELETE_PARTITION_MARKED; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + public class RemoteLogMetadataSerdeTest { public static final String TOPIC = "foo"; private static final TopicIdPartition TP0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition(TOPIC, 0)); - private final Time time = new MockTime(1); + private final Time time = new SystemTime(); @Test public void testRemoteLogSegmentMetadataSerde() { RemoteLogSegmentMetadata remoteLogSegmentMetadata = createRemoteLogSegmentMetadata(); - doTestRemoteLogMetadataSerde(remoteLogSegmentMetadata); } @Test public void testRemoteLogSegmentMetadataUpdateSerde() { RemoteLogSegmentMetadataUpdate remoteLogSegmentMetadataUpdate = createRemoteLogSegmentMetadataUpdate(); - doTestRemoteLogMetadataSerde(remoteLogSegmentMetadataUpdate); } @Test public void testRemotePartitionDeleteMetadataSerde() { RemotePartitionDeleteMetadata remotePartitionDeleteMetadata = createRemotePartitionDeleteMetadata(); - doTestRemoteLogMetadataSerde(remotePartitionDeleteMetadata); } @@ -70,24 +70,19 @@ private RemoteLogSegmentMetadata createRemoteLogSegmentMetadata() { segLeaderEpochs.put(1, 20L); segLeaderEpochs.put(2, 80L); RemoteLogSegmentId remoteLogSegmentId = new RemoteLogSegmentId(TP0, Uuid.randomUuid()); - return new RemoteLogSegmentMetadata(remoteLogSegmentId, 0L, 100L, -1L, 1, - time.milliseconds(), 1024, - Optional.of(new CustomMetadata(new byte[] {0, 1, 2, 3})), - RemoteLogSegmentState.COPY_SEGMENT_STARTED, - segLeaderEpochs - ); + return new RemoteLogSegmentMetadata(remoteLogSegmentId, 0L, 100L, -1L, 1, + time.milliseconds(), 1024, Optional.of(new CustomMetadata(new byte[] {0, 1, 2, 3})), + COPY_SEGMENT_STARTED, segLeaderEpochs); } private RemoteLogSegmentMetadataUpdate createRemoteLogSegmentMetadataUpdate() { RemoteLogSegmentId remoteLogSegmentId = new RemoteLogSegmentId(TP0, Uuid.randomUuid()); return new RemoteLogSegmentMetadataUpdate(remoteLogSegmentId, time.milliseconds(), - Optional.of(new CustomMetadata(new byte[] {0, 1, 2, 3})), - RemoteLogSegmentState.COPY_SEGMENT_FINISHED, 2); + Optional.of(new CustomMetadata(new byte[] {0, 1, 2, 3})), COPY_SEGMENT_FINISHED, 2); } private RemotePartitionDeleteMetadata createRemotePartitionDeleteMetadata() { - return new RemotePartitionDeleteMetadata(TP0, RemotePartitionDeleteState.DELETE_PARTITION_MARKED, - time.milliseconds(), 0); + return new RemotePartitionDeleteMetadata(TP0, DELETE_PARTITION_MARKED, time.milliseconds(), 0); } private void doTestRemoteLogMetadataSerde(RemoteLogMetadata remoteLogMetadata) { @@ -96,16 +91,17 @@ private void doTestRemoteLogMetadataSerde(RemoteLogMetadata remoteLogMetadata) { byte[] metadataBytes = serializer.serialize(remoteLogMetadata); // Deserialize the bytes and check the RemoteLogMetadata object is as expected. - // Created another RemoteLogMetadataSerde instance to depict the real usecase of serializer and deserializer having their own instances. + // Created another RemoteLogMetadataSerde instance to depict the real usecase of serializer and + // deserializer having their own instances. RemoteLogMetadataSerde deserializer = new RemoteLogMetadataSerde(); RemoteLogMetadata deserializedRemoteLogMetadata = deserializer.deserialize(metadataBytes); - Assertions.assertEquals(remoteLogMetadata, deserializedRemoteLogMetadata); + assertEquals(remoteLogMetadata, deserializedRemoteLogMetadata); } @Test public void testInvalidRemoteStorageMetadata() { // Serializing receives an exception as it does not have the expected RemoteLogMetadata registered in serdes. - Assertions.assertThrows(IllegalArgumentException.class, + assertThrows(IllegalArgumentException.class, () -> new RemoteLogMetadataSerde().serialize(new InvalidRemoteLogMetadata(1, time.milliseconds()))); } diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataTransformTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataTransformTest.java index 504f47e17a584..70770542eb911 100644 --- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataTransformTest.java +++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataTransformTest.java @@ -19,7 +19,7 @@ import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; -import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.SystemTime; import org.apache.kafka.common.utils.Time; import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogSegmentMetadataTransform; @@ -29,60 +29,56 @@ import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata; import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate; -import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState; import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata; -import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteState; -import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import java.util.Collections; import java.util.Optional; +import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState.COPY_SEGMENT_FINISHED; +import static org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteState.DELETE_PARTITION_STARTED; +import static org.junit.jupiter.api.Assertions.assertEquals; + public class RemoteLogMetadataTransformTest { private static final TopicIdPartition TP0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)); - private final Time time = new MockTime(1); + private final Time time = new SystemTime(); @Test public void testRemoteLogSegmentMetadataTransform() { RemoteLogSegmentMetadataTransform metadataTransform = new RemoteLogSegmentMetadataTransform(); - RemoteLogSegmentMetadata metadata = createRemoteLogSegmentMetadata(); ApiMessageAndVersion apiMessageAndVersion = metadataTransform.toApiMessageAndVersion(metadata); RemoteLogSegmentMetadata remoteLogSegmentMetadataFromRecord = metadataTransform .fromApiMessageAndVersion(apiMessageAndVersion); - - Assertions.assertEquals(metadata, remoteLogSegmentMetadataFromRecord); + assertEquals(metadata, remoteLogSegmentMetadataFromRecord); } @Test public void testRemoteLogSegmentMetadataUpdateTransform() { RemoteLogSegmentMetadataUpdateTransform metadataUpdateTransform = new RemoteLogSegmentMetadataUpdateTransform(); - - RemoteLogSegmentMetadataUpdate metadataUpdate = - new RemoteLogSegmentMetadataUpdate(new RemoteLogSegmentId(TP0, Uuid.randomUuid()), time.milliseconds(), - Optional.of(new CustomMetadata(new byte[]{0, 1, 2, 3})), - RemoteLogSegmentState.COPY_SEGMENT_FINISHED, 1); + RemoteLogSegmentMetadataUpdate metadataUpdate = new RemoteLogSegmentMetadataUpdate( + new RemoteLogSegmentId(TP0, Uuid.randomUuid()), time.milliseconds(), + Optional.of(new CustomMetadata(new byte[]{0, 1, 2, 3})), COPY_SEGMENT_FINISHED, 1); ApiMessageAndVersion apiMessageAndVersion = metadataUpdateTransform.toApiMessageAndVersion(metadataUpdate); - RemoteLogSegmentMetadataUpdate metadataUpdateFromRecord = metadataUpdateTransform.fromApiMessageAndVersion(apiMessageAndVersion); - - Assertions.assertEquals(metadataUpdate, metadataUpdateFromRecord); + RemoteLogSegmentMetadataUpdate metadataUpdateFromRecord = + metadataUpdateTransform.fromApiMessageAndVersion(apiMessageAndVersion); + assertEquals(metadataUpdate, metadataUpdateFromRecord); } private RemoteLogSegmentMetadata createRemoteLogSegmentMetadata() { RemoteLogSegmentId remoteLogSegmentId = new RemoteLogSegmentId(TP0, Uuid.randomUuid()); return new RemoteLogSegmentMetadata(remoteLogSegmentId, 0L, 100L, -1L, 1, - time.milliseconds(), 1024, Collections.singletonMap(0, 0L)); + time.milliseconds(), 1024, Collections.singletonMap(0, 0L)); } @Test public void testRemoteLogPartitionMetadataTransform() { RemotePartitionDeleteMetadataTransform transform = new RemotePartitionDeleteMetadataTransform(); - RemotePartitionDeleteMetadata partitionDeleteMetadata - = new RemotePartitionDeleteMetadata(TP0, RemotePartitionDeleteState.DELETE_PARTITION_STARTED, time.milliseconds(), 1); + = new RemotePartitionDeleteMetadata(TP0, DELETE_PARTITION_STARTED, time.milliseconds(), 1); ApiMessageAndVersion apiMessageAndVersion = transform.toApiMessageAndVersion(partitionDeleteMetadata); - RemotePartitionDeleteMetadata partitionDeleteMetadataFromRecord = transform.fromApiMessageAndVersion(apiMessageAndVersion); - - Assertions.assertEquals(partitionDeleteMetadata, partitionDeleteMetadataFromRecord); + RemotePartitionDeleteMetadata partitionDeleteMetadataFromRecord = + transform.fromApiMessageAndVersion(apiMessageAndVersion); + assertEquals(partitionDeleteMetadata, partitionDeleteMetadataFromRecord); } } diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfigTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfigTest.java index 8e3985d0d5fb5..34f1fb083667e 100644 --- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfigTest.java +++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfigTest.java @@ -20,10 +20,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.test.TestUtils; -import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.util.AbstractMap; import java.util.HashMap; @@ -38,10 +35,10 @@ import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_PROP; import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_RETENTION_MS_PROP; -public class TopicBasedRemoteLogMetadataManagerConfigTest { - private static final Logger log = LoggerFactory.getLogger(TopicBasedRemoteLogMetadataManagerConfigTest.class); +import static org.junit.jupiter.api.Assertions.assertEquals; - private static final String BOOTSTRAP_SERVERS = "localhost:9091"; +public class TopicBasedRemoteLogMetadataManagerConfigTest { + private static final String BOOTSTRAP_SERVERS = "localhost:2222"; @Test public void testValidConfig() { @@ -60,41 +57,32 @@ public void testValidConfig() { // Check for topic properties TopicBasedRemoteLogMetadataManagerConfig rlmmConfig = new TopicBasedRemoteLogMetadataManagerConfig(props); - Assertions.assertEquals(props.get(REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP), rlmmConfig.metadataTopicPartitionsCount()); + assertEquals(props.get(REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP), rlmmConfig.metadataTopicPartitionsCount()); // Check for common client configs. - Assertions.assertEquals(BOOTSTRAP_SERVERS, rlmmConfig.commonProperties().get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)); - Assertions.assertEquals(BOOTSTRAP_SERVERS, rlmmConfig.producerProperties().get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)); - Assertions.assertEquals(BOOTSTRAP_SERVERS, rlmmConfig.consumerProperties().get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)); + assertEquals(BOOTSTRAP_SERVERS, rlmmConfig.commonProperties().get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)); + assertEquals(BOOTSTRAP_SERVERS, rlmmConfig.producerProperties().get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)); + assertEquals(BOOTSTRAP_SERVERS, rlmmConfig.consumerProperties().get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)); for (Map.Entry entry : commonClientConfig.entrySet()) { - log.info("Checking config: " + entry.getKey()); - Assertions.assertEquals(entry.getValue(), - rlmmConfig.commonProperties().get(entry.getKey())); - Assertions.assertEquals(entry.getValue(), - rlmmConfig.producerProperties().get(entry.getKey())); - Assertions.assertEquals(entry.getValue(), - rlmmConfig.consumerProperties().get(entry.getKey())); + assertEquals(entry.getValue(), rlmmConfig.commonProperties().get(entry.getKey())); + assertEquals(entry.getValue(), rlmmConfig.producerProperties().get(entry.getKey())); + assertEquals(entry.getValue(), rlmmConfig.consumerProperties().get(entry.getKey())); } - // Check for producer configs. for (Map.Entry entry : producerConfig.entrySet()) { - log.info("Checking config: " + entry.getKey()); - Assertions.assertEquals(entry.getValue(), - rlmmConfig.producerProperties().get(entry.getKey())); + assertEquals(entry.getValue(), rlmmConfig.producerProperties().get(entry.getKey())); } - // Check for consumer configs. for (Map.Entry entry : consumerConfig.entrySet()) { - log.info("Checking config: " + entry.getKey()); - Assertions.assertEquals(entry.getValue(), - rlmmConfig.consumerProperties().get(entry.getKey())); + assertEquals(entry.getValue(), rlmmConfig.consumerProperties().get(entry.getKey())); } } @Test public void testCommonProducerConsumerOverridesConfig() { - Map.Entry overrideEntry = new AbstractMap.SimpleImmutableEntry<>(CommonClientConfigs.METADATA_MAX_AGE_CONFIG, 60000L); + Map.Entry overrideEntry = + new AbstractMap.SimpleImmutableEntry<>(CommonClientConfigs.METADATA_MAX_AGE_CONFIG, 60000L); Map commonClientConfig = new HashMap<>(); commonClientConfig.put(CommonClientConfigs.RETRIES_CONFIG, 10); commonClientConfig.put(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG, 1000L); @@ -114,12 +102,9 @@ public void testCommonProducerConsumerOverridesConfig() { Map props = createValidConfigProps(commonClientConfig, producerConfig, consumerConfig); TopicBasedRemoteLogMetadataManagerConfig rlmmConfig = new TopicBasedRemoteLogMetadataManagerConfig(props); - Assertions.assertEquals(overrideCommonPropValue, - rlmmConfig.commonProperties().get(overrideEntry.getKey())); - Assertions.assertEquals(overriddenProducerPropValue, - rlmmConfig.producerProperties().get(overrideEntry.getKey())); - Assertions.assertEquals(overriddenConsumerPropValue, - rlmmConfig.consumerProperties().get(overrideEntry.getKey())); + assertEquals(overrideCommonPropValue, rlmmConfig.commonProperties().get(overrideEntry.getKey())); + assertEquals(overriddenProducerPropValue, rlmmConfig.producerProperties().get(overrideEntry.getKey())); + assertEquals(overriddenConsumerPropValue, rlmmConfig.consumerProperties().get(overrideEntry.getKey())); } private Map createValidConfigProps(Map commonClientConfig, @@ -129,7 +114,6 @@ private Map createValidConfigProps(Map commonCli props.put(REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); props.put(BROKER_ID, 1); props.put(LOG_DIR, TestUtils.tempDirectory().getAbsolutePath()); - props.put(REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_PROP, (short) 3); props.put(REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP, 10); props.put(REMOTE_LOG_METADATA_TOPIC_RETENTION_MS_PROP, 60 * 60 * 1000L); @@ -138,17 +122,14 @@ private Map createValidConfigProps(Map commonCli for (Map.Entry entry : commonClientConfig.entrySet()) { props.put(REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX + entry.getKey(), entry.getValue()); } - // producer configs for (Map.Entry entry : producerConfig.entrySet()) { props.put(REMOTE_LOG_METADATA_PRODUCER_PREFIX + entry.getKey(), entry.getValue()); } - //consumer configs for (Map.Entry entry : consumerConfig.entrySet()) { props.put(REMOTE_LOG_METADATA_CONSUMER_PREFIX + entry.getKey(), entry.getValue()); } - return props; } } \ No newline at end of file diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest.java index 916b475c143b9..d02da09804048 100644 --- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest.java +++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest.java @@ -27,12 +27,11 @@ import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; -import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.SystemTime; import org.apache.kafka.common.utils.Time; import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId; import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; import org.apache.kafka.server.log.remote.storage.RemoteStorageException; -import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.extension.ExtendWith; @@ -44,6 +43,10 @@ import java.util.concurrent.Phaser; import java.util.concurrent.TimeUnit; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.clearInvocations; import static org.mockito.Mockito.doAnswer; @@ -55,10 +58,7 @@ @ClusterTestDefaults(brokers = 3) public class TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest { private final ClusterInstance clusterInstance; - - private static final int SEG_SIZE = 1024 * 1024; - - private final Time time = new MockTime(1); + private final Time time = new SystemTime(); TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest(ClusterInstance clusterInstance) { this.clusterInstance = clusterInstance; @@ -125,24 +125,25 @@ public int metadataPartition(TopicIdPartition topicIdPartition) { // Add segments for these partitions but an exception is received as they have not yet been subscribed. // These messages would have been published to the respective metadata topic partitions but the ConsumerManager // has not yet been subscribing as they are not yet registered. + int segSize = 1048576; RemoteLogSegmentMetadata leaderSegmentMetadata = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(leaderTopicIdPartition, Uuid.randomUuid()), 0, 100, -1L, 0, - time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L)); - ExecutionException exception = Assertions.assertThrows(ExecutionException.class, () -> remoteLogMetadataManager.addRemoteLogSegmentMetadata(leaderSegmentMetadata).get()); - Assertions.assertEquals("org.apache.kafka.common.KafkaException: This consumer is not assigned to the target partition 0. Currently assigned partitions: []", + time.milliseconds(), segSize, Collections.singletonMap(0, 0L)); + ExecutionException exception = assertThrows(ExecutionException.class, + () -> remoteLogMetadataManager.addRemoteLogSegmentMetadata(leaderSegmentMetadata).get()); + assertEquals("org.apache.kafka.common.KafkaException: This consumer is not assigned to the target partition 0. Currently assigned partitions: []", exception.getMessage()); RemoteLogSegmentMetadata followerSegmentMetadata = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(followerTopicIdPartition, Uuid.randomUuid()), 0, 100, -1L, 0, - time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L)); - exception = Assertions.assertThrows(ExecutionException.class, () -> remoteLogMetadataManager.addRemoteLogSegmentMetadata(followerSegmentMetadata).get()); - Assertions.assertEquals("org.apache.kafka.common.KafkaException: This consumer is not assigned to the target partition 0. Currently assigned partitions: []", + time.milliseconds(), segSize, Collections.singletonMap(0, 0L)); + exception = assertThrows(ExecutionException.class, () -> remoteLogMetadataManager.addRemoteLogSegmentMetadata(followerSegmentMetadata).get()); + assertEquals("org.apache.kafka.common.KafkaException: This consumer is not assigned to the target partition 0. Currently assigned partitions: []", exception.getMessage()); // `listRemoteLogSegments` will receive an exception as these topic partitions are not yet registered. - Assertions.assertThrows(RemoteStorageException.class, () -> remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition)); - Assertions.assertThrows(RemoteStorageException.class, () -> remoteLogMetadataManager.listRemoteLogSegments(followerTopicIdPartition)); - + assertThrows(RemoteStorageException.class, () -> remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition)); + assertThrows(RemoteStorageException.class, () -> remoteLogMetadataManager.listRemoteLogSegments(followerTopicIdPartition)); remoteLogMetadataManager.onPartitionLeadershipChanges(Collections.singleton(leaderTopicIdPartition), Collections.emptySet()); @@ -156,8 +157,8 @@ public int metadataPartition(TopicIdPartition topicIdPartition) { // leader partitions would have received as it is registered, but follower partition is not yet registered, // hence it throws an exception. - Assertions.assertTrue(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition).hasNext()); - Assertions.assertThrows(RemoteStorageException.class, () -> remoteLogMetadataManager.listRemoteLogSegments(followerTopicIdPartition)); + assertTrue(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition).hasNext()); + assertThrows(RemoteStorageException.class, () -> remoteLogMetadataManager.listRemoteLogSegments(followerTopicIdPartition)); // Register follower partition // Phaser::bulkRegister and Phaser::register provide the "countUp" feature @@ -172,15 +173,15 @@ public int metadataPartition(TopicIdPartition topicIdPartition) { verify(spyRemotePartitionMetadataStore).markInitialized(followerTopicIdPartition); verify(spyRemotePartitionMetadataStore).handleRemoteLogSegmentMetadata(followerSegmentMetadata); // In this state, all the metadata should be available in RLMM for both leader and follower partitions. - Assertions.assertTrue(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition).hasNext(), "No segments found"); - Assertions.assertTrue(remoteLogMetadataManager.listRemoteLogSegments(followerTopicIdPartition).hasNext(), "No segments found"); + assertTrue(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition).hasNext(), "No segments found"); + assertTrue(remoteLogMetadataManager.listRemoteLogSegments(followerTopicIdPartition).hasNext(), "No segments found"); } } private void createTopic(String topic, Map> replicasAssignments) { try (Admin admin = Admin.create(Collections.singletonMap(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers()))) { admin.createTopics(Collections.singletonList(new NewTopic(topic, replicasAssignments))); - Assertions.assertDoesNotThrow(() -> clusterInstance.waitForTopic(topic, replicasAssignments.size())); + assertDoesNotThrow(() -> clusterInstance.waitForTopic(topic, replicasAssignments.size())); } } } diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerRestartTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerRestartTest.java index dee686dc036d1..a08f12d9ae22b 100644 --- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerRestartTest.java +++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerRestartTest.java @@ -24,12 +24,11 @@ import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; -import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.SystemTime; import org.apache.kafka.common.utils.Time; import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId; import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; import org.apache.kafka.test.TestUtils; -import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.extension.ExtendWith; @@ -37,14 +36,13 @@ import java.util.Collections; import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.LOG_DIR; +import static org.junit.jupiter.api.Assertions.assertTrue; @ExtendWith(value = ClusterTestExtensions.class) @Tag("integration") public class TopicBasedRemoteLogMetadataManagerRestartTest { - private static final int SEG_SIZE = 1024 * 1024; - - private final Time time = new MockTime(1); + private final Time time = new SystemTime(); private final String logDir = TestUtils.tempDirectory("_rlmm_segs_").getAbsolutePath(); private final ClusterInstance clusterInstance; @@ -76,16 +74,17 @@ public void testRLMMAPIsAfterRestart() throws Exception { clusterInstance.waitForTopic(leaderTopic, 1); clusterInstance.waitForTopic(followerTopic, 1); - final TopicIdPartition leaderTopicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition(leaderTopic, 0)); - final TopicIdPartition followerTopicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition(followerTopic, 0)); + TopicIdPartition leaderTopicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition(leaderTopic, 0)); + TopicIdPartition followerTopicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition(followerTopic, 0)); + int segSize = 1048576; RemoteLogSegmentMetadata leaderSegmentMetadata = new RemoteLogSegmentMetadata( new RemoteLogSegmentId(leaderTopicIdPartition, Uuid.randomUuid()), 0, 100, -1L, 0, - time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L)); + time.milliseconds(), segSize, Collections.singletonMap(0, 0L)); RemoteLogSegmentMetadata followerSegmentMetadata = new RemoteLogSegmentMetadata( new RemoteLogSegmentId(followerTopicIdPartition, Uuid.randomUuid()), 0, 100, -1L, 0, - time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L)); + time.milliseconds(), segSize, Collections.singletonMap(0, 0L)); try (TopicBasedRemoteLogMetadataManager topicBasedRemoteLogMetadataManager = createTopicBasedRemoteLogMetadataManager()) { // Register these partitions to RemoteLogMetadataManager. @@ -115,12 +114,14 @@ public void testRLMMAPIsAfterRestart() throws Exception { RemoteLogSegmentMetadata leaderSegmentMetadata2 = new RemoteLogSegmentMetadata( new RemoteLogSegmentId(leaderTopicIdPartition, Uuid.randomUuid()), 101, 200, -1L, 0, - time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 101L)); + time.milliseconds(), segSize, Collections.singletonMap(0, 101L)); topicBasedRemoteLogMetadataManager.addRemoteLogSegmentMetadata(leaderSegmentMetadata2).get(); // Check that both the stored segment and recently added segment are available. - Assertions.assertTrue(TestUtils.sameElementsWithoutOrder(Arrays.asList(leaderSegmentMetadata, leaderSegmentMetadata2).iterator(), - topicBasedRemoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition))); + assertTrue(TestUtils.sameElementsWithoutOrder( + Arrays.asList(leaderSegmentMetadata, leaderSegmentMetadata2).iterator(), + topicBasedRemoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition)) + ); } } } diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java index 183a84934127d..8b9cfd0700c3a 100644 --- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java +++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java @@ -25,14 +25,13 @@ import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; -import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.SystemTime; import org.apache.kafka.common.utils.Time; import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId; import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; import org.apache.kafka.server.log.remote.storage.RemoteResourceNotFoundException; import org.apache.kafka.server.log.remote.storage.RemoteStorageException; import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.extension.ExtendWith; import java.io.IOException; @@ -42,6 +41,10 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.spy; @@ -50,11 +53,10 @@ @ExtendWith(ClusterTestExtensions.class) @ClusterTestDefaults(brokers = 3) public class TopicBasedRemoteLogMetadataManagerTest { - private static final int SEG_SIZE = 1024 * 1024; - + private static final int SEG_SIZE = 1048576; private final ClusterInstance clusterInstance; private final RemotePartitionMetadataStore spyRemotePartitionMetadataEventHandler = spy(new RemotePartitionMetadataStore()); - private final Time time = new MockTime(1); + private final Time time = new SystemTime(); private TopicBasedRemoteLogMetadataManager remoteLogMetadataManager; TopicBasedRemoteLogMetadataManagerTest(ClusterInstance clusterInstance) { @@ -83,7 +85,7 @@ public void testDoesTopicExist() throws ExecutionException, InterruptedException admin.createTopics(Collections.singletonList(new NewTopic(topic, 1, (short) 1))).all().get(); clusterInstance.waitForTopic(topic, 1); boolean doesTopicExist = topicBasedRlmm().doesTopicExist(admin, topic); - Assertions.assertTrue(doesTopicExist); + assertTrue(doesTopicExist); } } @@ -92,7 +94,7 @@ public void testTopicDoesNotExist() { try (Admin admin = clusterInstance.createAdminClient()) { String topic = "dummy-test-topic"; boolean doesTopicExist = topicBasedRlmm().doesTopicExist(admin, topic); - Assertions.assertFalse(doesTopicExist); + assertFalse(doesTopicExist); } } @@ -139,37 +141,37 @@ public void testNewPartitionUpdates() throws Exception { RemoteLogSegmentMetadata leaderSegmentMetadata = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(newLeaderTopicIdPartition, Uuid.randomUuid()), 0, 100, -1L, 0, time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L)); - Assertions.assertThrows(Exception.class, () -> topicBasedRlmm().addRemoteLogSegmentMetadata(leaderSegmentMetadata).get()); + assertThrows(Exception.class, () -> topicBasedRlmm().addRemoteLogSegmentMetadata(leaderSegmentMetadata).get()); RemoteLogSegmentMetadata followerSegmentMetadata = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(newFollowerTopicIdPartition, Uuid.randomUuid()), 0, 100, -1L, 0, time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L)); - Assertions.assertThrows(Exception.class, () -> topicBasedRlmm().addRemoteLogSegmentMetadata(followerSegmentMetadata).get()); + assertThrows(Exception.class, () -> topicBasedRlmm().addRemoteLogSegmentMetadata(followerSegmentMetadata).get()); // `listRemoteLogSegments` will receive an exception as these topic partitions are not yet registered. - Assertions.assertThrows(RemoteResourceNotFoundException.class, () -> topicBasedRlmm().listRemoteLogSegments(newLeaderTopicIdPartition)); - Assertions.assertThrows(RemoteResourceNotFoundException.class, () -> topicBasedRlmm().listRemoteLogSegments(newFollowerTopicIdPartition)); + assertThrows(RemoteResourceNotFoundException.class, () -> topicBasedRlmm().listRemoteLogSegments(newLeaderTopicIdPartition)); + assertThrows(RemoteResourceNotFoundException.class, () -> topicBasedRlmm().listRemoteLogSegments(newFollowerTopicIdPartition)); topicBasedRlmm().onPartitionLeadershipChanges(Collections.singleton(newLeaderTopicIdPartition), Collections.singleton(newFollowerTopicIdPartition)); // RemoteLogSegmentMetadata events are already published, and topicBasedRlmm's consumer manager will start // fetching those events and build the cache. - Assertions.assertTrue(initializationLatch.await(30_000, TimeUnit.MILLISECONDS)); - Assertions.assertTrue(handleRemoteLogSegmentMetadataLatch.await(30_000, TimeUnit.MILLISECONDS)); + assertTrue(initializationLatch.await(30_000, TimeUnit.MILLISECONDS)); + assertTrue(handleRemoteLogSegmentMetadataLatch.await(30_000, TimeUnit.MILLISECONDS)); verify(spyRemotePartitionMetadataEventHandler).markInitialized(newLeaderTopicIdPartition); verify(spyRemotePartitionMetadataEventHandler).markInitialized(newFollowerTopicIdPartition); verify(spyRemotePartitionMetadataEventHandler).handleRemoteLogSegmentMetadata(leaderSegmentMetadata); verify(spyRemotePartitionMetadataEventHandler).handleRemoteLogSegmentMetadata(followerSegmentMetadata); - Assertions.assertTrue(topicBasedRlmm().listRemoteLogSegments(newLeaderTopicIdPartition).hasNext()); - Assertions.assertTrue(topicBasedRlmm().listRemoteLogSegments(newFollowerTopicIdPartition).hasNext()); + assertTrue(topicBasedRlmm().listRemoteLogSegments(newLeaderTopicIdPartition).hasNext()); + assertTrue(topicBasedRlmm().listRemoteLogSegments(newFollowerTopicIdPartition).hasNext()); } @ClusterTest public void testRemoteLogSizeCalculationForUnknownTopicIdPartitionThrows() { TopicIdPartition topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("singleton", 0)); - Assertions.assertThrows(RemoteResourceNotFoundException.class, () -> topicBasedRlmm().remoteLogSize(topicIdPartition, 0)); + assertThrows(RemoteResourceNotFoundException.class, () -> topicBasedRlmm().remoteLogSize(topicIdPartition, 0)); } @ClusterTest @@ -206,8 +208,8 @@ public void testRemoteLogSizeCalculationWithSegmentsOfTheSameEpoch() throws Remo // RemoteLogSegmentMetadata events are already published, and topicBasedRlmm's consumer manager will start // fetching those events and build the cache. - Assertions.assertTrue(initializationLatch.await(30_000, TimeUnit.MILLISECONDS)); - Assertions.assertTrue(handleRemoteLogSegmentMetadataLatch.await(30_000, TimeUnit.MILLISECONDS)); + assertTrue(initializationLatch.await(30_000, TimeUnit.MILLISECONDS)); + assertTrue(handleRemoteLogSegmentMetadataLatch.await(30_000, TimeUnit.MILLISECONDS)); verify(spyRemotePartitionMetadataEventHandler).markInitialized(topicIdPartition); verify(spyRemotePartitionMetadataEventHandler).handleRemoteLogSegmentMetadata(firstSegmentMetadata); @@ -215,7 +217,7 @@ public void testRemoteLogSizeCalculationWithSegmentsOfTheSameEpoch() throws Remo verify(spyRemotePartitionMetadataEventHandler).handleRemoteLogSegmentMetadata(thirdSegmentMetadata); Long remoteLogSize = topicBasedRemoteLogMetadataManager.remoteLogSize(topicIdPartition, 0); - Assertions.assertEquals(SEG_SIZE * 6, remoteLogSize); + assertEquals(SEG_SIZE * 6, remoteLogSize); } @ClusterTest @@ -251,16 +253,16 @@ public void testRemoteLogSizeCalculationWithSegmentsOfDifferentEpochs() throws R // RemoteLogSegmentMetadata events are already published, and topicBasedRlmm's consumer manager will start // fetching those events and build the cache. - Assertions.assertTrue(initializationLatch.await(30_000, TimeUnit.MILLISECONDS)); - Assertions.assertTrue(handleRemoteLogSegmentMetadataLatch.await(30_000, TimeUnit.MILLISECONDS)); + assertTrue(initializationLatch.await(30_000, TimeUnit.MILLISECONDS)); + assertTrue(handleRemoteLogSegmentMetadataLatch.await(30_000, TimeUnit.MILLISECONDS)); verify(spyRemotePartitionMetadataEventHandler).markInitialized(topicIdPartition); verify(spyRemotePartitionMetadataEventHandler).handleRemoteLogSegmentMetadata(firstSegmentMetadata); verify(spyRemotePartitionMetadataEventHandler).handleRemoteLogSegmentMetadata(secondSegmentMetadata); verify(spyRemotePartitionMetadataEventHandler).handleRemoteLogSegmentMetadata(thirdSegmentMetadata); - Assertions.assertEquals(SEG_SIZE, topicBasedRemoteLogMetadataManager.remoteLogSize(topicIdPartition, 0)); - Assertions.assertEquals(SEG_SIZE * 2, topicBasedRemoteLogMetadataManager.remoteLogSize(topicIdPartition, 1)); - Assertions.assertEquals(SEG_SIZE * 3, topicBasedRemoteLogMetadataManager.remoteLogSize(topicIdPartition, 2)); + assertEquals(SEG_SIZE, topicBasedRemoteLogMetadataManager.remoteLogSize(topicIdPartition, 0)); + assertEquals(SEG_SIZE * 2, topicBasedRemoteLogMetadataManager.remoteLogSize(topicIdPartition, 1)); + assertEquals(SEG_SIZE * 3, topicBasedRemoteLogMetadataManager.remoteLogSize(topicIdPartition, 2)); } @ClusterTest @@ -293,13 +295,13 @@ public void testRemoteLogSizeCalculationWithSegmentsHavingNonExistentEpochs() th // RemoteLogSegmentMetadata events are already published, and topicBasedRlmm's consumer manager will start // fetching those events and build the cache. - Assertions.assertTrue(initializationLatch.await(30_000, TimeUnit.MILLISECONDS)); - Assertions.assertTrue(handleRemoteLogSegmentMetadataLatch.await(30_000, TimeUnit.MILLISECONDS)); + assertTrue(initializationLatch.await(30_000, TimeUnit.MILLISECONDS)); + assertTrue(handleRemoteLogSegmentMetadataLatch.await(30_000, TimeUnit.MILLISECONDS)); verify(spyRemotePartitionMetadataEventHandler).markInitialized(topicIdPartition); verify(spyRemotePartitionMetadataEventHandler).handleRemoteLogSegmentMetadata(firstSegmentMetadata); verify(spyRemotePartitionMetadataEventHandler).handleRemoteLogSegmentMetadata(secondSegmentMetadata); - Assertions.assertEquals(0, topicBasedRemoteLogMetadataManager.remoteLogSize(topicIdPartition, 9001)); + assertEquals(0, topicBasedRemoteLogMetadataManager.remoteLogSize(topicIdPartition, 9001)); } } diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManagerTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManagerTest.java index b67c316e5e499..d970dd8b68ccc 100644 --- a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManagerTest.java +++ b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManagerTest.java @@ -23,33 +23,39 @@ import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; -import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.SystemTime; import org.apache.kafka.common.utils.Time; import org.apache.kafka.server.log.remote.metadata.storage.RemoteLogMetadataManagerTestUtils; import org.apache.kafka.server.log.remote.metadata.storage.RemotePartitionMetadataStore; import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager; -import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.extension.ExtendWith; -//import java.io.IOException; import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Optional; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState.COPY_SEGMENT_FINISHED; +import static org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteState.DELETE_PARTITION_FINISHED; +import static org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteState.DELETE_PARTITION_MARKED; +import static org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteState.DELETE_PARTITION_STARTED; + @Tag("integration") @ExtendWith(ClusterTestExtensions.class) @ClusterTestDefaults(brokers = 3) public class RemoteLogMetadataManagerTest { private final ClusterInstance clusterInstance; - private static final TopicIdPartition TP0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)); - private static final int SEG_SIZE = 1024 * 1024; + private static final int SEG_SIZE = 1048576; private static final int BROKER_ID_0 = 0; private static final int BROKER_ID_1 = 1; - - private final Time time = new MockTime(1); + private final Time time = new SystemTime(); RemoteLogMetadataManagerTest(ClusterInstance clusterInstance) { this.clusterInstance = clusterInstance; @@ -71,26 +77,25 @@ public void testFetchSegments() throws Exception { // 1.Create a segment with state COPY_SEGMENT_STARTED, and this segment should not be available. Map segmentLeaderEpochs = Collections.singletonMap(0, 101L); RemoteLogSegmentId segmentId = new RemoteLogSegmentId(TP0, Uuid.randomUuid()); - RemoteLogSegmentMetadata segmentMetadata = new RemoteLogSegmentMetadata(segmentId, 101L, 200L, -1L, BROKER_ID_0, - time.milliseconds(), SEG_SIZE, segmentLeaderEpochs); + RemoteLogSegmentMetadata segmentMetadata = new RemoteLogSegmentMetadata( + segmentId, 101L, 200L, -1L, BROKER_ID_0, time.milliseconds(), SEG_SIZE, segmentLeaderEpochs); // Wait until the segment is added successfully. - Assertions.assertDoesNotThrow(() -> remoteLogMetadataManager.addRemoteLogSegmentMetadata(segmentMetadata).get()); + assertDoesNotThrow(() -> remoteLogMetadataManager.addRemoteLogSegmentMetadata(segmentMetadata).get()); // Search should not return the above segment. - Assertions.assertFalse(remoteLogMetadataManager.remoteLogSegmentMetadata(TP0, 0, 150).isPresent()); + assertFalse(remoteLogMetadataManager.remoteLogSegmentMetadata(TP0, 0, 150).isPresent()); // 2.Move that segment to COPY_SEGMENT_FINISHED state and this segment should be available. - RemoteLogSegmentMetadataUpdate segmentMetadataUpdate = new RemoteLogSegmentMetadataUpdate(segmentId, time.milliseconds(), - Optional.empty(), - RemoteLogSegmentState.COPY_SEGMENT_FINISHED, - BROKER_ID_1); + RemoteLogSegmentMetadataUpdate segmentMetadataUpdate = new RemoteLogSegmentMetadataUpdate( + segmentId, time.milliseconds(), Optional.empty(), COPY_SEGMENT_FINISHED, BROKER_ID_1); // Wait until the segment is updated successfully. - Assertions.assertDoesNotThrow(() -> remoteLogMetadataManager.updateRemoteLogSegmentMetadata(segmentMetadataUpdate).get()); + assertDoesNotThrow(() -> remoteLogMetadataManager.updateRemoteLogSegmentMetadata(segmentMetadataUpdate).get()); RemoteLogSegmentMetadata expectedSegmentMetadata = segmentMetadata.createWithUpdates(segmentMetadataUpdate); // Search should return the above segment. - Optional segmentMetadataForOffset150 = remoteLogMetadataManager.remoteLogSegmentMetadata(TP0, 0, 150); - Assertions.assertEquals(Optional.of(expectedSegmentMetadata), segmentMetadataForOffset150); + Optional segmentMetadataForOffset150 = + remoteLogMetadataManager.remoteLogSegmentMetadata(TP0, 0, 150); + assertEquals(Optional.of(expectedSegmentMetadata), segmentMetadataForOffset150); } } @@ -111,46 +116,46 @@ public void testRemotePartitionDeletion() throws Exception { segmentLeaderEpochs.put(2, 50L); segmentLeaderEpochs.put(3, 80L); RemoteLogSegmentId segmentId = new RemoteLogSegmentId(TP0, Uuid.randomUuid()); - RemoteLogSegmentMetadata segmentMetadata = new RemoteLogSegmentMetadata(segmentId, 0L, 100L, - -1L, BROKER_ID_0, time.milliseconds(), SEG_SIZE, - segmentLeaderEpochs); + RemoteLogSegmentMetadata segmentMetadata = new RemoteLogSegmentMetadata( + segmentId, 0L, 100L, -1L, BROKER_ID_0, time.milliseconds(), SEG_SIZE, segmentLeaderEpochs); // Wait until the segment is added successfully. - Assertions.assertDoesNotThrow(() -> remoteLogMetadataManager.addRemoteLogSegmentMetadata(segmentMetadata).get()); + assertDoesNotThrow(() -> remoteLogMetadataManager.addRemoteLogSegmentMetadata(segmentMetadata).get()); RemoteLogSegmentMetadataUpdate segmentMetadataUpdate = new RemoteLogSegmentMetadataUpdate( - segmentId, time.milliseconds(), Optional.empty(), RemoteLogSegmentState.COPY_SEGMENT_FINISHED, BROKER_ID_1); + segmentId, time.milliseconds(), Optional.empty(), COPY_SEGMENT_FINISHED, BROKER_ID_1); // Wait until the segment is updated successfully. - Assertions.assertDoesNotThrow(() -> remoteLogMetadataManager.updateRemoteLogSegmentMetadata(segmentMetadataUpdate).get()); + assertDoesNotThrow(() -> remoteLogMetadataManager.updateRemoteLogSegmentMetadata(segmentMetadataUpdate).get()); RemoteLogSegmentMetadata expectedSegMetadata = segmentMetadata.createWithUpdates(segmentMetadataUpdate); // Check that the segment exists in RLMM. - Optional segMetadataForOffset30Epoch1 = remoteLogMetadataManager.remoteLogSegmentMetadata(TP0, 1, 30L); - Assertions.assertEquals(Optional.of(expectedSegMetadata), segMetadataForOffset30Epoch1); + Optional segMetadataForOffset30Epoch1 = + remoteLogMetadataManager.remoteLogSegmentMetadata(TP0, 1, 30L); + assertEquals(Optional.of(expectedSegMetadata), segMetadataForOffset30Epoch1); // Mark the partition for deletion and wait for it to be updated successfully. - Assertions.assertDoesNotThrow(() -> remoteLogMetadataManager.putRemotePartitionDeleteMetadata( - createRemotePartitionDeleteMetadata(RemotePartitionDeleteState.DELETE_PARTITION_MARKED)).get()); + assertDoesNotThrow(() -> remoteLogMetadataManager.putRemotePartitionDeleteMetadata( + createRemotePartitionDeleteMetadata(DELETE_PARTITION_MARKED)).get()); - Optional segmentMetadataAfterDelMark = remoteLogMetadataManager.remoteLogSegmentMetadata(TP0, - 1, 30L); - Assertions.assertEquals(Optional.of(expectedSegMetadata), segmentMetadataAfterDelMark); + Optional segmentMetadataAfterDelMark = + remoteLogMetadataManager.remoteLogSegmentMetadata(TP0, 1, 30L); + assertEquals(Optional.of(expectedSegMetadata), segmentMetadataAfterDelMark); // Set the partition deletion state as started. Partition and segments should still be accessible as they are not // yet deleted. Wait until the segment state is updated successfully. - Assertions.assertDoesNotThrow(() -> remoteLogMetadataManager.putRemotePartitionDeleteMetadata( - createRemotePartitionDeleteMetadata(RemotePartitionDeleteState.DELETE_PARTITION_STARTED)).get()); + assertDoesNotThrow(() -> remoteLogMetadataManager.putRemotePartitionDeleteMetadata( + createRemotePartitionDeleteMetadata(DELETE_PARTITION_STARTED)).get()); - Optional segmentMetadataAfterDelStart = remoteLogMetadataManager.remoteLogSegmentMetadata(TP0, - 1, 30L); - Assertions.assertEquals(Optional.of(expectedSegMetadata), segmentMetadataAfterDelStart); + Optional segmentMetadataAfterDelStart = + remoteLogMetadataManager.remoteLogSegmentMetadata(TP0, 1, 30L); + assertEquals(Optional.of(expectedSegMetadata), segmentMetadataAfterDelStart); // Set the partition deletion state as finished. RLMM should clear all its internal state for that partition. // Wait until the segment state is updated successfully. - Assertions.assertDoesNotThrow(() -> remoteLogMetadataManager.putRemotePartitionDeleteMetadata( - createRemotePartitionDeleteMetadata(RemotePartitionDeleteState.DELETE_PARTITION_FINISHED)).get()); + assertDoesNotThrow(() -> remoteLogMetadataManager.putRemotePartitionDeleteMetadata( + createRemotePartitionDeleteMetadata(DELETE_PARTITION_FINISHED)).get()); - Assertions.assertThrows(RemoteResourceNotFoundException.class, + assertThrows(RemoteResourceNotFoundException.class, () -> remoteLogMetadataManager.remoteLogSegmentMetadata(TP0, 1, 30L)); } }