Skip to content

Commit

Permalink
[java] Flesh out Reactor HTTP client usage in server
Browse files Browse the repository at this point in the history
  • Loading branch information
AutomatedTester committed Jun 5, 2020
1 parent f5fc6cd commit 415b45c
Show file tree
Hide file tree
Showing 7 changed files with 337 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,16 @@ java_library(
"//java/client/src/org/openqa/selenium:core",
"//java/client/src/org/openqa/selenium/remote/http",
artifact("com.google.guava:guava"),
artifact("com.typesafe.netty:netty-reactive-streams"),
artifact("org.asynchttpclient:async-http-client"),
artifact("io.netty:netty-buffer"),
artifact("io.netty:netty-codec-http"),
artifact("io.netty:netty-transport"),
artifact("io.netty:netty-transport-native-epoll"),
artifact("io.netty:netty-transport-native-epoll-linux-x86_64"),
artifact("io.netty:netty-transport-native-kqueue"),
artifact("io.netty:netty-transport-native-kqueue-osx-x86_64"),
artifact("io.netty:netty-transport-native-unix-common"),
artifact("io.projectreactor:reactor-core"),
artifact("io.projectreactor.netty:reactor-netty"),
],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,14 @@

package org.openqa.selenium.remote.http.reactor;

import static java.nio.charset.StandardCharsets.UTF_8;

import com.google.auto.service.AutoService;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableMap;

