Skip to content

Commit

Permalink
AMBARI-25635: Clear Cluster and METRIC_AGGREGATORS MBeans upon shutdown (#68)
Browse files Browse the repository at this point in the history
  • Loading branch information
kevinw66 authored Nov 17, 2022
1 parent 85e2b92 commit 02af960
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 96 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import java.util.concurrent.TimeUnit;

import org.I0Itec.zkclient.exception.ZkNoNodeException;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
Expand All @@ -42,51 +41,54 @@
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
import org.apache.helix.LiveInstanceChangeListener;
import org.apache.helix.NotificationContext;
import org.apache.helix.controller.GenericHelixController;
import org.apache.helix.PropertyKey;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.OnlineOfflineSMD;
import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.tools.StateModelConfigGenerator;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;

;

public class MetricCollectorHAController {
private static final Log LOG = LogFactory.getLog(MetricCollectorHAController.class);

@VisibleForTesting
static final String CLUSTER_NAME = "ambari-metrics-cluster";
@VisibleForTesting
static final String METRIC_AGGREGATORS = "METRIC_AGGREGATORS";
@VisibleForTesting
static final String DEFAULT_STATE_MODEL = OnlineOfflineSMD.name;
static final String INSTANCE_NAME_DELIMITER = "_";
private static final String INSTANCE_NAME_DELIMITER = "_";
private static final int PARTITION_NUMBER = 2;
private static final int REPLICATION_FACTOR = 1;

@VisibleForTesting
final String zkConnectUrl;
final String instanceHostname;
final InstanceConfig instanceConfig;
final AggregationTaskRunner aggregationTaskRunner;
final TimelineMetricConfiguration configuration;
private final String instanceHostname;
private final InstanceConfig instanceConfig;
private final AggregationTaskRunner aggregationTaskRunner;

// Cache list of known live instances
final List<String> liveInstanceNames = new ArrayList<>();
private final List<String> liveInstanceNames = new ArrayList<>(2);
private final LiveInstanceTracker liveInstanceTracker = new LiveInstanceTracker();

// Helix Admin
@VisibleForTesting
HelixAdmin admin;
// Helix Manager
HelixManager manager;
private HelixManager manager;

private volatile boolean isInitialized = false;

public MetricCollectorHAController(TimelineMetricConfiguration configuration) {
this.configuration = configuration;
String instancePort;
try {
instanceHostname = configuration.getInstanceHostnameFromEnv();
instancePort = configuration.getInstancePort();

} catch (Exception e) {
LOG.error("Error reading configs from classpath, will resort to defaults.", e);
throw new MetricsSystemInitializationException(e.getMessage());
Expand All @@ -97,42 +99,32 @@ public MetricCollectorHAController(TimelineMetricConfiguration configuration) {
String zkQuorum = configuration.getClusterZKQuorum();

if (StringUtils.isEmpty(zkClientPort) || StringUtils.isEmpty(zkQuorum)) {
throw new Exception("Unable to parse zookeeper quorum. clientPort = "
+ zkClientPort +", quorum = " + zkQuorum);
throw new Exception(String.format("Unable to parse zookeeper quorum. clientPort = %s, quorum = %s", zkClientPort, zkQuorum));
}

zkConnectUrl = configuration.getZkConnectionUrl(zkClientPort, zkQuorum);

} catch (Exception e) {
LOG.error("Unable to load hbase-site from classpath.", e);
throw new MetricsSystemInitializationException(e.getMessage());
throw new MetricsSystemInitializationException(e.getMessage(), e);
}

instanceConfig = new InstanceConfig(instanceHostname + INSTANCE_NAME_DELIMITER + instancePort);
instanceConfig.setHostName(instanceHostname);
instanceConfig.setPort(instancePort);
instanceConfig.setInstanceEnabled(true);
aggregationTaskRunner = new AggregationTaskRunner(
instanceConfig.getInstanceName(), zkConnectUrl, getClusterName());
}

/**
* Name of Helix znode
*/
public String getClusterName() {
return CLUSTER_NAME;
aggregationTaskRunner = new AggregationTaskRunner(instanceConfig.getInstanceName(), zkConnectUrl, CLUSTER_NAME);
}

/**
* Initialize the instance with zookeeper via Helix
*/
public void initializeHAController() throws Exception {
String clusterName = getClusterName();
// Create setup tool instance
admin = new ZKHelixAdmin(zkConnectUrl);
// create cluster
LOG.info("Creating zookeeper cluster node: " + clusterName);
boolean clusterAdded = admin.addCluster(clusterName, false);
LOG.info("Was cluster added successfully? " + clusterAdded);
// Create cluster namespace in zookeeper. Don't recreate if exists.
LOG.info(String.format("Creating zookeeper cluster node: %s", CLUSTER_NAME));
boolean clusterAdded = admin.addCluster(CLUSTER_NAME, false);
LOG.info(String.format("Was cluster added successfully? %s", clusterAdded));

// Adding host to the cluster
boolean success = false;
Expand All @@ -141,16 +133,16 @@ public void initializeHAController() throws Exception {

for (int i = 0; i < tries && !success; i++) {
try {
List<String> nodes = admin.getInstancesInCluster(clusterName);
if (CollectionUtils.isEmpty(nodes) || !nodes.contains(instanceConfig.getInstanceName())) {
LOG.info("Adding participant instance " + instanceConfig);
admin.addInstance(clusterName, instanceConfig);
List<String> nodes = admin.getInstancesInCluster(CLUSTER_NAME);
if (!nodes.contains(instanceConfig.getInstanceName())) {
LOG.info(String.format("Adding participant instance %s", instanceConfig));
admin.addInstance(CLUSTER_NAME, instanceConfig);
}
success = true;
} catch (HelixException | ZkNoNodeException ex) {
LOG.warn("Helix Cluster not yet setup fully.");
if (i < tries - 1) {
LOG.info("Waiting for " + sleepTimeInSeconds + " seconds and retrying.");
LOG.info(String.format("Waiting for %d seconds and retrying.", sleepTimeInSeconds));
TimeUnit.SECONDS.sleep(sleepTimeInSeconds);
} else {
LOG.error(ex);
Expand All @@ -159,48 +151,42 @@ public void initializeHAController() throws Exception {
}

if (!success) {
LOG.info("Trying to create " + clusterName + " again since waiting for the creation did not help.");
admin.addCluster(clusterName, true);
List<String> nodes = admin.getInstancesInCluster(clusterName);
if (CollectionUtils.isEmpty(nodes) || !nodes.contains(instanceConfig.getInstanceName())) {
LOG.info("Adding participant instance " + instanceConfig);
admin.addInstance(clusterName, instanceConfig);
LOG.info(String.format("Trying to create %s again since waiting for the creation did not help.", CLUSTER_NAME));
admin.addCluster(CLUSTER_NAME, true);
List<String> nodes = admin.getInstancesInCluster(CLUSTER_NAME);
if (!nodes.contains(instanceConfig.getInstanceName())) {
LOG.info(String.format("Adding participant instance %s", instanceConfig));
admin.addInstance(CLUSTER_NAME, instanceConfig);
}
}

// Add a state model
if (admin.getStateModelDef(clusterName, DEFAULT_STATE_MODEL) == null) {
// Add an ONLINE-OFFLINE state model
if (admin.getStateModelDef(CLUSTER_NAME, DEFAULT_STATE_MODEL) == null) {
LOG.info("Adding ONLINE-OFFLINE state model to the cluster");
admin.addStateModelDef(clusterName, DEFAULT_STATE_MODEL, new StateModelDefinition(
StateModelConfigGenerator.generateConfigForOnlineOffline()));
admin.addStateModelDef(CLUSTER_NAME, DEFAULT_STATE_MODEL, OnlineOfflineSMD.build());
}

// Add resources with 1 cluster-wide replica
// Since our aggregators are unbalanced in terms of work distribution we
// only need to distribute writes to METRIC_AGGREGATE and
// METRIC_RECORD_MINUTE
List<String> resources = admin.getResourcesInCluster(clusterName);
// METRIC_RECORD_MINUTE, i.e. the Host level and Cluster level aggregations
List<String> resources = admin.getResourcesInCluster(CLUSTER_NAME);
if (!resources.contains(METRIC_AGGREGATORS)) {
LOG.info("Adding resource " + METRIC_AGGREGATORS + " with 2 partitions and 1 replicas");
admin.addResource(clusterName, METRIC_AGGREGATORS, 2, DEFAULT_STATE_MODEL, FULL_AUTO.toString());
LOG.info(String.format("Adding resource %s with %d partitions and %d replicas", METRIC_AGGREGATORS, PARTITION_NUMBER, REPLICATION_FACTOR));
admin.addResource(CLUSTER_NAME, METRIC_AGGREGATORS, PARTITION_NUMBER, DEFAULT_STATE_MODEL, FULL_AUTO.toString());
}
// this will set up the ideal state, it calculates the preference list for
// each partition similar to consistent hashing
admin.rebalance(clusterName, METRIC_AGGREGATORS, 1);
// This will set up the ideal state, it calculates the preference list for each partition similar to consistent hashing.
admin.rebalance(CLUSTER_NAME, METRIC_AGGREGATORS, REPLICATION_FACTOR);

// Start participant
startAggregators();

// Start controller
startController();

Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
aggregationTaskRunner.stop();
manager.disconnect();
}
});
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
shutdownHAController();
}));

isInitialized = true;
}
Expand All @@ -215,32 +201,41 @@ public boolean isInitialized() {
private void startAggregators() {
try {
aggregationTaskRunner.initialize();

} catch (Exception e) {
LOG.error("Unable to start aggregators.", e);
throw new MetricsSystemInitializationException(e.getMessage());
throw new MetricsSystemInitializationException(e.getMessage(), e);
}
}

private void startController() throws Exception {
manager = HelixManagerFactory.getZKHelixManager(
getClusterName(),
instanceHostname,
InstanceType.CONTROLLER,
zkConnectUrl
);
manager = HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, instanceHostname, InstanceType.CONTROLLER, zkConnectUrl);

manager.connect();
HelixController controller = new HelixController();
manager.addLiveInstanceChangeListener(controller);
manager.addLiveInstanceChangeListener(liveInstanceTracker);
}

/**
 * Shuts down the HA controller and releases the Helix resources it holds.
 *
 * <p>Does nothing unless the controller is currently marked as initialized.
 * Otherwise it detaches the live-instance listener from the Helix manager,
 * shuts down the listener, stops the aggregation task runner, disconnects the
 * manager and closes the Helix admin, in that order, and finally clears the
 * initialized flag. It is also invoked from the JVM shutdown hook registered
 * by {@code initializeHAController()}.
 */
public void shutdownHAController() {
  if (!isInitialized) {
    // Never initialized, or already shut down: nothing to release.
    return;
  }

  LOG.info("Shutting down Metrics Collector's HAController.");

  // Detach the tracker from the live-instances path before stopping it, so
  // the manager no longer delivers callbacks to it.
  PropertyKey.Builder keyBuilder = new PropertyKey.Builder(CLUSTER_NAME);
  manager.removeListener(keyBuilder.liveInstances(), liveInstanceTracker);
  liveInstanceTracker.shutdown();

  aggregationTaskRunner.stop();
  manager.disconnect();
  admin.close();

  isInitialized = false;
  LOG.info("Shutdown of Metrics Collector's HAController finished.");
}

/**
 * Returns the aggregation task runner created by this controller's constructor.
 *
 * @return the {@link AggregationTaskRunner} owned by this controller
 */
public AggregationTaskRunner getAggregationTaskRunner() {
  return this.aggregationTaskRunner;
}

public List<String> getLiveInstanceHostNames() {
List<String> liveInstanceHostNames = new ArrayList<>();
List<String> liveInstanceHostNames = new ArrayList<>(2);

for (String instance : liveInstanceNames) {
liveInstanceHostNames.add(instance.split(INSTANCE_NAME_DELIMITER)[0]);
Expand All @@ -249,52 +244,44 @@ public List<String> getLiveInstanceHostNames() {
return liveInstanceHostNames;
}

public class HelixController extends GenericHelixController {
ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
Joiner joiner = Joiner.on(", ").skipNulls();
public final class LiveInstanceTracker implements LiveInstanceChangeListener {
private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
private final Joiner joiner = Joiner.on(", ").skipNulls();

@Override
public void onLiveInstanceChange(List<LiveInstance> liveInstances, NotificationContext changeContext) {
super.onLiveInstanceChange(liveInstances, changeContext);

liveInstanceNames.clear();
for (LiveInstance instance : liveInstances) {
liveInstanceNames.add(instance.getInstanceName());
}

LOG.info("Detected change in liveliness of Collector instances. " +
"LiveIsntances = " + joiner.join(liveInstanceNames));
LOG.info(String.format("Detected change in liveliness of Collector instances. LiveInstances = %s", joiner.join(liveInstanceNames)));
// Print HA state - after some delay
executorService.schedule(new Runnable() {
@Override
public void run() {
printClusterState();
}
}, 30, TimeUnit.SECONDS);

executorService.schedule(() -> printClusterState(), 30, TimeUnit.SECONDS);
}

/**
 * Stops the executor used to print the cluster state after a liveness change.
 *
 * <p>Pending delayed tasks are cancelled instead of being left to fire later.
 */
public void shutdown() {
  // shutdownNow() rather than shutdown(): a scheduled executor still runs
  // already-submitted delayed tasks after a plain shutdown(), so a
  // printClusterState() queued 30 seconds out could execute after the
  // enclosing controller has closed its Helix admin. The returned list of
  // never-started tasks is intentionally discarded.
  executorService.shutdownNow();
}
}

public void printClusterState() {
StringBuilder sb = new StringBuilder("\n######################### Cluster HA state ########################");

ExternalView resourceExternalView = admin.getResourceExternalView(getClusterName(), METRIC_AGGREGATORS);
ExternalView resourceExternalView = admin.getResourceExternalView(CLUSTER_NAME, METRIC_AGGREGATORS);
if (resourceExternalView != null) {
getPrintableResourceState(resourceExternalView, METRIC_AGGREGATORS, sb);
getPrintableResourceState(resourceExternalView, sb);
}
sb.append("\n##################################################");
LOG.info(sb.toString());
}

private void getPrintableResourceState(ExternalView resourceExternalView,
String resourceName,
StringBuilder sb) {
private void getPrintableResourceState(ExternalView resourceExternalView, StringBuilder sb) {
TreeSet<String> sortedSet = new TreeSet<>(resourceExternalView.getPartitionSet());
sb.append("\nCLUSTER: ");
sb.append(getClusterName());
sb.append(CLUSTER_NAME);
sb.append("\nRESOURCE: ");
sb.append(resourceName);
sb.append(MetricCollectorHAController.METRIC_AGGREGATORS);
for (String partitionName : sortedSet) {
sb.append("\nPARTITION: ");
sb.append(partitionName).append("\t");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@ public void testHAControllerDistributedAggregation() throws Exception {
// Re-assigned partitions
Assert.assertEquals(2, partitionInstanceMap.size());

haController.getAggregationTaskRunner().stop();
haController.manager.disconnect();
haController.shutdownHAController();
}
}

0 comments on commit 02af960

Please sign in to comment.