Skip to content

Commit

Permalink
Use oneliner to access the Kafka message keys
Browse files Browse the repository at this point in the history
  • Loading branch information
weeco committed Aug 16, 2021
1 parent 6901e62 commit 01ff757
Showing 1 changed file with 4 additions and 8 deletions.
12 changes: 4 additions & 8 deletions e2e/client_hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,15 +69,11 @@ func (c *clientHooks) OnBrokerWrite(meta kgo.BrokerMetadata, key int16, bytesWri
// OnRead is called after a read from a broker.
// OnRead(meta BrokerMetadata, key int16, bytesRead int, readWait, timeToRead time.Duration, err error)
func (c *clientHooks) OnBrokerRead(meta kgo.BrokerMetadata, key int16, bytesRead int, readWait, timeToRead time.Duration, err error) {
offsetCommitRes := kmsg.NewOffsetCommitResponse()
joinGroupRes := kmsg.NewJoinGroupResponse()
heartbeatRes := kmsg.NewHeartbeatResponse()
syncGroupRes := kmsg.NewSyncGroupResponse()
consumerGroupMsgKeys := []int16{
offsetCommitRes.Key(),
joinGroupRes.Key(),
heartbeatRes.Key(),
syncGroupRes.Key(),
(&kmsg.OffsetCommitResponse{}).Key(),
(&kmsg.JoinGroupResponse{}).Key(),
(&kmsg.HeartbeatResponse{}).Key(),
(&kmsg.SyncGroupResponse{}).Key(),
}

isMessageFromGroupCoordinator := isInArray(key, consumerGroupMsgKeys)
Expand Down

0 comments on commit 01ff757

Please sign in to comment.