Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Schema] Fix autoConsumeSchema deadLetter problem. #9970

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import lombok.Data;
import org.apache.avro.reflect.Nullable;
import org.apache.pulsar.client.api.schema.GenericRecord;
import lombok.Cleanup;
import org.apache.pulsar.client.util.RetryMessageUtil;
import org.slf4j.Logger;
Expand Down Expand Up @@ -183,6 +186,93 @@ public void testDeadLetterTopicHasOriginalInfo() throws Exception {
consumer.close();
}

@Data
public static class Foo {
@Nullable
private String field1;
@Nullable
private String field2;
}

@Data
public static class FooV2 {
@Nullable
private String field1;
@Nullable
private String field2;
@Nullable
private String field3;
}

@Test(timeOut = 20000)
public void testAutoConsumeSchemaDeadLetter() throws Exception {
final String topic = "persistent://my-property/my-ns/dead-letter-topic";
final String subName = "my-subscription";
final int maxRedeliveryCount = 1;
final int sendMessages = 10;

admin.topics().createNonPartitionedTopic(topic);
PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
Consumer<FooV2> deadLetterConsumer = newPulsarClient.newConsumer(Schema.AVRO(FooV2.class))
.topic(topic + "-" + subName + "-DLQ")
.subscriptionName("my-subscription")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();

Producer<byte[]> producer = pulsarClient.newProducer(Schema.AUTO_PRODUCE_BYTES())
.topic(topic)
.create();
Set<String> messageIds = new HashSet<>();
for (int i = 0; i < sendMessages; i++) {
if (i % 2 == 0) {
Foo foo = new Foo();
foo.field1 = i + "";
foo.field2 = i + "";
messageIds.add(producer.newMessage(Schema.AVRO(Foo.class)).value(foo).send().toString());
} else {
FooV2 foo = new FooV2();
foo.field1 = i + "";
foo.field2 = i + "";
foo.field3 = i + "";
messageIds.add(producer.newMessage(Schema.AVRO(FooV2.class)).value(foo).send().toString());
}
}
Consumer<GenericRecord> consumer = pulsarClient.newConsumer(Schema.AUTO_CONSUME())
.topic(topic)
.subscriptionName(subName)
.subscriptionType(SubscriptionType.Shared)
.ackTimeout(1, TimeUnit.SECONDS)
.deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliveryCount).build())
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();
producer.close();

int totalReceived = 0;
do {
consumer.receive();
totalReceived++;
} while (totalReceived < sendMessages * (maxRedeliveryCount + 1));

int totalInDeadLetter = 0;

for (int i = 0; i < sendMessages; i++) {
Message<FooV2> message;
message = deadLetterConsumer.receive();
FooV2 fooV2 = message.getValue();
assertNotNull(fooV2.field1);
assertEquals(fooV2.field2, fooV2.field1);
assertTrue(fooV2.field3 == null || fooV2.field1.equals(fooV2.field3));
assertEquals(message.getProperties().get(RetryMessageUtil.SYSTEM_PROPERTY_REAL_TOPIC), topic);
assertTrue(messageIds.contains(message.getProperties().get(RetryMessageUtil.SYSTEM_PROPERTY_ORIGIN_MESSAGE_ID)));
deadLetterConsumer.acknowledge(message);
totalInDeadLetter++;
}
assertEquals(totalInDeadLetter, sendMessages);
deadLetterConsumer.close();
consumer.close();
newPulsarClient.close();
}

@Test(timeOut = 30000)
public void testDuplicatedMessageSendToDeadLetterTopic() throws Exception {
final String topic = "persistent://my-property/my-ns/dead-letter-topic-DuplicatedMessage";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle

private final DeadLetterPolicy deadLetterPolicy;

private volatile CompletableFuture<Producer<T>> deadLetterProducer;
private volatile CompletableFuture<Producer<byte[]>> deadLetterProducer;

private volatile Producer<T> retryLetterProducer;
private final ReadWriteLock createProducerLock = new ReentrantReadWriteLock();
Expand Down Expand Up @@ -590,9 +590,10 @@ protected CompletableFuture<Void> doReconsumeLater(Message<?> message, AckType a
initDeadLetterProducerIfNeeded();
MessageId finalMessageId = messageId;
deadLetterProducer.thenAccept(dlqProducer -> {
TypedMessageBuilder<T> typedMessageBuilderNew = dlqProducer.newMessage()
.value(retryMessage.getValue())
.properties(propertiesMap);
TypedMessageBuilder<byte[]> typedMessageBuilderNew =
dlqProducer.newMessage(Schema.AUTO_PRODUCE_BYTES(retryMessage.getReaderSchema().get()))
.value(retryMessage.getData())
.properties(propertiesMap);
typedMessageBuilderNew.sendAsync().thenAccept(msgId -> {
doAcknowledge(finalMessageId, ackType, properties, null).thenAccept(v -> {
result.complete(null);
Expand Down Expand Up @@ -1691,12 +1692,12 @@ private CompletableFuture<Boolean> processPossibleToDLQ(MessageIdImpl messageId)
initDeadLetterProducerIfNeeded();
List<MessageImpl<T>> finalDeadLetterMessages = deadLetterMessages;
MessageIdImpl finalMessageId = messageId;
deadLetterProducer.thenAccept(producerDLQ -> {
deadLetterProducer.thenAcceptAsync(producerDLQ -> {
Copy link
Member

@mattisonchao mattisonchao Mar 1, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@congbobo184 Why do we use a common pool to run the next logic?

for (MessageImpl<T> message : finalDeadLetterMessages) {
String originMessageIdStr = getOriginMessageIdStr(message);
String originTopicNameStr = getOriginTopicNameStr(message);
producerDLQ.newMessage()
.value(message.getValue())
producerDLQ.newMessage(Schema.AUTO_PRODUCE_BYTES(message.getReaderSchema().get()))
.value(message.getData())
.properties(getPropertiesMap(message, originMessageIdStr, originTopicNameStr))
.sendAsync()
.thenAccept(messageIdInDLQ -> {
Expand Down Expand Up @@ -1733,7 +1734,7 @@ private void initDeadLetterProducerIfNeeded() {
createProducerLock.writeLock().lock();
try {
if (deadLetterProducer == null) {
deadLetterProducer = client.newProducer(schema)
deadLetterProducer = client.newProducer(Schema.AUTO_PRODUCE_BYTES(schema))
.topic(this.deadLetterPolicy.getDeadLetterTopic())
.blockIfQueueFull(false)
.createAsync();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Schema;
Expand Down