Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[java] allow a DevTools listener to determinate the order of handler calls #13921

Merged
merged 2 commits into from
May 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 10 additions & 7 deletions java/src/org/openqa/selenium/devtools/Connection.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;
Expand Down Expand Up @@ -70,11 +71,12 @@ public class Connection implements Closeable {
return thread;
});
private static final AtomicLong NEXT_ID = new AtomicLong(1L);
private static final AtomicLong NEXT_SEQUENCE = new AtomicLong(1L);
private WebSocket socket;
private final Map<Long, Consumer<Either<Throwable, JsonInput>>> methodCallbacks =
new ConcurrentHashMap<>();
private final ReadWriteLock callbacksLock = new ReentrantReadWriteLock(true);
private final Map<Event<?>, List<Consumer<?>>> eventCallbacks = new HashMap<>();
private final Map<Event<?>, List<BiConsumer<Long, ?>>> eventCallbacks = new HashMap<>();
private HttpClient client;
private final String url;
private final AtomicBoolean isClosed;
Expand Down Expand Up @@ -196,7 +198,7 @@ public <X> X sendAndWait(SessionID sessionId, Command<X> command, Duration timeo
}
}

public <X> void addListener(Event<X> event, Consumer<X> handler) {
public <X> void addListener(Event<X> event, BiConsumer<Long, X> handler) {
Require.nonNull("Event to listen for", event);
Require.nonNull("Handler to call", handler);

Expand Down Expand Up @@ -230,10 +232,11 @@ private class Listener implements WebSocket.Listener {

@Override
public void onText(CharSequence data) {
long sequence = NEXT_SEQUENCE.getAndIncrement();
EXECUTOR.execute(
() -> {
try {
handle(data);
handle(sequence, data);
} catch (Throwable t) {
LOG.log(Level.WARNING, "Unable to process: " + data, t);
throw new DevToolsException(t);
Expand All @@ -242,7 +245,7 @@ public void onText(CharSequence data) {
}
}

private void handle(CharSequence data) {
private void handle(long sequence, CharSequence data) {
// It's kind of gross to decode the data twice, but this lets us get started on something
// that feels nice to users.
// TODO: decode once, and once only
Expand Down Expand Up @@ -335,14 +338,14 @@ private void handle(CharSequence data) {
return;
}

for (Consumer<?> action : event.getValue()) {
for (BiConsumer<Long, ?> action : event.getValue()) {
@SuppressWarnings("unchecked")
Consumer<Object> obj = (Consumer<Object>) action;
BiConsumer<Long, Object> obj = (BiConsumer<Long, Object>) action;
LOG.log(
getDebugLogLevel(),
"Calling callback for {0} using {1} being passed {2}",
new Object[] {event.getKey(), obj, params});
obj.accept(params);
obj.accept(sequence, params);
}
}
});
Expand Down
24 changes: 24 additions & 0 deletions java/src/org/openqa/selenium/devtools/DevTools.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.logging.Level;
Expand Down Expand Up @@ -89,10 +90,33 @@ public <X> X send(Command<X> command) {
return connection.sendAndWait(cdpSession, command, timeout);
}

/**
* Register a handler to the given event.
*
* @param event the event to listen to
* @param handler the handler to register
* @param <X> type of the data generated by the event
*/
public <X> void addListener(Event<X> event, Consumer<X> handler) {
Require.nonNull("Event to listen for", event);
Require.nonNull("Handler to call", handler);

connection.addListener(event, (sequence, x) -> handler.accept(x));
}

/**
* Register a handler to the given event, this handler will receive a sequence number to
* determinate the order of the data generated by the event. The sequence number might have gaps
* when other events are raised in between.
*
* @param event the event to listen to
* @param handler the handler to register
* @param <X> type of the data generated by the event
*/
public <X> void addListener(Event<X> event, BiConsumer<Long, X> handler) {
Require.nonNull("Event to listen for", event);
Require.nonNull("Handler to call", handler);

connection.addListener(event, handler);
}

Expand Down
Loading