Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SAMZA-2497: Close MetadataResourceUtil when job is stopped #1333

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,11 @@ public class ClusterBasedJobCoordinator {

private final SystemAdmins systemAdmins;

/**
* Util to close the KafkaCheckpointManager later on
*/
private MetadataResourceUtil metadataResourceUtil;

/**
* Internal variable for the instance of {@link JmxServer}
*/
Expand Down Expand Up @@ -259,7 +264,7 @@ public void run() {

//create necessary checkpoint and changelog streams, if not created
JobModel jobModel = jobModelManager.jobModel();
MetadataResourceUtil metadataResourceUtil = new MetadataResourceUtil(jobModel, this.metrics, config);
metadataResourceUtil = new MetadataResourceUtil(jobModel, this.metrics, config);
metadataResourceUtil.createResources();

// fan out the startpoints
Expand Down Expand Up @@ -341,6 +346,7 @@ private void onShutDown() {
systemAdmins.stop();
shutDowncontainerPlacementRequestAllocatorAndUtils();
containerProcessManager.stop();
metadataResourceUtil.stop();
metadataStore.close();
} catch (Throwable e) {
LOG.error("Exception while stopping cluster based job coordinator", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ public void createResources() {
createChangelogStreams();
}

public void stop() {
checkpointManager.stop();
}

@VisibleForTesting
void createChangelogStreams() {
ChangelogStreamManager.createChangelogStreams(config, jobModel.maxChangeLogStreamPartitions);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ public class PassthroughJobCoordinator implements JobCoordinator {
private final String processorId;
private final Config config;
private final LocationId locationId;
private MetadataResourceUtil metadataResourceUtil;
private JobCoordinatorListener coordinatorListener = null;

public PassthroughJobCoordinator(String processorId, Config config, MetricsRegistry metricsRegistry) {
Expand All @@ -85,7 +86,7 @@ public void start() {
try {
jobModel = getJobModel();
// TODO metrics registry has been null here for a while; is it safe?
MetadataResourceUtil metadataResourceUtil = new MetadataResourceUtil(jobModel, null, config);
metadataResourceUtil = new MetadataResourceUtil(jobModel, null, config);
metadataResourceUtil.createResources();
} catch (Exception e) {
LOGGER.error("Exception while trying to getJobModel.", e);
Expand All @@ -111,6 +112,7 @@ public void stop() {
coordinatorListener.onJobModelExpired();
coordinatorListener.onCoordinatorStop();
}
metadataResourceUtil.stop();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ public class ZkJobCoordinator implements JobCoordinator {
private JobModel newJobModel;
private boolean hasLoadedMetadataResources = false;
private String cachedJobModelVersion = null;
private MetadataResourceUtil metadataResourceUtil;

@VisibleForTesting
ZkSessionMetrics zkSessionMetrics;
Expand Down Expand Up @@ -312,7 +313,7 @@ JobModel readJobModelFromMetadataStore(String zkJobModelVersion) {
@VisibleForTesting
void loadMetadataResources(JobModel jobModel) {
try {
MetadataResourceUtil metadataResourceUtil = createMetadataResourceUtil(jobModel, config);
metadataResourceUtil = createMetadataResourceUtil(jobModel, config);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where is metadataResourceUtil#stop is being stopped for ZkJobCoordinator?

metadataResourceUtil.createResources();

if (coordinatorStreamStore != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ package org.apache.samza.job.local

import java.util.concurrent.CountDownLatch

import org.apache.samza.coordinator.JobModelManager
import org.apache.samza.coordinator.{JobModelManager, MetadataResourceUtil}
import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore
import org.apache.samza.job.ApplicationStatus.{New, Running, SuccessfulFinish, UnsuccessfulFinish}
import org.apache.samza.job.{ApplicationStatus, CommandBuilder, StreamJob}
Expand All @@ -45,7 +45,8 @@ object ProcessJob {
class ProcessJob(
commandBuilder: CommandBuilder,
val jobModelManager: JobModelManager,
val coordinatorStreamStore: CoordinatorStreamStore) extends StreamJob with Logging {
val coordinatorStreamStore: CoordinatorStreamStore,
val metadataResourceUtil: MetadataResourceUtil) extends StreamJob with Logging {

import ProcessJob._

Expand Down Expand Up @@ -76,6 +77,7 @@ class ProcessJob(
case e: Exception => error("Encountered an error during job start: %s".format(e.getMessage))
} finally {
jobModelManager.stop
metadataResourceUtil.stop
coordinatorStreamStore.close
setStatus(if (processExitCode == 0) SuccessfulFinish else UnsuccessfulFinish)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,6 @@ class ProcessJobFactory extends StreamJobFactory with Logging {
.setId("0")
.setUrl(jobModelManager.server.getUrl)

new ProcessJob(commandBuilder, jobModelManager, coordinatorStreamStore)
new ProcessJob(commandBuilder, jobModelManager, coordinatorStreamStore, metadataResourceUtil)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ class ThreadJobFactory extends StreamJobFactory with Logging {
threadJob
} finally {
jobModelManager.stop
metadataResourceUtil.stop
if (jmxServer != null) {
jmxServer.stop
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ package org.apache.samza.job.local

import com.google.common.collect.ImmutableMap
import org.apache.samza.config.MapConfig
import org.apache.samza.coordinator.JobModelManager
import org.apache.samza.coordinator.{JobModelManager, MetadataResourceUtil}
import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore
import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory
import org.apache.samza.job.ApplicationStatus.{Running, SuccessfulFinish, UnsuccessfulFinish}
Expand Down Expand Up @@ -59,7 +59,8 @@ object TestProcessJob {
new ProcessJob(
commandBuilder,
new MockJobModelManager,
new MockCoordinateStreamStore(MockConfigs, systemProducer, systemConsumer, systemAdmin))
new MockCoordinateStreamStore(MockConfigs, systemProducer, systemConsumer, systemAdmin),
new MockMetadataResourceUtil)
}

private def getMockJobModelManager(processJob: ProcessJob): MockJobModelManager = {
Expand All @@ -69,6 +70,10 @@ object TestProcessJob {
private def getMockCoordinatorStreamStore(processJob: ProcessJob): MockCoordinateStreamStore = {
processJob.coordinatorStreamStore.asInstanceOf[MockCoordinateStreamStore]
}

private def getMockMetadataResourceUtil(processJob: ProcessJob): MockMetadataResourceUtil = {
processJob.metadataResourceUtil.asInstanceOf[MockMetadataResourceUtil]
}
}

class TestProcessJob {
Expand All @@ -84,6 +89,7 @@ class TestProcessJob {
assertEquals(SuccessfulFinish, status)
assertTrue(getMockJobModelManager(processJob).stopped)
assertTrue(getMockCoordinatorStreamStore(processJob).closed)
assertTrue(getMockMetadataResourceUtil(processJob).stopped)
}

@Test
Expand All @@ -95,6 +101,7 @@ class TestProcessJob {
assertEquals(UnsuccessfulFinish, status)
assertTrue(getMockJobModelManager(processJob).stopped)
assertTrue(getMockCoordinatorStreamStore(processJob).closed)
assertTrue(getMockMetadataResourceUtil(processJob).stopped)
}

@Test
Expand All @@ -116,6 +123,7 @@ class TestProcessJob {
assertEquals(UnsuccessfulFinish, processJob.getStatus)
assertTrue(getMockJobModelManager(processJob).stopped)
assertTrue(getMockCoordinatorStreamStore(processJob).closed)
assertTrue(getMockMetadataResourceUtil(processJob).stopped)
}

@Test
Expand All @@ -127,6 +135,7 @@ class TestProcessJob {
assertEquals(UnsuccessfulFinish, processJob.getStatus)
assertTrue(getMockJobModelManager(processJob).stopped)
assertTrue(getMockCoordinatorStreamStore(processJob).closed)
assertTrue(getMockMetadataResourceUtil(processJob).stopped)
}

@Test
Expand All @@ -138,6 +147,7 @@ class TestProcessJob {
assertEquals(SuccessfulFinish, processJob.getStatus)
assertTrue(getMockJobModelManager(processJob).stopped)
assertTrue(getMockCoordinatorStreamStore(processJob).closed)
assertTrue(getMockMetadataResourceUtil(processJob).stopped)
}

@Test
Expand Down Expand Up @@ -186,4 +196,14 @@ class MockCoordinateStreamStore(
override def close: Unit = {
closed = true
}
}
}

class MockMetadataResourceUtil extends MetadataResourceUtil(null, null, null) {
var stopped: Boolean = false

override def createResources: Unit = {}

override def stop: Unit = {
stopped = true
}
}