Skip to content

Commit

Permalink
MINOR: Rename uniform assignor's internal builders (apache#16233)
Browse files Browse the repository at this point in the history
This patch renames the uniform assignor's builders to match the `SubscriptionType` which is used to determine which one is called. It removes the abstract class `AbstractUniformAssignmentBuilder` which is not necessary anymore. It also applies minor refactoring.

Reviewers: Ritika Reddy <[email protected]>, Chia-Ping Tsai <[email protected]>
  • Loading branch information
dajac authored and gongxuanzhang committed Jun 12, 2024
1 parent 0185d09 commit bd19ef2
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 92 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -35,19 +35,19 @@
* subscriptions across the group members:
* <ul>
* <li>
* <b> Optimized Uniform Assignment Builder: </b> This strategy is used when all members have subscribed
* <b> Uniform Homogeneous Assignment Builder: </b> This strategy is used when all members have subscribed
* to the same set of topics.
* </li>
* <li>
* <b> General Uniform Assignment Builder: </b> This strategy is used when members have varied topic
* <b> Uniform Heterogeneous Assignment Builder: </b> This strategy is used when members have varied topic
* subscriptions.
* </li>
* </ul>
*
* The appropriate strategy is automatically chosen based on the current members' topic subscriptions.
*
* @see OptimizedUniformAssignmentBuilder
* @see GeneralUniformAssignmentBuilder
* @see UniformHomogeneousAssignmentBuilder
* @see UniformHeterogeneousAssignmentBuilder
*/
public class UniformAssignor implements ConsumerGroupPartitionAssignor {
private static final Logger LOG = LoggerFactory.getLogger(UniformAssignor.class);
Expand Down Expand Up @@ -76,14 +76,14 @@ public GroupAssignment assign(

if (groupSpec.subscriptionType().equals(HOMOGENEOUS)) {
LOG.debug("Detected that all members are subscribed to the same set of topics, invoking the "
+ "optimized assignment algorithm");
return new OptimizedUniformAssignmentBuilder(groupSpec, subscribedTopicDescriber)
+ "homogeneous assignment algorithm");
return new UniformHomogeneousAssignmentBuilder(groupSpec, subscribedTopicDescriber)
.build();
} else {
LOG.debug("Detected that the members are subscribed to different sets of topics, invoking the "
+ "general assignment algorithm");
return new GeneralUniformAssignmentBuilder(groupSpec, subscribedTopicDescriber)
.buildAssignment();
+ "heterogeneous assignment algorithm");
return new UniformHeterogeneousAssignmentBuilder(groupSpec, subscribedTopicDescriber)
.build();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
import java.util.stream.Collectors;

/**
* The general uniform assignment builder is used to generate the target assignment for a consumer group with
* The heterogeneous uniform assignment builder is used to generate the target assignment for a consumer group with
* at least one of its members subscribed to a different set of topics.
*
* Assignments are done according to the following principles:
Expand All @@ -55,8 +55,8 @@
* This assignment builder prioritizes the above properties in the following order:
* Balance > Stickiness.
*/
public class GeneralUniformAssignmentBuilder extends AbstractUniformAssignmentBuilder {
private static final Logger LOG = LoggerFactory.getLogger(GeneralUniformAssignmentBuilder.class);
public class UniformHeterogeneousAssignmentBuilder {
private static final Logger LOG = LoggerFactory.getLogger(UniformHeterogeneousAssignmentBuilder.class);

/**
* The group metadata specification.
Expand Down Expand Up @@ -113,7 +113,7 @@ public class GeneralUniformAssignmentBuilder extends AbstractUniformAssignmentBu
*/
private final PartitionMovements partitionMovements;

public GeneralUniformAssignmentBuilder(GroupSpec groupSpec, SubscribedTopicDescriber subscribedTopicDescriber) {
public UniformHeterogeneousAssignmentBuilder(GroupSpec groupSpec, SubscribedTopicDescriber subscribedTopicDescriber) {
this.groupSpec = groupSpec;
this.subscribedTopicDescriber = subscribedTopicDescriber;
this.subscribedTopicIds = new HashSet<>();
Expand All @@ -133,7 +133,7 @@ public GeneralUniformAssignmentBuilder(GroupSpec groupSpec, SubscribedTopicDescr
targetAssignment.put(memberId, new MemberAssignmentImpl(new HashMap<>()));
})
);
this.unassignedPartitions = new HashSet<>(topicIdPartitions(subscribedTopicIds, subscribedTopicDescriber));
this.unassignedPartitions = topicIdPartitions(subscribedTopicIds, subscribedTopicDescriber);
this.assignedStickyPartitions = new HashSet<>();
this.assignmentManager = new AssignmentManager(this.subscribedTopicDescriber);
this.sortedMembersByAssignmentSize = assignmentManager.sortMembersByAssignmentSize(groupSpec.memberIds());
Expand All @@ -148,8 +148,7 @@ public GeneralUniformAssignmentBuilder(GroupSpec groupSpec, SubscribedTopicDescr
* <li> Allocate all the remaining unassigned partitions to the members in a balanced manner.</li>
* <li> Iterate through the assignment until it is balanced. </li>
*/
@Override
protected GroupAssignment buildAssignment() {
public GroupAssignment build() {
if (subscribedTopicIds.isEmpty()) {
LOG.info("The subscription list is empty, returning an empty assignment");
return new GroupAssignment(Collections.emptyMap());
Expand Down Expand Up @@ -462,6 +461,43 @@ private void processPartitionMovement(TopicIdPartition topicIdPartition, String
assignmentManager.addPartitionToTargetAssignment(topicIdPartition, newMember);
}

/**
* Adds the topic's partition to the member's target assignment.
*/
private static void addPartitionToAssignment(
Map<String, MemberAssignment> memberAssignments,
String memberId,
Uuid topicId,
int partition
) {
memberAssignments.get(memberId)
.partitions()
.computeIfAbsent(topicId, __ -> new HashSet<>())
.add(partition);
}

/**
* Constructs a set of {@code TopicIdPartition} including all the given topic Ids based on their partition counts.
*
* @param topicIds Collection of topic Ids.
* @param subscribedTopicDescriber Describer to fetch partition counts for topics.
*
* @return Set of {@code TopicIdPartition} including all the provided topic Ids.
*/
private static Set<TopicIdPartition> topicIdPartitions(
Collection<Uuid> topicIds,
SubscribedTopicDescriber subscribedTopicDescriber
) {
Set<TopicIdPartition> topicIdPartitions = new HashSet<>();
for (Uuid topicId : topicIds) {
int numPartitions = subscribedTopicDescriber.numPartitions(topicId);
for (int partitionId = 0; partitionId < numPartitions; partitionId++) {
topicIdPartitions.add(new TopicIdPartition(topicId, partitionId));
}
}
return topicIdPartitions;
}

/**
* This class represents a pair of member Ids involved in a partition reassignment.
* Each pair contains a source and a destination member Id.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import java.util.Set;

/**
* The homogenous uniform assignment builder is used to generate the target assignment for a consumer group with
* The homogeneous uniform assignment builder is used to generate the target assignment for a consumer group with
* all its members subscribed to the same set of topics.
*
* Assignments are done according to the following principles:
Expand All @@ -48,7 +48,7 @@
* The assignment builder prioritizes the properties in the following order:
* Balance > Stickiness.
*/
public class OptimizedUniformAssignmentBuilder {
public class UniformHomogeneousAssignmentBuilder {
private static final Class<?> UNMODIFIABLE_MAP_CLASS = Collections.unmodifiableMap(new HashMap<>()).getClass();
private static final Class<?> EMPTY_MAP_CLASS = Collections.emptyMap().getClass();

Expand Down Expand Up @@ -104,7 +104,7 @@ private static boolean isImmutableMap(Map<?, ?> map) {
*/
private int remainingMembersToGetAnExtraPartition;

OptimizedUniformAssignmentBuilder(GroupSpec groupSpec, SubscribedTopicDescriber subscribedTopicDescriber) {
UniformHomogeneousAssignmentBuilder(GroupSpec groupSpec, SubscribedTopicDescriber subscribedTopicDescriber) {
this.groupSpec = groupSpec;
this.subscribedTopicDescriber = subscribedTopicDescriber;
this.subscribedTopicIds = new HashSet<>(groupSpec.memberSubscription(groupSpec.memberIds().iterator().next())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

public class GeneralUniformAssignmentBuilderTest {
public class UniformHeterogeneousAssignmentBuilderTest {
private final UniformAssignor assignor = new UniformAssignor();
private final Uuid topic1Uuid = Uuid.fromString("T1-A4s3VTwiI5CTbEp6POw");
private final Uuid topic2Uuid = Uuid.fromString("T2-B4s3VTwiI5YHbPp6YUe");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public ConsumerPartitionAssignor assignor() {
/**
* The subscription pattern followed by the members of the group.
*
* A subscription model is considered homogenous if all the members of the group
* A subscription model is considered homogeneous if all the members of the group
* are subscribed to the same set of topics, it is heterogeneous otherwise.
*/
public enum SubscriptionModel {
Expand Down

0 comments on commit bd19ef2

Please sign in to comment.