Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MINOR: Adjust validateOffsetCommit/Fetch in ConsumerGroup to ensure compatibility with classic protocol members #16145

Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ class OffsetCommitRequestTest(cluster: ClusterInstance) extends GroupCoordinator
topic = "foo",
partition = 0,
offset = 100L,
expectedError = Errors.NONE,
expectedError = if (useNewProtocol && version < 9) Errors.UNSUPPORTED_VERSION else Errors.NONE,
version = version.toShort
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,12 +100,15 @@ public static GroupType parse(String name) {
* @param generationIdOrMemberEpoch The generation id for genetic groups or the member epoch
* for consumer groups.
* @param isTransactional Whether the offset commit is transactional or not.
* @param apiVersion The api version.
*/
void validateOffsetCommit(
String memberId,
String groupInstanceId,
int generationIdOrMemberEpoch,
boolean isTransactional
boolean isTransactional,
short apiVersion

) throws KafkaException;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -325,34 +325,25 @@ private Group validateOffsetCommit(
}
dongnuo123 marked this conversation as resolved.
Show resolved Hide resolved
}

try {
group.validateOffsetCommit(
request.memberId(),
request.groupInstanceId(),
request.generationIdOrMemberEpoch(),
false
);
} catch (StaleMemberEpochException ex) {
// The STALE_MEMBER_EPOCH error is only returned for new consumer group (KIP-848). When
// it is, the member should be using the OffsetCommit API version >= 9. As we don't
// support upgrading from the old to the new protocol yet, we return UNSUPPORTED_VERSION
// error if an older version is used. We will revise this when the upgrade path is implemented.
if (context.header.apiVersion() >= 9) {
throw ex;
} else {
throw Errors.UNSUPPORTED_VERSION.exception();
}
}
group.validateOffsetCommit(
request.memberId(),
request.groupInstanceId(),
request.generationIdOrMemberEpoch(),
false,
context.apiVersion()
);

return group;
}

