Skip to content

Commit

Permalink
Make sendUpdate lock free
Browse files Browse the repository at this point in the history
Ensure that only single thread
will execute sendUpdate
  • Loading branch information
sopel39 committed Sep 5, 2022
1 parent 48e5b53 commit 33e4afe
Showing 1 changed file with 73 additions and 91 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,7 @@ public final class HttpRemoteTask
// The version of dynamic filters that has been successfully sent to the worker
private final AtomicLong sentDynamicFiltersVersion = new AtomicLong(INITIAL_DYNAMIC_FILTERS_VERSION);

@GuardedBy("pendingRequestsCounter")
private Future<?> currentRequest;
private final AtomicReference<Future<?>> currentRequest = new AtomicReference<>();

@GuardedBy("this")
private final SetMultimap<PlanNodeId, ScheduledSplit> pendingSplits = HashMultimap.create();
Expand Down Expand Up @@ -163,7 +162,7 @@ public final class HttpRemoteTask

private final RequestErrorTracker updateErrorTracker;

private final AtomicInteger pendingRequestsCounter = new AtomicInteger(1);
private final AtomicInteger pendingRequestsCounter = new AtomicInteger(0);
private final AtomicBoolean sendPlan = new AtomicBoolean(true);

private final PartitionedSplitCountTracker partitionedSplitCountTracker;
Expand Down Expand Up @@ -350,7 +349,7 @@ public void start()
try (SetThreadName ignored = new SetThreadName("HttpRemoteTask-%s", taskId)) {
// to start we just need to trigger an update
started.set(true);
scheduleUpdate();
triggerUpdate();

dynamicFiltersFetcher.start();
taskStatusFetcher.start();
Expand Down Expand Up @@ -576,6 +575,10 @@ private void scheduleUpdate()

private void triggerUpdate()
{
if (!started.get()) {
// task has not started yet
return;
}
if (pendingRequestsCounter.getAndIncrement() == 0) {
// schedule update if this is the first update requested
scheduleUpdate();
Expand All @@ -584,72 +587,59 @@ private void triggerUpdate()

private void sendUpdate()
{
synchronized (pendingRequestsCounter) {
TaskStatus taskStatus = getTaskStatus();
// don't update if the task hasn't been started yet or if it is already finished
if (!started.get() || taskStatus.getState().isDone()) {
return;
}
TaskStatus taskStatus = getTaskStatus();
// don't update if the task is already finished
if (taskStatus.getState().isDone()) {
return;
}
checkState(started.get());

int currentPendingRequestsCounter = pendingRequestsCounter.get();
if (currentPendingRequestsCounter == 0) {
return;
}
int currentPendingRequestsCounter = pendingRequestsCounter.get();
checkState(currentPendingRequestsCounter > 0, "sendUpdate shouldn't be called without pending requests");

// if there is a request already running, wait for it to complete
// currentRequest is always cleared when request is complete
if (currentRequest != null) {
return;
}
// if throttled due to error, asynchronously wait for timeout and try again
ListenableFuture<Void> errorRateLimit = updateErrorTracker.acquireRequestPermit();
if (!errorRateLimit.isDone()) {
errorRateLimit.addListener(this::sendUpdate, executor);
return;
}

// if throttled due to error, asynchronously wait for timeout and try again
ListenableFuture<Void> errorRateLimit = updateErrorTracker.acquireRequestPermit();
if (!errorRateLimit.isDone()) {
errorRateLimit.addListener(this::sendUpdate, executor);
return;
}
List<SplitAssignment> splitAssignments = getSplitAssignments();
VersionedDynamicFilterDomains dynamicFilterDomains = outboundDynamicFiltersCollector.acknowledgeAndGetNewDomains(sentDynamicFiltersVersion.get());

List<SplitAssignment> splitAssignments = getSplitAssignments();
VersionedDynamicFilterDomains dynamicFilterDomains = outboundDynamicFiltersCollector.acknowledgeAndGetNewDomains(sentDynamicFiltersVersion.get());

// Workers don't need the embedded JSON representation when the fragment is sent
Optional<PlanFragment> fragment = sendPlan.get() ? Optional.of(planFragment.withoutEmbeddedJsonRepresentation()) : Optional.empty();
TaskUpdateRequest updateRequest = new TaskUpdateRequest(
session.toSessionRepresentation(),
session.getIdentity().getExtraCredentials(),
fragment,
splitAssignments,
outputBuffers.get(),
dynamicFilterDomains.getDynamicFilterDomains());
byte[] taskUpdateRequestJson = taskUpdateRequestCodec.toJsonBytes(updateRequest);
if (fragment.isPresent()) {
stats.updateWithPlanBytes(taskUpdateRequestJson.length);
}
if (!dynamicFilterDomains.getDynamicFilterDomains().isEmpty()) {
stats.updateWithDynamicFilterBytes(taskUpdateRequestJson.length);
}

HttpUriBuilder uriBuilder = getHttpUriBuilder(taskStatus);
Request request = preparePost()
.setUri(uriBuilder.build())
.setHeader(HttpHeaders.CONTENT_TYPE, MediaType.JSON_UTF_8.toString())
.setBodyGenerator(createStaticBodyGenerator(taskUpdateRequestJson))
.build();
// Workers don't need the embedded JSON representation when the fragment is sent
Optional<PlanFragment> fragment = sendPlan.get() ? Optional.of(planFragment.withoutEmbeddedJsonRepresentation()) : Optional.empty();
TaskUpdateRequest updateRequest = new TaskUpdateRequest(
session.toSessionRepresentation(),
session.getIdentity().getExtraCredentials(),
fragment,
splitAssignments,
outputBuffers.get(),
dynamicFilterDomains.getDynamicFilterDomains());
byte[] taskUpdateRequestJson = taskUpdateRequestCodec.toJsonBytes(updateRequest);
if (fragment.isPresent()) {
stats.updateWithPlanBytes(taskUpdateRequestJson.length);
}
if (!dynamicFilterDomains.getDynamicFilterDomains().isEmpty()) {
stats.updateWithDynamicFilterBytes(taskUpdateRequestJson.length);
}

updateErrorTracker.startRequest();
HttpUriBuilder uriBuilder = getHttpUriBuilder(taskStatus);
Request request = preparePost()
.setUri(uriBuilder.build())
.setHeader(HttpHeaders.CONTENT_TYPE, MediaType.JSON_UTF_8.toString())
.setBodyGenerator(createStaticBodyGenerator(taskUpdateRequestJson))
.build();

ListenableFuture<JsonResponse<TaskInfo>> future = httpClient.executeAsync(request, createFullJsonResponseHandler(taskInfoCodec));
currentRequest = future;
updateErrorTracker.startRequest();

// if pendingRequestsCounter is still non-zero (e.g. because triggerUpdate was called in the meantime)
// then the request Future callback will send a new update via sendUpdate method call
pendingRequestsCounter.addAndGet(-currentPendingRequestsCounter);
ListenableFuture<JsonResponse<TaskInfo>> future = httpClient.executeAsync(request, createFullJsonResponseHandler(taskInfoCodec));
checkState(currentRequest.getAndSet(future) == null, "There should be no previous request running");

Futures.addCallback(
future,
new SimpleHttpResponseHandler<>(new UpdateResponseHandler(splitAssignments, dynamicFilterDomains.getVersion(), System.nanoTime()), request.getUri(), stats),
executor);
}
Futures.addCallback(
future,
new SimpleHttpResponseHandler<>(new UpdateResponseHandler(splitAssignments, dynamicFilterDomains.getVersion(), System.nanoTime(), currentPendingRequestsCounter), request.getUri(), stats),
executor);
}

private synchronized List<SplitAssignment> getSplitAssignments()
Expand Down Expand Up @@ -707,11 +697,9 @@ private void cleanUpTask()
outboundDynamicFiltersCollector.acknowledge(Long.MAX_VALUE);

// cancel pending request
synchronized (pendingRequestsCounter) {
if (currentRequest != null) {
currentRequest.cancel(true);
currentRequest = null;
}
Future<?> request = currentRequest.getAndSet(null);
if (request != null) {
request.cancel(true);
}

taskStatusFetcher.stop();
Expand Down Expand Up @@ -912,33 +900,32 @@ private class UpdateResponseHandler
private final List<SplitAssignment> splitAssignments;
private final long currentRequestDynamicFiltersVersion;
private final long currentRequestStartNanos;
private final int currentPendingRequestsCounter;

private UpdateResponseHandler(List<SplitAssignment> splitAssignments, long currentRequestDynamicFiltersVersion, long currentRequestStartNanos)
private UpdateResponseHandler(List<SplitAssignment> splitAssignments, long currentRequestDynamicFiltersVersion, long currentRequestStartNanos, int currentPendingRequestsCounter)
{
this.splitAssignments = ImmutableList.copyOf(requireNonNull(splitAssignments, "splitAssignments is null"));
this.currentRequestDynamicFiltersVersion = currentRequestDynamicFiltersVersion;
this.currentRequestStartNanos = currentRequestStartNanos;
this.currentPendingRequestsCounter = currentPendingRequestsCounter;
}

@Override
public void success(TaskInfo value)
{
try (SetThreadName ignored = new SetThreadName("UpdateResponseHandler-%s", taskId)) {
try {
sentDynamicFiltersVersion.set(currentRequestDynamicFiltersVersion);
// Remove dynamic filters which were successfully sent to free up memory
outboundDynamicFiltersCollector.acknowledge(currentRequestDynamicFiltersVersion);
sendPlan.set(value.isNeedsPlan());
synchronized (pendingRequestsCounter) {
currentRequest = null;
}
updateStats();
processTaskUpdate(value, splitAssignments);
updateErrorTracker.requestSucceeded();
}
finally {
sendUpdate();
sentDynamicFiltersVersion.set(currentRequestDynamicFiltersVersion);
// Remove dynamic filters which were successfully sent to free up memory
outboundDynamicFiltersCollector.acknowledge(currentRequestDynamicFiltersVersion);
sendPlan.set(value.isNeedsPlan());
currentRequest.set(null);
updateStats();
updateErrorTracker.requestSucceeded();
if (pendingRequestsCounter.addAndGet(-currentPendingRequestsCounter) > 0) {
// schedule an update because triggerUpdate was called in the meantime
scheduleUpdate();
}
processTaskUpdate(value, splitAssignments);
}
}

Expand All @@ -947,19 +934,17 @@ public void failed(Throwable cause)
{
try (SetThreadName ignored = new SetThreadName("UpdateResponseHandler-%s", taskId)) {
try {
synchronized (pendingRequestsCounter) {
currentRequest = null;
}
currentRequest.set(null);
updateStats();

// on failure assume we need to update again
pendingRequestsCounter.incrementAndGet();

// if task not already done, record error
TaskStatus taskStatus = getTaskStatus();
if (!taskStatus.getState().isDone()) {
updateErrorTracker.requestFailed(cause);
}

// on failure assume we need to update again
scheduleUpdate();
}
catch (Error e) {
fail(e);
Expand All @@ -968,9 +953,6 @@ public void failed(Throwable cause)
catch (RuntimeException e) {
fail(e);
}
finally {
sendUpdate();
}
}
}

Expand Down

0 comments on commit 33e4afe

Please sign in to comment.