Skip to content

Commit

Permalink
[grid] Catch exception in local distributor that was preventing nodes…
Browse files Browse the repository at this point in the history
… being added
  • Loading branch information
shs96c committed Jan 16, 2020
1 parent 62867fd commit 5e47c6a
Showing 1 changed file with 10 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand Down Expand Up @@ -274,6 +275,8 @@ boolean allBucketsSameSize(Map<String, Set<Host>> hostBuckets) {
private void refresh(NodeStatus status) {
Objects.requireNonNull(status);

LOG.fine("Refreshing: " + status.getUri());

// Iterate over the available nodes to find a match.
Lock writeLock = lock.writeLock();
writeLock.lock();
Expand All @@ -282,12 +285,13 @@ private void refresh(NodeStatus status) {
.filter(host -> host.getId().equals(status.getNodeId()))
.findFirst();


if (existing.isPresent()) {
// Modify the state
LOG.fine("Modifying existing state");
existing.get().update(status);
} else {
// No match made. Add a new host.
LOG.info("Creating a new remote node for " + status.getUri());
Node node = new RemoteNode(
tracer,
clientFactory,
Expand Down Expand Up @@ -316,7 +320,10 @@ private LocalDistributor add(Node node, NodeStatus status) {

Host host = new Host(bus, node);
host.update(status);

LOG.fine("Adding host: " + host.asSummary());
hosts.add(host);

LOG.info(String.format("Added node %s.", node.getId()));
host.runHealthCheck();

Expand All @@ -325,6 +332,8 @@ private LocalDistributor add(Node node, NodeStatus status) {
nodeRunnables.add(runnable);
allChecks.put(node.getId(), nodeRunnables);
hostChecker.submit(runnable, Duration.ofMinutes(5), Duration.ofSeconds(30));
} catch (Throwable t) {
LOG.log(Level.WARNING, "Unable to process host", t);
} finally {
writeLock.unlock();
}
Expand Down

0 comments on commit 5e47c6a

Please sign in to comment.