From eb17e8c3bef61db33b5ac7e11e9119509bff1d90 Mon Sep 17 00:00:00 2001 From: Eric Anderson Date: Fri, 8 Mar 2024 09:47:11 -0800 Subject: [PATCH] rls: Fix a local and remote race The local race passes `rlsPicker` to the channel before CachingRlsLbClient is finished constructing. `RlsPicker` can use several fields that are not yet initialized. This seems not to be happening in practice, because it appears like it would break things very loudly (e.g., NPE). The remote race seems incredibly hard to hit, because it requires an RPC to complete before the pending data tracking the RPC is added to a map. But if a system is at 100% CPU utilization, maybe it can be hit. If it is hit, all RPCs needing the impacted cache entry will forever be buffered. --- .../java/io/grpc/rls/CachingRlsLbClient.java | 26 +++++++++---------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java b/rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java index e8595961613..ff0410bbdc4 100644 --- a/rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java +++ b/rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java @@ -166,7 +166,6 @@ private CachingRlsLbClient(Builder builder) { rlsChannelBuilder.disableServiceConfigLookUp(); } rlsChannel = rlsChannelBuilder.build(); - helper.updateBalancingState(ConnectivityState.CONNECTING, rlsPicker); rlsStub = RouteLookupServiceGrpc.newStub(rlsChannel); childLbResolvedAddressFactory = checkNotNull(builder.resolvedAddressFactory, "resolvedAddressFactory"); @@ -285,7 +284,11 @@ private CachedRouteLookupResponse handleNewRequest(RouteLookupRequest request) { ListenableFuture asyncCall = asyncRlsCall(request); if (!asyncCall.isDone()) { pendingEntry = new PendingCacheEntry(request, asyncCall); + // Add the entry to the map before adding the Listener, because the listener removes the + // entry from the map pendingCallCache.put(request, pendingEntry); + // Beware that the listener can run immediately on 
the current thread + asyncCall.addListener(pendingEntry::handleDoneFuture, synchronizationContext); return CachedRouteLookupResponse.pendingResponse(pendingEntry); } else { // async call returned finished future is most likely throttled @@ -462,17 +465,9 @@ final class PendingCacheEntry { this.request = checkNotNull(request, "request"); this.pendingCall = pendingCall; this.backoffPolicy = backoffPolicy == null ? backoffProvider.get() : backoffPolicy; - pendingCall.addListener( - new Runnable() { - @Override - public void run() { - handleDoneFuture(); - } - }, - synchronizationContext); } - private void handleDoneFuture() { + void handleDoneFuture() { synchronized (lock) { pendingCallCache.remove(request); if (pendingCall.isCancelled()) { @@ -589,7 +584,9 @@ void maybeRefresh() { } final ListenableFuture asyncCall = asyncRlsCall(request); if (!asyncCall.isDone()) { - pendingCallCache.put(request, new PendingCacheEntry(request, asyncCall)); + PendingCacheEntry pendingEntry = new PendingCacheEntry(request, asyncCall); + pendingCallCache.put(request, pendingEntry); + asyncCall.addListener(pendingEntry::handleDoneFuture, synchronizationContext); } else { // async call returned finished future is most likely throttled try { @@ -727,9 +724,10 @@ private void transitionToPending() { } ListenableFuture call = asyncRlsCall(request); if (!call.isDone()) { + linkedHashLruCache.invalidate(request); PendingCacheEntry pendingEntry = new PendingCacheEntry(request, call, backoffPolicy); pendingCallCache.put(request, pendingEntry); - linkedHashLruCache.invalidate(request); + call.addListener(pendingEntry::handleDoneFuture, synchronizationContext); } else { try { RouteLookupResponse response = call.get(); @@ -837,7 +835,9 @@ Builder setBackoffProvider(BackoffPolicy.Provider provider) { } CachingRlsLbClient build() { - return new CachingRlsLbClient(this); + CachingRlsLbClient client = new CachingRlsLbClient(this); + helper.updateBalancingState(ConnectivityState.CONNECTING, 
client.rlsPicker); + return client; } }