import org.openqa.selenium.internal.Require;
import org.openqa.selenium.remote.http.AddSeleniumUserAgent;
import org.openqa.selenium.remote.http.ClientConfig;
import org.openqa.selenium.remote.http.HttpClient;
import org.openqa.selenium.remote.http.HttpClientName;
Expand All @@ -41,8 +45,13 @@
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
Expand All @@ -64,17 +73,40 @@ public class ReactorClient implements HttpClient {
private ReactorClient(ClientConfig config) {
this.config = config;
httpClient = reactor.netty.http.client.HttpClient.create()
.baseUrl(config.baseUrl().toString())
.keepAlive(true);
.baseUrl(config.baseUrl().toString());
}

@Override
public HttpResponse execute(HttpRequest request) {
StringBuilder uri = new StringBuilder(request.getUri());
List<String> queryPairs = new ArrayList<>();
request.getQueryParameterNames().forEach(
name -> request.getQueryParameters(name).forEach(
value -> {
try {
queryPairs.add(
URLEncoder.encode(name, UTF_8.toString()) + "=" + URLEncoder.encode(value, UTF_8.toString()));
} catch (UnsupportedEncodingException e) {
Thread.currentThread().interrupt();
throw new UncheckedIOException(e);
}
}));
if (!queryPairs.isEmpty()) {
uri.append("?");
Joiner.on('&').appendTo(uri, queryPairs);
}

Tuple2<InputStream, HttpResponse> result = httpClient
.headers(h -> request.getHeaderNames().forEach(
name -> request.getHeaders(name).forEach(value -> h.set(name, value))))
.headers(h -> {
request.getHeaderNames().forEach(
name -> request.getHeaders(name).forEach(value -> h.set(name, value)));
if (request.getHeader("User-Agent") == null) {
h.set("User-Agent", AddSeleniumUserAgent.USER_AGENT);
}
}
)
.request(methodMap.get(request.getMethod()))
.uri(request.getUri())
.uri(uri.toString())
.send((r, out) -> out.send(fromInputStream(request.getContent().get())))
.responseSingle((res, buf) -> {
HttpResponse toReturn = new HttpResponse();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Licensed to the Software Freedom Conservancy (SFC) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The SFC licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.openqa.selenium.remote.http.reactor;

import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.Response;
import org.openqa.selenium.internal.Require;
import org.openqa.selenium.remote.http.ClientConfig;
import org.openqa.selenium.remote.http.HttpHandler;
import org.openqa.selenium.remote.http.HttpRequest;
import org.openqa.selenium.remote.http.HttpResponse;
import org.openqa.selenium.remote.http.RemoteCall;


import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class ReactorHandler extends RemoteCall {

private final HttpHandler handler;
private final AsyncHttpClient client;

protected ReactorHandler(ClientConfig config, AsyncHttpClient client) {
super(config);
this.client = client;
this.handler = config.filter().andFinally(this::makeCall);
}

private HttpResponse makeCall(HttpRequest request) {
Require.nonNull("Request", request);

Future<org.asynchttpclient.Response> whenResponse = client.executeRequest(
ReactorMessages.toReactorRequest(getConfig().baseUri(), request));

try {
Response response = whenResponse.get();
return ReactorMessages.toSeleniumResponse(response);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("ReactorHttpHandler request interrupted", e);
} catch (ExecutionException e) {
Throwable cause = e.getCause();
if (cause instanceof UncheckedIOException) {
throw (UncheckedIOException) cause;
}

if (cause instanceof IOException) {
throw new UncheckedIOException((IOException) cause);
}

throw new RuntimeException("ReactorHttpHandler request execution error", e);
}
}

@Override
public HttpResponse execute(HttpRequest request) {
return handler.execute(request);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
// Licensed to the Software Freedom Conservancy (SFC) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The SFC licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.openqa.selenium.remote.http.reactor;

import static org.asynchttpclient.Dsl.request;
import static org.openqa.selenium.remote.http.Contents.empty;
import static org.openqa.selenium.remote.http.Contents.memoize;

import com.google.common.base.Strings;

import org.asynchttpclient.Dsl;
import org.asynchttpclient.Request;
import org.asynchttpclient.RequestBuilder;
import org.asynchttpclient.Response;
import org.openqa.selenium.remote.http.AddSeleniumUserAgent;
import org.openqa.selenium.remote.http.HttpMethod;
import org.openqa.selenium.remote.http.HttpRequest;
import org.openqa.selenium.remote.http.HttpResponse;

import java.net.URI;

public class ReactorMessages {

private ReactorMessages() {
// Utility classes.
}

protected static Request toReactorRequest(URI baseUrl, HttpRequest request) {
String rawUrl;

String uri = request.getUri();
if (uri.startsWith("ws://")) {
rawUrl = "http://" + uri.substring("ws://".length());
} else if (uri.startsWith("wss://")) {
rawUrl = "https://" + uri.substring("wss://".length());
} else if (uri.startsWith("http://") || uri.startsWith("https://")) {
rawUrl = uri;
} else {
rawUrl = baseUrl.toString().replaceAll("/$", "") + uri;
}

RequestBuilder builder = request(request.getMethod().toString(), rawUrl);

for (String name : request.getQueryParameterNames()) {
for (String value : request.getQueryParameters(name)) {
builder.addQueryParam(name, value);
}
}

for (String name : request.getHeaderNames()) {
for (String value : request.getHeaders(name)) {
builder.addHeader(name, value);
}
}
if (request.getHeader("User-Agent") == null) {
builder.addHeader("User-Agent", AddSeleniumUserAgent.USER_AGENT);
}

String info = baseUrl.getUserInfo();
if (!Strings.isNullOrEmpty(info)) {
String[] parts = info.split(":", 2);
String user = parts[0];
String pass = parts.length > 1 ? parts[1] : null;

builder.setRealm(Dsl.basicAuthRealm(user, pass).setUsePreemptiveAuth(true).build());
}

if (request.getMethod().equals(HttpMethod.POST)) {
builder.setBody(request.getContent().get());
}

return builder.build();
}

public static HttpResponse toSeleniumResponse(Response response) {
HttpResponse toReturn = new HttpResponse();

toReturn.setStatus(response.getStatusCode());

toReturn.setContent(! response.hasResponseBody()
? empty()
: memoize(response::getResponseBodyAsStream));

response.getHeaders().names().forEach(
name -> response.getHeaders(name).forEach(value -> toReturn.addHeader(name, value)));

return toReturn;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
load("@rules_jvm_external//:defs.bzl", "artifact")
load("//java:defs.bzl", "java_test_suite")

java_test_suite(
name = "medium-tests",
size = "medium",
srcs = glob(["*.java"]),
deps = [
"//java/client/src/org/openqa/selenium/remote/http",
"//java/client/src/org/openqa/selenium/remote/http/reactor",
"//java/client/test/org/openqa/selenium/remote/internal:test-lib",
"//java/client/test/org/openqa/selenium/testing:test-base",
artifact("com.google.guava:guava"),
artifact("org.asynchttpclient:async-http-client"),
artifact("junit:junit"),
artifact("org.assertj:assertj-core"),
artifact("io.netty:netty-buffer"),
artifact("io.netty:netty-codec-http"),
artifact("io.netty:netty-transport"),
artifact("io.netty:netty-transport-native-epoll"),
artifact("io.netty:netty-transport-native-epoll-linux-x86_64"),
artifact("io.netty:netty-transport-native-kqueue"),
artifact("io.netty:netty-transport-native-kqueue-osx-x86_64"),
artifact("io.netty:netty-transport-native-unix-common"),
artifact("com.typesafe.netty:netty-reactive-streams"),
],
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Licensed to the Software Freedom Conservancy (SFC) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The SFC licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.openqa.selenium.remote.http.reactor;

import org.openqa.selenium.remote.http.HttpClient;
import org.openqa.selenium.remote.internal.HttpClientTestBase;

public class ReactorClientTest extends HttpClientTestBase {

@Override
protected HttpClient.Factory createFactory() {
return new ReactorClient.Factory();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Licensed to the Software Freedom Conservancy (SFC) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The SFC licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.openqa.selenium.remote.http.reactor;

import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.DefaultAsyncHttpClientConfig;
import org.asynchttpclient.Dsl;
import org.openqa.selenium.remote.http.HttpClient;
import org.openqa.selenium.remote.http.HttpRequest;
import org.openqa.selenium.remote.http.HttpResponse;
import org.openqa.selenium.remote.http.WebSocket;
import org.openqa.selenium.remote.internal.HttpClientTestBase;

import java.io.UncheckedIOException;

public class ReactorHandlerTest extends HttpClientTestBase {

private static final AsyncHttpClient httpClient = Dsl.asyncHttpClient(
new DefaultAsyncHttpClientConfig.Builder()
.setAggregateWebSocketFrameFragments(true)
.setWebSocketMaxBufferSize(Integer.MAX_VALUE)
.setWebSocketMaxFrameSize(Integer.MAX_VALUE));
@Override
protected HttpClient.Factory createFactory() {
return config -> {
ReactorHandler reactorHandler = new ReactorHandler(config, httpClient);

return new HttpClient() {
@Override
public WebSocket openSocket(HttpRequest request, WebSocket.Listener listener) {
throw new UnsupportedOperationException("openSocket");
}

@Override
public HttpResponse execute(HttpRequest req) throws UncheckedIOException {
return reactorHandler.execute(req);
}
};
};
}
}

0 comments on commit 415b45c

Please sign in to comment.