diff --git a/java/dingo-sdk/src/main/java/io/dingodb/sdk/service/LockService.java b/java/dingo-sdk/src/main/java/io/dingodb/sdk/service/LockService.java index 49e9c559d..6e851971d 100644 --- a/java/dingo-sdk/src/main/java/io/dingodb/sdk/service/LockService.java +++ b/java/dingo-sdk/src/main/java/io/dingodb/sdk/service/LockService.java @@ -52,6 +52,7 @@ import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.LockSupport; import java.util.function.Consumer; +import java.util.function.Function; import static io.dingodb.sdk.service.entity.version.EventType.DELETE; import static io.dingodb.sdk.service.entity.version.EventType.NOT_EXISTS; @@ -469,14 +470,36 @@ public void watchAllOpLock(Kv kv, Runnable task) { try { Thread.sleep(1000L); } catch (Exception e1) { - - } + + } } task.run(); watchAllOpLock(kv, task); }); } + public void watchAllOpEvent(Kv kv, Function function) { + CompletableFuture.supplyAsync(() -> + kvService.watch(watchAllOpRequest(kv.getKv().getKey(), kv.getModRevision())) + ).whenCompleteAsync((r, e) -> { + if (e != null) { + if (!(e instanceof DingoClientException)) { + watchAllOpEvent(kv, function); + return; + } + log.error("Watch locked error, or watch retry time great than lease ttl.", e); + return; + } + String typeStr = "normal"; + if (r.getEvents().stream().map(Event::getType).anyMatch(type -> type == DELETE || type == NOT_EXISTS)) { + typeStr = "keyNone"; + } + function.apply(typeStr); + watchAllOpEvent(kv, function); + }); + } + + private PutRequest putRequest(String resourceKey, String value) { return PutRequest.builder() .lease(lease())