[LI-HOTFIX] Avoid assigning replicas to the preferred controllers or maintenance brokers #111

Open · wants to merge 6 commits into base 2.4-li
38 changes: 25 additions & 13 deletions core/src/main/scala/kafka/controller/KafkaController.scala
@@ -880,7 +880,11 @@ class KafkaController(val config: KafkaConfig,
// between the moment this broker started and right now when it becomes controller again.
loadMinIsrForTopics(controllerContext.allTopics)

rearrangePartitionReplicaAssignmentForNewTopics(controllerContext.allTopics.toSet)
// Scan the partitions of all topics and ensure none of them lie on partitionUnassignableBrokerIds.
// controllerContext.partitionAssignments is not initialized yet at this point,
// so every single partition will be checked inside rearrangePartitionReplicaAssignmentForNewPartitions.
rearrangePartitionReplicaAssignmentForNewPartitions(controllerContext.allTopics.toSet)

registerPartitionModificationsHandlers(controllerContext.allTopics.toSeq)
getReplicaAssignmentPolicyCompliant(controllerContext.allTopics.toSet).foreach {
case (topicPartition, replicaAssignment) =>
@@ -968,25 +972,28 @@ class KafkaController(val config: KafkaConfig,

// Rearrange partition and replica assignment for new partitions that get assigned to
// maintenance brokers that do not take new partitions
private def rearrangePartitionReplicaAssignmentForNewTopics(topics: Set[String]) {
private def rearrangePartitionReplicaAssignmentForNewPartitions(topicsToCheck: Set[String]) {
try {
val noNewPartitionBrokers = partitionUnassignableBrokerIds
if (noNewPartitionBrokers.nonEmpty) {
val newTopics = zkClient.getPartitionNodeNonExistsTopics(topics.toSet)
val newTopicsToBeArranged = zkClient.getPartitionAssignmentForTopics(newTopics).filter {
case (_, partitionMap) =>
partitionMap.exists {
val topicsToBeRearranged = zkClient.getPartitionAssignmentForTopics(topicsToCheck.toSet).filter {
case (topic, partitionMap) =>
val existingAssignment = controllerContext.partitionAssignments.getOrElse(topic, mutable.Map.empty)
val newPartitions = partitionMap.filter{case (partitionId, _) => partitionId >= existingAssignment.size}
Reviewer:

Could we add a safety check here to ensure the partition state znode doesn't exist for these new partitions? Although I don't see an issue in the current implementation, this function would be very dangerous if used incorrectly. For example, calling "rearrangePartitionReplicaAssignmentForNewPartitions(topics, false)" when initializing the controller context could cause all topics to be reassigned, since controllerContext.partitionAssignments is an empty set at that point (this would also result in orphan partitions).
Again, I don't see the issue in this implementation, but I think it is worth checking.

In addition, this doesn't address the case where the controller moves right after a partition expansion (before the rearrangement actually happens). I think that is OK since we are not trying to guarantee that 100% of replicas stay off the preferred controllers. Maybe we can add a comment for this unhandled situation.
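
A minimal sketch of that guard, assuming KafkaZkClient.getTopicPartitionState is available in this scope (topic, newPartitions and zkClient come from the diff above; this is a reviewer-side suggestion, not the PR's code):

// Hypothetical safety check: treat a partition as new only if its state
// znode does not exist yet, so an empty controllerContext.partitionAssignments
// can never cause an existing partition to be rearranged.
val safeNewPartitions = newPartitions.filter { case (partitionId, _) =>
  zkClient.getTopicPartitionState(new TopicPartition(topic, partitionId)).isEmpty
}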

Author:

Thanks for the comments. It's true that all partitions count as new partitions when controllerContext.partitionAssignments is an empty set, but they shouldn't have a replica on the noNewPartitionBrokers, and thus they won't be rearranged.

It's a good point that the current implementation cannot handle controller switches. Given that it's safe to scan all topics' partitions on a controller switch, I think it should be done during a controller switch. Thoughts?

Reviewer:

Manual assignment for existing topics can still place partitions on noNewPartitionBrokers, so I don't think we can guarantee that noNewPartitionBrokers won't get replicas. In addition, there is a small time window in which new replicas can still be assigned to preferred controllers, because the preferred controller znode is an ephemeral znode (see the design doc for more detail).

I think a safer way is to rely on the existence of the partition state znode when performing the rearrangement.

Scanning all topics' partitions on controller switch: if we cannot guarantee 100% that no replicas land on the preferred controllers, I think it is OK not to perform special handling during a controller switch, given the overhead and additional hacky code needed (up to you to make the decision).

Author:

Upon closer look, I find that there is already logic to handle the controller switch-over: initializeControllerContext calls the rearrangePartitionReplicaAssignmentForNewPartitions method.

Regarding the preferred controller znodes being ephemeral, that's an orthogonal design issue that we could address independently.

newPartitions.exists {
case (_, assignedReplicas) =>
assignedReplicas.replicas.intersect(noNewPartitionBrokers).nonEmpty
}
}
newTopicsToBeArranged.foreach {
topicsToBeRearranged.foreach {
case (topic, partitionMap) =>
val numPartitions = partitionMap.size
val numReplica = partitionMap.head._2.replicas.size
val brokers = controllerContext.liveOrShuttingDownBrokers.map { b => kafka.admin.BrokerMetadata(b.id, b.rack) }.toSeq

val replicaAssignment = adminZkClient.assignReplicasToAvailableBrokers(brokers, noNewPartitionBrokers.toSet, numPartitions, numReplica)
val existingAssignment = controllerContext.partitionAssignments.getOrElse(topic, mutable.Map.empty)
val partitionsToAdd = numPartitions - existingAssignment.size
val replicaAssignment = adminZkClient.assignReplicasToAvailableBrokers(brokers, noNewPartitionBrokers.toSet, partitionsToAdd, numReplica, -1, existingAssignment.size)
adminZkClient.writeTopicPartitionAssignment(topic, replicaAssignment.mapValues(ReplicaAssignment(_)).toMap, true)
info(s"Rearrange partition and replica assignment for topic [$topic]")
}
@@ -1697,7 +1704,7 @@ class KafkaController(val config: KafkaConfig,
val newTopics = topics -- controllerContext.allTopics
val deletedTopics = controllerContext.allTopics -- topics
controllerContext.allTopics = topics
rearrangePartitionReplicaAssignmentForNewTopics(newTopics)
rearrangePartitionReplicaAssignmentForNewPartitions(newTopics)

registerPartitionModificationsHandlers(newTopics.toSeq)
val addedPartitionReplicaAssignment = getReplicaAssignmentPolicyCompliant(newTopics)
@@ -1755,6 +1762,7 @@
}

if (!isActive) return
rearrangePartitionReplicaAssignmentForNewPartitions(immutable.Set(topic))
Reviewer:

Does this conflict with partition reassignment? Say a partition gets reassigned to replicas (1, 2, 5, 6): if one of those replicas is a maintenance broker, would the reassignment complete, given that we automatically change the zk node to disallow placing replicas on maintenance brokers?

Author:

That's a good point. I've updated the PR to avoid assigning replicas to the undesirable hosts during partition reassignment. Please take another look. Thanks!

val partitionReplicaAssignment = zkClient.getReplicaAssignmentForTopics(immutable.Set(topic))
val partitionsToBeAdded = partitionReplicaAssignment.filter { case (topicPartition, _) =>
controllerContext.partitionReplicaAssignment(topicPartition).isEmpty
@@ -1872,9 +1880,9 @@
} else {
val reassignmentResults = mutable.Map.empty[TopicPartition, ApiError]
val partitionsToReassign = mutable.Map.empty[TopicPartition, ReplicaAssignment]

val noNewPartitionBrokers = partitionUnassignableBrokerIds.toSet
reassignments.foreach { case (tp, targetReplicas) =>
if (replicasAreValid(tp, targetReplicas)) {
if (replicasAreValid(tp, targetReplicas, noNewPartitionBrokers)) {
Reviewer:

There is still a race condition here, because a broker can be marked as a maintenance broker after the partition reassignment request is received but before the reassignment completes.
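
One way to narrow that window (a sketch only, not part of this PR) would be to re-check the exclusion set when the controller actually processes the reassignment, reusing the names from the diff above:

// Sketch: re-validate the targets at processing time, so a broker that was
// marked as a maintenance broker after the request was accepted is still
// rejected instead of silently receiving a replica.
val currentlyUnassignable = partitionUnassignableBrokerIds.toSet
reassignments.foreach { case (tp, targetReplicas) =>
  if (targetReplicas.exists(_.exists(currentlyUnassignable.contains)))
    reassignmentResults.put(tp, new ApiError(Errors.INVALID_REPLICA_ASSIGNMENT))
}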

maybeBuildReassignment(tp, targetReplicas) match {
case Some(context) => partitionsToReassign.put(tp, context)
case None => reassignmentResults.put(tp, new ApiError(Errors.NO_REASSIGNMENT_IN_PROGRESS))
Expand All @@ -1893,15 +1901,19 @@ class KafkaController(val config: KafkaConfig,
}
}

private def replicasAreValid(topicPartition: TopicPartition, replicasOpt: Option[Seq[Int]]): Boolean = {
private def replicasAreValid(topicPartition: TopicPartition, replicasOpt: Option[Seq[Int]],
noNewPartitionBrokers: Set[Int]): Boolean = {
replicasOpt match {
case Some(replicas) =>
val replicaSet = replicas.toSet
if (replicas.isEmpty || replicas.size != replicaSet.size)
false
else if (replicas.exists(_ < 0))
false
else {
else if (replicaSet.intersect(noNewPartitionBrokers).nonEmpty) {
warn(s"reject reassignment of $topicPartition to unassignable hosts $noNewPartitionBrokers")
false
} else {
// Ensure that any new replicas are among the live brokers
val currentAssignment = controllerContext.partitionFullReplicaAssignment(topicPartition)
val newAssignment = currentAssignment.reassignTo(replicas)
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/zk/AdminZkClient.scala
@@ -53,7 +53,7 @@ class AdminZkClient(zkClient: KafkaZkClient) extends Logging {
topicConfig: Properties = new Properties,
rackAwareMode: RackAwareMode = RackAwareMode.Enforced): Unit = {
val brokerMetadatas = getBrokerMetadatas(rackAwareMode)
val noNewPartitionBrokerIds = getMaintenanceBrokerList()
val noNewPartitionBrokerIds = getMaintenanceBrokerList() ++ zkClient.getPreferredControllerList
val replicaAssignment = assignReplicasToAvailableBrokers(brokerMetadatas, noNewPartitionBrokerIds.toSet, partitions, replicationFactor)
createTopicWithAssignment(topic, topicConfig, replicaAssignment)
}
@@ -235,7 +235,7 @@
numPartitions: Int = 1,
replicaAssignment: Option[Map[Int, Seq[Int]]] = None,
validateOnly: Boolean = false): Map[Int, Seq[Int]] = {
val noNewPartitionBrokerIds = getMaintenanceBrokerList()
val noNewPartitionBrokerIds = getMaintenanceBrokerList() ++ zkClient.getPreferredControllerList
addPartitions(topic, existingAssignment, allBrokers, numPartitions, replicaAssignment, validateOnly, noNewPartitionBrokerIds.toSet)
}

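For illustration, a hypothetical scenario for the widened exclusion set (assignReplicasToAvailableBrokers and kafka.admin.BrokerMetadata are used here exactly as in the diff above; the broker layout is made up): with brokers 0, 1 and 2 alive and broker 2 registered as a preferred controller, every new replica list is drawn from brokers 0 and 1 only.

// Hypothetical: broker 2 is a preferred controller, so it is excluded from
// the placement of 4 new partitions with replication factor 2.
val brokerMetadatas = Seq(
  kafka.admin.BrokerMetadata(0, None),
  kafka.admin.BrokerMetadata(1, None),
  kafka.admin.BrokerMetadata(2, None))
val assignment = adminZkClient.assignReplicasToAvailableBrokers(brokerMetadatas, Set(2), 4, 2)
// every replica list in `assignment` contains only broker ids 0 and 1
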
71 changes: 69 additions & 2 deletions core/src/test/scala/unit/kafka/server/MaintenanceBrokerTest.scala
@@ -18,21 +18,23 @@
package kafka.server

import java.util.{Optional, Properties}

import kafka.server.KafkaConfig.fromProps
import kafka.utils.CoreUtils._
import kafka.utils.TestUtils
import kafka.utils.TestUtils._
import kafka.zk.ZooKeeperTestHarness
import org.apache.kafka.clients.admin._
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.InvalidReplicaAssignmentException
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.security.auth.SecurityProtocol

import scala.collection.JavaConverters._
import org.junit.Assert._
import org.junit.{After, Test}

import scala.collection.Map
import scala.collection.{Map, Seq}
import scala.concurrent.ExecutionException

/**
* This is the main test which ensures maintenance brokers work correctly.
@@ -172,6 +174,71 @@ class MaintenanceBrokerTest extends ZooKeeperTestHarness {
client.close()
}

@Test
def testAddPartitionByAdminZkClientShouldHonorMaintenanceBrokers(): Unit = {
brokers = (0 to 2).map { id => createServer(fromProps(createBrokerConfig(id, zkConnect))) }

TestUtils.waitUntilControllerElected(zkClient)
// setting broker 1 to not take new topic partitions
setMaintenanceBrokers(Seq(1))

// create the topic via ZooKeeper
val topic = "topic1"
TestUtils.createTopic(zkClient, topic, 3, 2, brokers)

assertTrue("topic1 should not be in broker 1", ensureTopicNotInBrokers("topic1", Set(1)))

val existingAssignment = zkClient.getFullReplicaAssignmentForTopics(Set(topic)).map {
case (topicPartition, assignment) => topicPartition.partition -> assignment
}
val allBrokers = adminZkClient.getBrokerMetadatas()
val newPartitionsCount = 5
adminZkClient.addPartitions(topic, existingAssignment, allBrokers, newPartitionsCount)
(0 until newPartitionsCount).map { i =>
TestUtils.waitUntilMetadataIsPropagated(brokers, topic, i)
i -> TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, i)
}

assertTrue("topic1 should not be in broker 1 after increasing partition count",
ensureTopicNotInBrokers("topic1", Set(1)))
}

@Test
def testPartitionReassignmentShouldHonorMaintenanceBrokers(): Unit = {
brokers = (0 to 2).map { id => createServer(fromProps(createBrokerConfig(id, zkConnect))) }

TestUtils.waitUntilControllerElected(zkClient)
// setting broker 1 to not take new topic partitions
setMaintenanceBrokers(Seq(1))

// create the topic via ZooKeeper
val topic = "topic1"
TestUtils.createTopic(zkClient, topic, 1, 2, brokers)
assertTrue("topic1 should not be in broker 1", ensureTopicNotInBrokers("topic1", Set(1)))

// get the admin client
val adminClientConfig = new Properties
val brokerList = TestUtils.bootstrapServers(brokers, ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))
adminClientConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
val client = AdminClient.create(adminClientConfig)

val reassignmentsResult = client.alterPartitionReassignments(Map(reassignmentEntry(new TopicPartition(topic, 0), Seq(0, 1))).asJava)
var reassignmentFailed = false
try {
reassignmentsResult.all().get()
} catch {
case e : ExecutionException =>
assertTrue(e.getCause.isInstanceOf[InvalidReplicaAssignmentException])
reassignmentFailed = true
}
assertTrue("the partition reassignment should have failed", reassignmentFailed)
client.close()
}

def reassignmentEntry(tp: TopicPartition, replicas: Seq[Int]): (TopicPartition, java.util.Optional[NewPartitionReassignment]) =
tp -> Optional.of(new NewPartitionReassignment(replicas.map(_.asInstanceOf[Integer]).asJava))


@Test
def testTopicCreatedInZkShouldBeRearrangedForMaintenanceBrokers(): Unit = {