/**
* Validates an TxnOffsetCommit request.
*
* @param context The request context.
* @param request The actual request.
*/
private Group validateTransactionalOffsetCommit(
RequestContext context,
TxnOffsetCommitRequestData request
) throws ApiException {
Group group;
Expand All @@ -375,7 +366,8 @@ private Group validateTransactionalOffsetCommit(
request.memberId(),
request.groupInstanceId(),
request.generationId(),
true
true,
context.apiVersion()
);
} catch (StaleMemberEpochException ex) {
throw Errors.ILLEGAL_GENERATION.exception();
Expand Down Expand Up @@ -530,7 +522,7 @@ public CoordinatorResult<TxnOffsetCommitResponseData, CoordinatorRecord> commitT
RequestContext context,
TxnOffsetCommitRequestData request
) throws ApiException {
validateTransactionalOffsetCommit(request);
validateTransactionalOffsetCommit(context, request);

final TxnOffsetCommitResponseData response = new TxnOffsetCommitResponseData();
final List<CoordinatorRecord> records = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -824,13 +824,15 @@ public void validateMember(
* @param groupInstanceId The group instance id.
* @param generationId The generation id.
* @param isTransactional Whether the offset commit is transactional or not.
* @param apiVersion The api version.
*/
@Override
public void validateOffsetCommit(
String memberId,
String groupInstanceId,
int generationId,
boolean isTransactional
boolean isTransactional,
short apiVersion
) throws CoordinatorNotAvailableException, UnknownMemberIdException, IllegalGenerationException, FencedInstanceIdException {
if (isInState(DEAD)) {
throw Errors.COORDINATOR_NOT_AVAILABLE.exception();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.IllegalGenerationException;
import org.apache.kafka.common.errors.StaleMemberEpochException;
import org.apache.kafka.common.errors.UnknownMemberIdException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData;
import org.apache.kafka.common.message.ListGroupsResponseData;
import org.apache.kafka.common.protocol.Errors;
Expand Down Expand Up @@ -792,21 +794,36 @@ public DeadlineAndEpoch metadataRefreshDeadline() {
* @param memberEpoch The member epoch.
* @param isTransactional Whether the offset commit is transactional or not. It has no
* impact when a consumer group is used.
* @param apiVersion The api version.
* @throws UnknownMemberIdException If the member is not found.
* @throws StaleMemberEpochException If the member uses the consumer protocol and the provided
* member epoch doesn't match the actual member epoch.
* @throws IllegalGenerationException If the member uses the classic protocol and the provided
* generation id is not equal to the member epoch.
*/
@Override
public void validateOffsetCommit(
String memberId,
String groupInstanceId,
int memberEpoch,
boolean isTransactional
) throws UnknownMemberIdException, StaleMemberEpochException {
boolean isTransactional,
short apiVersion
) throws UnknownMemberIdException, StaleMemberEpochException, IllegalGenerationException {
// When the member epoch is -1, the request comes from either the admin client
// or a consumer which does not use the group management facility. In this case,
// the request can commit offsets if the group is empty.
if (memberEpoch < 0 && members().isEmpty()) return;

final ConsumerGroupMember member = getOrMaybeCreateMember(memberId, false);
validateMemberEpoch(memberEpoch, member.memberEpoch());

// If the commit is not transactional and the member uses the new consumer protocol (KIP-848),
// the member should be using the OffsetCommit API version >= 9.
if (!isTransactional && !member.useClassicProtocol() && apiVersion < 9) {
throw new UnsupportedVersionException("OffsetCommit version 9 or above must be used " +
"by members using the consumer group protocol");
}

validateMemberEpoch(memberEpoch, member.memberEpoch(), member.useClassicProtocol());
dongnuo123 marked this conversation as resolved.
Show resolved Hide resolved
}

/**
Expand All @@ -815,13 +832,18 @@ public void validateOffsetCommit(
* @param memberId The member id for consumer groups.
* @param memberEpoch The member epoch for consumer groups.
* @param lastCommittedOffset The last committed offsets in the timeline.
* @throws UnknownMemberIdException If the member is not found.
* @throws StaleMemberEpochException If the member uses the consumer protocol and the provided
* member epoch doesn't match the actual member epoch.
* @throws IllegalGenerationException If the member uses the classic protocol and the provided
* generation id is not equal to the member epoch.
*/
@Override
public void validateOffsetFetch(
String memberId,
int memberEpoch,
long lastCommittedOffset
) throws UnknownMemberIdException, StaleMemberEpochException {
) throws UnknownMemberIdException, StaleMemberEpochException, IllegalGenerationException {
// When the member id is null and the member epoch is -1, the request either comes
// from the admin client or from a client which does not provide them. In this case,
// the fetch request is accepted.
Expand All @@ -832,7 +854,7 @@ public void validateOffsetFetch(
throw new UnknownMemberIdException(String.format("Member %s is not a member of group %s.",
memberId, groupId));
}
validateMemberEpoch(memberEpoch, member.memberEpoch());
validateMemberEpoch(memberEpoch, member.memberEpoch(), member.useClassicProtocol());
}

/**
Expand Down Expand Up @@ -896,16 +918,27 @@ public boolean isInStates(Set<String> statesFilter, long committedOffset) {
}

/**
* Throws a StaleMemberEpochException if the received member epoch does not match
* the expected member epoch.
* Throws an exception if the received member epoch does not match the expected member epoch.
*
* @param receivedMemberEpoch The received member epoch or generation id.
* @param expectedMemberEpoch The expected member epoch.
* @param useClassicProtocol The boolean indicating whether the checked member uses the classic protocol.
* @throws StaleMemberEpochException if the member with unmatched member epoch uses the consumer protocol.
* @throws IllegalGenerationException if the member with unmatched generation id uses the classic protocol.
*/
private void validateMemberEpoch(
int receivedMemberEpoch,
int expectedMemberEpoch
) throws StaleMemberEpochException {
int expectedMemberEpoch,
boolean useClassicProtocol
) throws StaleMemberEpochException, IllegalGenerationException {
if (receivedMemberEpoch != expectedMemberEpoch) {
throw new StaleMemberEpochException(String.format("The received member epoch %d does not match "
+ "the expected member epoch %d.", receivedMemberEpoch, expectedMemberEpoch));
if (useClassicProtocol) {
throw new IllegalGenerationException(String.format("The received generation id %d does not match " +
"the expected member epoch %d.", receivedMemberEpoch, expectedMemberEpoch));
} else {
throw new StaleMemberEpochException(String.format("The received member epoch %d does not match "
+ "the expected member epoch %d.", receivedMemberEpoch, expectedMemberEpoch));
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.kafka.common.errors.RebalanceInProgressException;
import org.apache.kafka.common.errors.StaleMemberEpochException;
import org.apache.kafka.common.errors.UnknownMemberIdException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.JoinGroupRequestData;
import org.apache.kafka.common.message.OffsetCommitRequestData;
import org.apache.kafka.common.message.OffsetCommitResponseData;
Expand All @@ -51,6 +50,7 @@
import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
import org.apache.kafka.coordinator.group.classic.ClassicGroup;
Expand Down Expand Up @@ -1142,14 +1142,8 @@ public void testConsumerGroupOffsetCommitWithStaleMemberEpoch() {
assertThrows(StaleMemberEpochException.class, () -> context.commitOffset(request));
}

@ParameterizedTest
@ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
public void testConsumerGroupOffsetCommitWithVersionSmallerThanVersion9(short version) {
// All the newer versions are fine.
if (version >= 9) return;
// Version 0 does not support MemberId and GenerationIdOrMemberEpoch fields.
if (version == 0) return;

@Test
public void testConsumerGroupOffsetCommitWithIllegalGenerationId() {
OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();

// Create an empty group.
Expand All @@ -1162,27 +1156,30 @@ public void testConsumerGroupOffsetCommitWithVersionSmallerThanVersion9(short ve
group.updateMember(new ConsumerGroupMember.Builder("member")
.setMemberEpoch(10)
.setPreviousMemberEpoch(10)
.setClassicMemberMetadata(new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata())
.build()
);

// Verify that the request is rejected with the correct exception.
assertThrows(UnsupportedVersionException.class, () -> context.commitOffset(
version,
new OffsetCommitRequestData()
.setGroupId("foo")
.setMemberId("member")
.setGenerationIdOrMemberEpoch(9)
.setTopics(Collections.singletonList(
new OffsetCommitRequestData.OffsetCommitRequestTopic()
.setName("bar")
.setPartitions(Collections.singletonList(
new OffsetCommitRequestData.OffsetCommitRequestPartition()
.setPartitionIndex(0)
.setCommittedOffset(100L)
))
))
)
);
OffsetCommitRequestData request = new OffsetCommitRequestData()
.setGroupId("foo")
.setMemberId("member")
.setGenerationIdOrMemberEpoch(9)
.setTopics(Collections.singletonList(
new OffsetCommitRequestData.OffsetCommitRequestTopic()
.setName("bar")
.setPartitions(Collections.singletonList(
new OffsetCommitRequestData.OffsetCommitRequestPartition()
.setPartitionIndex(0)
.setCommittedOffset(100L)
))
));

// Verify that a smaller epoch is rejected.
assertThrows(IllegalGenerationException.class, () -> context.commitOffset(request));

// Verify that a larger epoch is rejected.
request.setGenerationIdOrMemberEpoch(11);
assertThrows(IllegalGenerationException.class, () -> context.commitOffset(request));
}

@Test
Expand Down Expand Up @@ -2294,6 +2291,30 @@ public void testConsumerGroupOffsetFetchWithStaleMemberEpoch() {
() -> context.fetchAllOffsets("group", "member", 10, Long.MAX_VALUE));
}

@Test
public void testConsumerGroupOffsetFetchWithIllegalGenerationId() {
OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();
ConsumerGroup group = context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup("group", true);
group.updateMember(new ConsumerGroupMember.Builder("member")
.setClassicMemberMetadata(new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata())
.build()
);

List<OffsetFetchRequestData.OffsetFetchRequestTopics> topics = Collections.singletonList(
new OffsetFetchRequestData.OffsetFetchRequestTopics()
.setName("foo")
.setPartitionIndexes(Collections.singletonList(0))
);

// Fetch offsets case.
assertThrows(IllegalGenerationException.class,
() -> context.fetchOffsets("group", "member", 10, topics, Long.MAX_VALUE));

// Fetch all offsets case.
assertThrows(IllegalGenerationException.class,
() -> context.fetchAllOffsets("group", "member", 10, Long.MAX_VALUE));
}

@Test
public void testGenericGroupOffsetDelete() {
OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();
Expand Down
Loading