diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala index 5e299fc0e02a8..4d2e3a7305e95 100644 --- a/core/src/main/scala/kafka/server/BrokerServer.scala +++ b/core/src/main/scala/kafka/server/BrokerServer.scala @@ -480,12 +480,10 @@ class BrokerServer( authorizer ), sharedServer.initialBrokerMetadataLoadFaultHandler, - sharedServer.metadataPublishingFaultHandler, - lifecycleManager + sharedServer.metadataPublishingFaultHandler ) metadataPublishers.add(brokerMetadataPublisher) brokerRegistrationTracker = new BrokerRegistrationTracker(config.brokerId, - logManager.directoryIdsSet.toList.asJava, () => lifecycleManager.resendBrokerRegistrationUnlessZkMode()) metadataPublishers.add(brokerRegistrationTracker) diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala index ee7bfa2157ee7..04a063fd21dd2 100644 --- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala +++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala @@ -20,7 +20,7 @@ package kafka.server.metadata import java.util.{OptionalInt, Properties} import kafka.coordinator.transaction.TransactionCoordinator import kafka.log.LogManager -import kafka.server.{BrokerLifecycleManager, KafkaConfig, ReplicaManager, RequestLocal} +import kafka.server.{KafkaConfig, ReplicaManager, RequestLocal} import kafka.utils.Logging import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.errors.TimeoutException @@ -73,7 +73,6 @@ class BrokerMetadataPublisher( aclPublisher: AclPublisher, fatalFaultHandler: FaultHandler, metadataPublishingFaultHandler: FaultHandler, - brokerLifecycleManager: BrokerLifecycleManager, ) extends MetadataPublisher with Logging { logIdent = s"[BrokerMetadataPublisher id=${config.nodeId}] " diff --git a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala index 26f4fb3daee8c..5657cd37ec4e2 100644 --- a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala +++ b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala @@ -23,7 +23,7 @@ import java.util.Collections.{singleton, singletonList, singletonMap} import java.util.Properties import java.util.concurrent.atomic.{AtomicInteger, AtomicReference} import kafka.log.LogManager -import kafka.server.{BrokerLifecycleManager, BrokerServer, KafkaConfig, ReplicaManager} +import kafka.server.{BrokerServer, KafkaConfig, ReplicaManager} import kafka.testkit.{KafkaClusterTestKit, TestKitNodes} import kafka.utils.TestUtils import org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET @@ -200,8 +200,7 @@ class BrokerMetadataPublisherTest { mock(classOf[DelegationTokenPublisher]), mock(classOf[AclPublisher]), faultHandler, - faultHandler, - mock(classOf[BrokerLifecycleManager]), + faultHandler ) val image = MetadataImage.EMPTY diff --git a/metadata/src/main/java/org/apache/kafka/image/publisher/BrokerRegistrationTracker.java b/metadata/src/main/java/org/apache/kafka/image/publisher/BrokerRegistrationTracker.java index 51ac2bdfa4bd3..e8e0e77ea7ca5 100644 --- a/metadata/src/main/java/org/apache/kafka/image/publisher/BrokerRegistrationTracker.java +++ b/metadata/src/main/java/org/apache/kafka/image/publisher/BrokerRegistrationTracker.java @@ -17,7 +17,6 @@ package org.apache.kafka.image.publisher; -import org.apache.kafka.common.Uuid; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.image.MetadataDelta; import org.apache.kafka.image.MetadataImage; @@ -26,8 +25,6 @@ import org.apache.kafka.server.common.MetadataVersion; import org.slf4j.Logger; -import java.util.List; - /** * Tracks the registration of a specific broker, and executes a callback if it should be refreshed. * @@ -49,12 +46,10 @@ public class BrokerRegistrationTracker implements MetadataPublisher { * Create the tracker. * * @param id The ID of this broker. - * @param targetDirectories The directories managed by this broker. * @param refreshRegistrationCallback Callback to run if we need to refresh the registration. */ public BrokerRegistrationTracker( int id, - List targetDirectories, Runnable refreshRegistrationCallback ) { this.log = new LogContext("[BrokerRegistrationTracker id=" + id + "] "). diff --git a/metadata/src/test/java/org/apache/kafka/image/publisher/BrokerRegistrationTrackerTest.java b/metadata/src/test/java/org/apache/kafka/image/publisher/BrokerRegistrationTrackerTest.java index 855a96cd8aaf3..4438acc75825b 100644 --- a/metadata/src/test/java/org/apache/kafka/image/publisher/BrokerRegistrationTrackerTest.java +++ b/metadata/src/test/java/org/apache/kafka/image/publisher/BrokerRegistrationTrackerTest.java @@ -48,8 +48,7 @@ public class BrokerRegistrationTrackerTest { static class BrokerRegistrationTrackerTestContext { AtomicInteger numCalls = new AtomicInteger(0); - BrokerRegistrationTracker tracker = new BrokerRegistrationTracker(1, - Arrays.asList(B, A), () -> numCalls.incrementAndGet()); + BrokerRegistrationTracker tracker = new BrokerRegistrationTracker(1, () -> numCalls.incrementAndGet()); MetadataImage image = MetadataImage.EMPTY;