Skip to content

Commit

Permalink
[fix][dingo-sdk] Watch key all event
Browse files Browse the repository at this point in the history
  • Loading branch information
guojn1 authored and ketor committed Aug 19, 2024
1 parent ad8e5ee commit d13bc8d
Showing 1 changed file with 25 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.LockSupport;
import java.util.function.Consumer;
import java.util.function.Function;

import static io.dingodb.sdk.service.entity.version.EventType.DELETE;
import static io.dingodb.sdk.service.entity.version.EventType.NOT_EXISTS;
Expand Down Expand Up @@ -469,14 +470,36 @@ public void watchAllOpLock(Kv kv, Runnable task) {
try {
Thread.sleep(1000L);
} catch (Exception e1) {
}

}
}
task.run();
watchAllOpLock(kv, task);
});
}

public void watchAllOpEvent(Kv kv, Function<String, String> function) {
CompletableFuture.supplyAsync(() ->
kvService.watch(watchAllOpRequest(kv.getKv().getKey(), kv.getModRevision()))
).whenCompleteAsync((r, e) -> {
if (e != null) {
if (!(e instanceof DingoClientException)) {
watchAllOpEvent(kv, function);
return;
}
log.error("Watch locked error, or watch retry time great than lease ttl.", e);
return;
}
String typeStr = "normal";
if (r.getEvents().stream().map(Event::getType).anyMatch(type -> type == DELETE || type == NOT_EXISTS)) {
typeStr = "keyNone";
}
function.apply(typeStr);
watchAllOpEvent(kv, function);
});
}


private PutRequest putRequest(String resourceKey, String value) {
return PutRequest.builder()
.lease(lease())
Expand Down

0 comments on commit d13bc8d

Please sign in to comment.