Skip to content

Commit

Permalink
MINOR: Cleanup the storage module unit tests (apache#16202)
Browse files Browse the repository at this point in the history
- Use SystemTime instead of MockTime when time is not mocked
- Use static assertions to reduce the line length
- Fold the lines if it exceeds the limit
- rename tp0 to tpId0 when it refers to TopicIdPartition

Reviewers: Kuan-Po (Cooper) Tseng <[email protected]>, Chia-Ping Tsai <[email protected]>
  • Loading branch information
kamalcph authored and gongxuanzhang committed Jun 12, 2024
1 parent c1cfaee commit 7c5e024
Show file tree
Hide file tree
Showing 8 changed files with 169 additions and 185 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
import org.junit.jupiter.api.Test;

import java.io.ByteArrayOutputStream;
Expand All @@ -35,6 +34,7 @@
import java.util.Optional;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState.COPY_SEGMENT_STARTED;

public class RemoteLogMetadataFormatterTest {
private static final Uuid TOPIC_ID = Uuid.randomUuid();
Expand All @@ -51,12 +51,12 @@ public void testFormat() throws IOException {
RemoteLogSegmentId remoteLogSegmentId = new RemoteLogSegmentId(TP0, SEGMENT_ID);
Optional<CustomMetadata> customMetadata = Optional.of(new CustomMetadata(new byte[10]));
RemoteLogSegmentMetadata remoteLogMetadata = new RemoteLogSegmentMetadata(
remoteLogSegmentId, 0L, 100L, -1L, 1,
123L, 1024, customMetadata,
RemoteLogSegmentState.COPY_SEGMENT_STARTED, segLeaderEpochs);
remoteLogSegmentId, 0L, 100L, -1L, 1, 123L, 1024, customMetadata, COPY_SEGMENT_STARTED,
segLeaderEpochs);

byte[] metadataBytes = new RemoteLogMetadataSerde().serialize(remoteLogMetadata);
ConsumerRecord<byte[], byte[]> metadataRecord = new ConsumerRecord<>("__remote_log_metadata", 0, 0, null, metadataBytes);
ConsumerRecord<byte[], byte[]> metadataRecord = new ConsumerRecord<>(
"__remote_log_metadata", 0, 0, null, metadataBytes);

String expected = String.format(
"partition: 0, offset: 0, value: " +
Expand All @@ -68,9 +68,11 @@ public void testFormat() throws IOException {
TOPIC_ID, SEGMENT_ID);
try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
PrintStream ps = new PrintStream(baos)) {
RemoteLogMetadataSerde.RemoteLogMetadataFormatter formatter = new RemoteLogMetadataSerde.RemoteLogMetadataFormatter();
formatter.writeTo(metadataRecord, ps);
assertEquals(expected, baos.toString());
try (RemoteLogMetadataSerde.RemoteLogMetadataFormatter formatter =
new RemoteLogMetadataSerde.RemoteLogMetadataFormatter()) {
formatter.writeTo(metadataRecord, ps);
assertEquals(expected, baos.toString());
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,48 +19,48 @@
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata;
import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteState;
import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState.COPY_SEGMENT_STARTED;
import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState.COPY_SEGMENT_FINISHED;
import static org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteState.DELETE_PARTITION_MARKED;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

public class RemoteLogMetadataSerdeTest {

public static final String TOPIC = "foo";
private static final TopicIdPartition TP0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition(TOPIC, 0));
private final Time time = new MockTime(1);
private final Time time = new SystemTime();

@Test
public void testRemoteLogSegmentMetadataSerde() {
RemoteLogSegmentMetadata remoteLogSegmentMetadata = createRemoteLogSegmentMetadata();

doTestRemoteLogMetadataSerde(remoteLogSegmentMetadata);
}

@Test
public void testRemoteLogSegmentMetadataUpdateSerde() {
RemoteLogSegmentMetadataUpdate remoteLogSegmentMetadataUpdate = createRemoteLogSegmentMetadataUpdate();

doTestRemoteLogMetadataSerde(remoteLogSegmentMetadataUpdate);
}

@Test
public void testRemotePartitionDeleteMetadataSerde() {
RemotePartitionDeleteMetadata remotePartitionDeleteMetadata = createRemotePartitionDeleteMetadata();

doTestRemoteLogMetadataSerde(remotePartitionDeleteMetadata);
}

Expand All @@ -70,24 +70,19 @@ private RemoteLogSegmentMetadata createRemoteLogSegmentMetadata() {
segLeaderEpochs.put(1, 20L);
segLeaderEpochs.put(2, 80L);
RemoteLogSegmentId remoteLogSegmentId = new RemoteLogSegmentId(TP0, Uuid.randomUuid());
return new RemoteLogSegmentMetadata(remoteLogSegmentId, 0L, 100L, -1L, 1,
time.milliseconds(), 1024,
Optional.of(new CustomMetadata(new byte[] {0, 1, 2, 3})),
RemoteLogSegmentState.COPY_SEGMENT_STARTED,
segLeaderEpochs
);
return new RemoteLogSegmentMetadata(remoteLogSegmentId, 0L, 100L, -1L, 1,
time.milliseconds(), 1024, Optional.of(new CustomMetadata(new byte[] {0, 1, 2, 3})),
COPY_SEGMENT_STARTED, segLeaderEpochs);
}

private RemoteLogSegmentMetadataUpdate createRemoteLogSegmentMetadataUpdate() {
RemoteLogSegmentId remoteLogSegmentId = new RemoteLogSegmentId(TP0, Uuid.randomUuid());
return new RemoteLogSegmentMetadataUpdate(remoteLogSegmentId, time.milliseconds(),
Optional.of(new CustomMetadata(new byte[] {0, 1, 2, 3})),
RemoteLogSegmentState.COPY_SEGMENT_FINISHED, 2);
Optional.of(new CustomMetadata(new byte[] {0, 1, 2, 3})), COPY_SEGMENT_FINISHED, 2);
}

private RemotePartitionDeleteMetadata createRemotePartitionDeleteMetadata() {
return new RemotePartitionDeleteMetadata(TP0, RemotePartitionDeleteState.DELETE_PARTITION_MARKED,
time.milliseconds(), 0);
return new RemotePartitionDeleteMetadata(TP0, DELETE_PARTITION_MARKED, time.milliseconds(), 0);
}

private void doTestRemoteLogMetadataSerde(RemoteLogMetadata remoteLogMetadata) {
Expand All @@ -96,16 +91,17 @@ private void doTestRemoteLogMetadataSerde(RemoteLogMetadata remoteLogMetadata) {
byte[] metadataBytes = serializer.serialize(remoteLogMetadata);

// Deserialize the bytes and check the RemoteLogMetadata object is as expected.
// Created another RemoteLogMetadataSerde instance to depict the real usecase of serializer and deserializer having their own instances.
// Created another RemoteLogMetadataSerde instance to depict the real usecase of serializer and
// deserializer having their own instances.
RemoteLogMetadataSerde deserializer = new RemoteLogMetadataSerde();
RemoteLogMetadata deserializedRemoteLogMetadata = deserializer.deserialize(metadataBytes);
Assertions.assertEquals(remoteLogMetadata, deserializedRemoteLogMetadata);
assertEquals(remoteLogMetadata, deserializedRemoteLogMetadata);
}

@Test
public void testInvalidRemoteStorageMetadata() {
// Serializing receives an exception as it does not have the expected RemoteLogMetadata registered in serdes.
Assertions.assertThrows(IllegalArgumentException.class,
assertThrows(IllegalArgumentException.class,
() -> new RemoteLogMetadataSerde().serialize(new InvalidRemoteLogMetadata(1, time.milliseconds())));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogSegmentMetadataTransform;
Expand All @@ -29,60 +29,56 @@
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata;
import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteState;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.util.Collections;
import java.util.Optional;

import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState.COPY_SEGMENT_FINISHED;
import static org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteState.DELETE_PARTITION_STARTED;
import static org.junit.jupiter.api.Assertions.assertEquals;

public class RemoteLogMetadataTransformTest {
private static final TopicIdPartition TP0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
private final Time time = new MockTime(1);
private final Time time = new SystemTime();

@Test
public void testRemoteLogSegmentMetadataTransform() {
RemoteLogSegmentMetadataTransform metadataTransform = new RemoteLogSegmentMetadataTransform();

RemoteLogSegmentMetadata metadata = createRemoteLogSegmentMetadata();
ApiMessageAndVersion apiMessageAndVersion = metadataTransform.toApiMessageAndVersion(metadata);
RemoteLogSegmentMetadata remoteLogSegmentMetadataFromRecord = metadataTransform
.fromApiMessageAndVersion(apiMessageAndVersion);

Assertions.assertEquals(metadata, remoteLogSegmentMetadataFromRecord);
assertEquals(metadata, remoteLogSegmentMetadataFromRecord);
}

@Test
public void testRemoteLogSegmentMetadataUpdateTransform() {
RemoteLogSegmentMetadataUpdateTransform metadataUpdateTransform = new RemoteLogSegmentMetadataUpdateTransform();

RemoteLogSegmentMetadataUpdate metadataUpdate =
new RemoteLogSegmentMetadataUpdate(new RemoteLogSegmentId(TP0, Uuid.randomUuid()), time.milliseconds(),
Optional.of(new CustomMetadata(new byte[]{0, 1, 2, 3})),
RemoteLogSegmentState.COPY_SEGMENT_FINISHED, 1);
RemoteLogSegmentMetadataUpdate metadataUpdate = new RemoteLogSegmentMetadataUpdate(
new RemoteLogSegmentId(TP0, Uuid.randomUuid()), time.milliseconds(),
Optional.of(new CustomMetadata(new byte[]{0, 1, 2, 3})), COPY_SEGMENT_FINISHED, 1);
ApiMessageAndVersion apiMessageAndVersion = metadataUpdateTransform.toApiMessageAndVersion(metadataUpdate);
RemoteLogSegmentMetadataUpdate metadataUpdateFromRecord = metadataUpdateTransform.fromApiMessageAndVersion(apiMessageAndVersion);

Assertions.assertEquals(metadataUpdate, metadataUpdateFromRecord);
RemoteLogSegmentMetadataUpdate metadataUpdateFromRecord =
metadataUpdateTransform.fromApiMessageAndVersion(apiMessageAndVersion);
assertEquals(metadataUpdate, metadataUpdateFromRecord);
}

private RemoteLogSegmentMetadata createRemoteLogSegmentMetadata() {
RemoteLogSegmentId remoteLogSegmentId = new RemoteLogSegmentId(TP0, Uuid.randomUuid());
return new RemoteLogSegmentMetadata(remoteLogSegmentId, 0L, 100L, -1L, 1,
time.milliseconds(), 1024, Collections.singletonMap(0, 0L));
time.milliseconds(), 1024, Collections.singletonMap(0, 0L));
}

@Test
public void testRemoteLogPartitionMetadataTransform() {
RemotePartitionDeleteMetadataTransform transform = new RemotePartitionDeleteMetadataTransform();

RemotePartitionDeleteMetadata partitionDeleteMetadata
= new RemotePartitionDeleteMetadata(TP0, RemotePartitionDeleteState.DELETE_PARTITION_STARTED, time.milliseconds(), 1);
= new RemotePartitionDeleteMetadata(TP0, DELETE_PARTITION_STARTED, time.milliseconds(), 1);
ApiMessageAndVersion apiMessageAndVersion = transform.toApiMessageAndVersion(partitionDeleteMetadata);
RemotePartitionDeleteMetadata partitionDeleteMetadataFromRecord = transform.fromApiMessageAndVersion(apiMessageAndVersion);

Assertions.assertEquals(partitionDeleteMetadata, partitionDeleteMetadataFromRecord);
RemotePartitionDeleteMetadata partitionDeleteMetadataFromRecord =
transform.fromApiMessageAndVersion(apiMessageAndVersion);
assertEquals(partitionDeleteMetadata, partitionDeleteMetadataFromRecord);
}
}
Loading

0 comments on commit 7c5e024

Please sign in to comment.