Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Compacted topic ... how to get the most recent message? #380

Open
HeikoMueller opened this issue Apr 30, 2024 · 2 comments
Open

Compacted topic ... how to get the most recent message? #380

HeikoMueller opened this issue Apr 30, 2024 · 2 comments

Comments

@HeikoMueller
Copy link

I am struggeling with reading from a compacted topic. How would I set up a consumer or a reader that receives the most recent message (published before starting up the client) on a compacted topic? I am only able to receive either all messages or none. I tried settting up the consumer with these options:

{
topic: "persistent://public/default/topic",
subscription: 'sub-1',
subscriptionType: 'Exclusive',
subscriptionInitialPosition: 'Earliest',
readCompacted: true,
}
{
topic: "persistent://public/default/topic",
subscription: 'sub-1',
subscriptionType: 'Exclusive',
subscriptionInitialPosition: 'Latest',
readCompacted: true,
}

...and for the reader:
{
topic: 'persistent://public/default/topic',
startMessageId: Pulsar.MessageId.latest(),
readCompacted: true
}
{
topic: 'persistent://public/default/topic',
startMessageId: Pulsar.MessageId.eariest(),
readCompacted: true
}

Am I missing something here?

@merlimat
Copy link
Contributor

Compaction runs in background on a schedule, based on size and time. Perhaps the data was not compacted yet?

@HeikoMueller
Copy link
Author

HeikoMueller commented Apr 30, 2024

Actually I think everything shall be in place. What I did:

I published some messages to the topic like this:
const message = { key: "key-1", data: Buffer.from("hello world x") }
producer.send(message)

Then I manually triggered compaction with
pulsar-admin topics compact persistent://public/default/topic

Also I reduced the compaction threshold:
pulsar-admin topics set-compaction-threshold persistent://public/default/topic --threshold 16

I can see that the topic gets cleared every x messages. At this point, I do not get any initial value back. Then as the message count increases, I see all the new messages come in at re-initalization until the topic reaches the threshold and is cleared again due to compaction process, which again returns null at the initialization of the reader or subscription.

Can you tell if I use the right setup and message format here?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants