Skip to content

Commit

Permalink
KAFKA-16786: Remove old assignment strategy usage in new consumer (ap…
Browse files Browse the repository at this point in the history
…ache#16214)

Remove usage of the partition.assignment.strategy config in the new consumer. This config is deprecated with the new consumer protocol, so the AsyncKafkaConsumer should not use or validate the property.

Reviewers: Lucas Brutschy <[email protected]>
  • Loading branch information
lianetm authored and TaiJuWu committed Jun 8, 2024
1 parent 3e8a4d9 commit fe23ed8
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.GroupProtocol;
Expand Down Expand Up @@ -240,7 +239,6 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) {
private final int defaultApiTimeoutMs;
private final boolean autoCommitEnabled;
private volatile boolean closed = false;
private final List<ConsumerPartitionAssignor> assignors;
private final Optional<ClientTelemetryReporter> clientTelemetryReporter;

// to keep from repeatedly scanning subscriptions in poll(), cache the result during metadata updates
Expand Down Expand Up @@ -373,10 +371,6 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) {
rebalanceListenerInvoker
);
this.backgroundEventReaper = backgroundEventReaperFactory.build(logContext);
this.assignors = ConsumerPartitionAssignor.getAssignorInstances(
config.getList(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG),
config.originals(Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId))
);

// The FetchCollector is only used on the application thread.
this.fetchCollector = fetchCollectorFactory.build(logContext,
Expand Down Expand Up @@ -424,7 +418,6 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) {
ConsumerMetadata metadata,
long retryBackoffMs,
int defaultApiTimeoutMs,
List<ConsumerPartitionAssignor> assignors,
String groupId,
boolean autoCommitEnabled) {
this.log = logContext.logger(getClass());
Expand All @@ -445,7 +438,6 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) {
this.defaultApiTimeoutMs = defaultApiTimeoutMs;
this.deserializers = deserializers;
this.applicationEventHandler = applicationEventHandler;
this.assignors = assignors;
this.kafkaConsumerMetrics = new KafkaConsumerMetrics(metrics, "consumer");
this.clientTelemetryReporter = Optional.empty();
this.autoCommitEnabled = autoCommitEnabled;
Expand All @@ -460,8 +452,7 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) {
Deserializer<V> valueDeserializer,
KafkaClient client,
SubscriptionState subscriptions,
ConsumerMetadata metadata,
List<ConsumerPartitionAssignor> assignors) {
ConsumerMetadata metadata) {
this.log = logContext.logger(getClass());
this.subscriptions = subscriptions;
this.clientId = config.getString(ConsumerConfig.CLIENT_ID_CONFIG);
Expand All @@ -475,7 +466,6 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) {
this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
this.defaultApiTimeoutMs = config.getInt(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG);
this.deserializers = new Deserializers<>(keyDeserializer, valueDeserializer);
this.assignors = assignors;
this.clientTelemetryReporter = Optional.empty();

ConsumerMetrics metricsRegistry = new ConsumerMetrics(CONSUMER_METRIC_GROUP_PREFIX);
Expand Down Expand Up @@ -1687,12 +1677,6 @@ private boolean initWithCommittedOffsetsIfNeeded(Timer timer) {
}
}

private void throwIfNoAssignorsConfigured() {
if (assignors.isEmpty())
throw new IllegalStateException("Must configure at least one partition assigner class name to " +
ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG + " configuration property");
}

private void updateLastSeenEpochIfNewer(TopicPartition topicPartition, OffsetAndMetadata offsetAndMetadata) {
if (offsetAndMetadata != null)
offsetAndMetadata.leaderEpoch().ifPresent(epoch -> metadata.updateLastSeenEpochIfNewer(topicPartition, epoch));
Expand Down Expand Up @@ -1780,7 +1764,6 @@ private void subscribeInternal(Pattern pattern, Optional<ConsumerRebalanceListen
if (pattern == null || pattern.toString().isEmpty())
throw new IllegalArgumentException("Topic pattern to subscribe to cannot be " + (pattern == null ?
"null" : "empty"));
throwIfNoAssignorsConfigured();
log.info("Subscribed to pattern: '{}'", pattern);
subscriptions.subscribe(pattern, listener);
metadata.requestUpdateForNewTopics();
Expand All @@ -1805,8 +1788,6 @@ private void subscribeInternal(Collection<String> topics, Optional<ConsumerRebal
throw new IllegalArgumentException("Topic collection to subscribe to cannot contain null or empty topic");
}

throwIfNoAssignorsConfigured();

// Clear the buffered data which are not a part of newly assigned topics
final Set<TopicPartition> currentTopicPartitions = new HashSet<>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,7 @@ public <K, V> ConsumerDelegate<K, V> create(LogContext logContext,
valueDeserializer,
client,
subscriptions,
metadata,
assignors
metadata
);
else
return new LegacyKafkaConsumer<>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,7 @@ public void testSubscriptionOnEmptyPattern(GroupProtocol groupProtocol) {
}

@ParameterizedTest
@EnumSource(GroupProtocol.class)
@EnumSource(value = GroupProtocol.class, names = "CLASSIC")
public void testSubscriptionWithEmptyPartitionAssignment(GroupProtocol groupProtocol) {
Properties props = new Properties();
props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name());
Expand Down Expand Up @@ -3227,7 +3227,7 @@ public void testUnusedConfigs(GroupProtocol groupProtocol) {
}

@ParameterizedTest
@EnumSource(GroupProtocol.class)
@EnumSource(value = GroupProtocol.class, names = "CLASSIC")
public void testAssignorNameConflict(GroupProtocol groupProtocol) {
Map<String, Object> configs = new HashMap<>();
configs.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import org.apache.kafka.clients.Metadata.LeaderAndEpoch;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
Expand All @@ -30,7 +29,6 @@
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
import org.apache.kafka.clients.consumer.RoundRobinAssignor;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler;
import org.apache.kafka.clients.consumer.internals.events.AssignmentChangeEvent;
Expand Down Expand Up @@ -205,7 +203,6 @@ private AsyncKafkaConsumer<String, String> newConsumer(
ConsumerInterceptors<String, String> interceptors,
ConsumerRebalanceListenerInvoker rebalanceListenerInvoker,
SubscriptionState subscriptions,
List<ConsumerPartitionAssignor> assignors,
String groupId,
String clientId) {
long retryBackoffMs = 100L;
Expand All @@ -228,7 +225,6 @@ private AsyncKafkaConsumer<String, String> newConsumer(
metadata,
retryBackoffMs,
defaultApiTimeoutMs,
assignors,
groupId,
autoCommitEnabled);
}
Expand Down Expand Up @@ -564,7 +560,6 @@ public void testCommitAsyncLeaderEpochUpdate() {
new ConsumerInterceptors<>(Collections.emptyList()),
mock(ConsumerRebalanceListenerInvoker.class),
subscriptions,
singletonList(new RoundRobinAssignor()),
"group-id",
"client-id");
completeCommitSyncApplicationEventSuccessfully();
Expand Down Expand Up @@ -784,7 +779,6 @@ public void testPartitionRevocationOnClose() {
mock(ConsumerInterceptors.class),
mock(ConsumerRebalanceListenerInvoker.class),
subscriptions,
singletonList(new RoundRobinAssignor()),
"group-id",
"client-id");

Expand All @@ -806,7 +800,6 @@ public void testFailedPartitionRevocationOnClose() {
new ConsumerInterceptors<>(Collections.emptyList()),
mock(ConsumerRebalanceListenerInvoker.class),
subscriptions,
singletonList(new RoundRobinAssignor()),
"group-id",
"client-id");
subscriptions.subscribe(singleton("topic"), Optional.of(listener));
Expand Down Expand Up @@ -844,7 +837,6 @@ public void testAutoCommitSyncEnabled() {
mock(ConsumerInterceptors.class),
mock(ConsumerRebalanceListenerInvoker.class),
subscriptions,
singletonList(new RoundRobinAssignor()),
"group-id",
"client-id");
consumer.subscribe(singleton("topic"), mock(ConsumerRebalanceListener.class));
Expand All @@ -862,7 +854,6 @@ public void testAutoCommitSyncDisabled() {
mock(ConsumerInterceptors.class),
mock(ConsumerRebalanceListenerInvoker.class),
subscriptions,
singletonList(new RoundRobinAssignor()),
"group-id",
"client-id");
consumer.subscribe(singleton("topic"), mock(ConsumerRebalanceListener.class));
Expand Down Expand Up @@ -1624,6 +1615,18 @@ public void testGroupRemoteAssignorUsedInConsumerProtocol() {
assertFalse(config.unused().contains(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG));
}

@Test
public void testPartitionAssignmentStrategyUnusedInAsyncConsumer() {
final Properties props = requiredConsumerConfig();
props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroup1");
props.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT));
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "CooperativeStickyAssignor");
final ConsumerConfig config = new ConsumerConfig(props);
consumer = newConsumer(config);

assertTrue(config.unused().contains(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG));
}

@Test
public void testGroupIdNull() {
final Properties props = requiredConsumerConfig();
Expand Down Expand Up @@ -1666,7 +1669,6 @@ public void testEnsurePollEventSentOnConsumerPoll() {
new ConsumerInterceptors<>(Collections.emptyList()),
mock(ConsumerRebalanceListenerInvoker.class),
subscriptions,
singletonList(new RoundRobinAssignor()),
"group-id",
"client-id");
final TopicPartition tp = new TopicPartition("topic", 0);
Expand Down

0 comments on commit fe23ed8

Please sign in to comment.