Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MINOR: Adjust validateOffsetCommit/Fetch in ConsumerGroup to ensure compatibility with classic protocol members #16145

Original file line number Diff line number Diff line change
Expand Up @@ -325,24 +325,12 @@ private Group validateOffsetCommit(
}
dongnuo123 marked this conversation as resolved.
Show resolved Hide resolved
}

try {
group.validateOffsetCommit(
request.memberId(),
request.groupInstanceId(),
request.generationIdOrMemberEpoch(),
false
);
} catch (StaleMemberEpochException ex) {
// The STALE_MEMBER_EPOCH error is only returned for new consumer group (KIP-848). When
// it is, the member should be using the OffsetCommit API version >= 9. As we don't
// support upgrading from the old to the new protocol yet, we return UNSUPPORTED_VERSION
// error if an older version is used. We will revise this when the upgrade path is implemented.
if (context.header.apiVersion() >= 9) {
throw ex;
} else {
throw Errors.UNSUPPORTED_VERSION.exception();
}
}
group.validateOffsetCommit(
request.memberId(),
request.groupInstanceId(),
request.generationIdOrMemberEpoch(),
false
);

return group;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -857,6 +857,7 @@ public void validateOffsetCommit(
throw Errors.UNKNOWN_MEMBER_ID.exception();
}

// TODO: A temp marker. Will remove it when the pr is open.
if (!isTransactional && isInState(COMPLETING_REBALANCE)) {
dongnuo123 marked this conversation as resolved.
Show resolved Hide resolved
// We should not receive a commit request if the group has not completed rebalance;
// but since the consumer's member.id and generation is valid, it means it has received
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.FencedInstanceIdException;
import org.apache.kafka.common.errors.IllegalGenerationException;
import org.apache.kafka.common.errors.RebalanceInProgressException;
import org.apache.kafka.common.errors.StaleMemberEpochException;
import org.apache.kafka.common.errors.UnknownMemberIdException;
import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData;
Expand Down Expand Up @@ -806,7 +809,29 @@ public void validateOffsetCommit(
if (memberEpoch < 0 && members().isEmpty()) return;

final ConsumerGroupMember member = getOrMaybeCreateMember(memberId, false);
validateMemberEpoch(memberEpoch, member.memberEpoch());
if (member.useClassicProtocol()) {
validateMemberInstanceId(member, groupInstanceId);
dongnuo123 marked this conversation as resolved.
Show resolved Hide resolved

try {
validateMemberEpoch(memberEpoch, member.memberEpoch());
} catch (StaleMemberEpochException ex) {
// StaleMemberEpochException is not supported in the classic protocol. We throw
// IllegalGenerationException instead for compatibility.
throw new IllegalGenerationException(String.format("Invalid offset commit because the "
+ "received generation id %d does not match the expected member epoch %d.",
memberEpoch, member.memberEpoch()));
}

if (member.memberEpoch() < groupEpoch() ||
member.state() == MemberState.UNREVOKED_PARTITIONS ||
(member.state() == MemberState.UNRELEASED_PARTITIONS && !waitingOnUnreleasedPartition(member))) {
throw new RebalanceInProgressException(String.format("Invalid offset commit because" +
" a new rebalance has been triggered in group %s and member %s should rejoin to catch up.",
groupId(), member.memberId()));
}
dongnuo123 marked this conversation as resolved.
Show resolved Hide resolved
} else {
validateMemberEpoch(memberEpoch, member.memberEpoch());
}
}

/**
Expand Down Expand Up @@ -1421,4 +1446,30 @@ public boolean waitingOnUnreleasedPartition(ConsumerGroupMember member) {
}
return false;
}

/**
* Validates that the instance id exists and is mapped to the member id
* if the group instance id is provided.
*
* @param member The ConsumerGroupMember.
* @param instanceId The instance id.
*/
private void validateMemberInstanceId(
ConsumerGroupMember member,
String instanceId
) throws UnknownMemberIdException, FencedInstanceIdException {
if (instanceId != null) {
ConsumerGroupMember staticMember = staticMember(instanceId);
if (member == null) {
throw new UnknownMemberIdException(
String.format("Member with instance id %s is not a member of group %s.", instanceId, groupId())
);
}

if (!staticMember.memberId().equals(member.memberId())) {
throw Errors.FENCED_INSTANCE_ID.exception("Static member " + member.memberId() + " with instance id "
+ instanceId + " was fenced by member " + staticMember.memberId() + ".");
}
}
}
}