Skip to content

Commit

Permalink
SolaceIO: refactor to allow inheritance of BasicAuthSempClient (#32400)
Browse files Browse the repository at this point in the history
* Refactored to allow inheritance and overriding of BasicAuthSempClient

* Fix docs and use Map#computeIfAbsent with a lambda.

* Fix integration test

* Remove 'serializable'

* Revert 'Remove 'serializable''
  • Loading branch information
bzablocki authored Nov 27, 2024
1 parent 9696201 commit 9560fe1
Show file tree
Hide file tree
Showing 6 changed files with 403 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,10 @@
*/
package org.apache.beam.sdk.io.solace.broker;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.api.client.http.HttpRequestFactory;
import com.solacesystems.jcsmp.JCSMPFactory;
import java.io.IOException;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.io.solace.data.Semp.Queue;
import org.apache.beam.sdk.util.SerializableSupplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -40,8 +36,6 @@
@Internal
public class BasicAuthSempClient implements SempClient {
private static final Logger LOG = LoggerFactory.getLogger(BasicAuthSempClient.class);
private final ObjectMapper objectMapper =
new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

private final SempBasicAuthClientExecutor sempBasicAuthClientExecutor;

Expand All @@ -58,13 +52,12 @@ public BasicAuthSempClient(

@Override
public boolean isQueueNonExclusive(String queueName) throws IOException {
LOG.info("SolaceIO.Read: SempOperations: query SEMP if queue {} is nonExclusive", queueName);
BrokerResponse response = sempBasicAuthClientExecutor.getQueueResponse(queueName);
if (response.content == null) {
throw new IOException("SolaceIO: response from SEMP is empty!");
}
Queue q = mapJsonToClass(response.content, Queue.class);
return q.data().accessType().equals("non-exclusive");
boolean queueNonExclusive = sempBasicAuthClientExecutor.isQueueNonExclusive(queueName);
LOG.info(
"SolaceIO.Read: SempOperations: queried SEMP if queue {} is non-exclusive: {}",
queueName,
queueNonExclusive);
return queueNonExclusive;
}

@Override
Expand All @@ -77,12 +70,7 @@ public com.solacesystems.jcsmp.Queue createQueueForTopic(String queueName, Strin

@Override
public long getBacklogBytes(String queueName) throws IOException {
BrokerResponse response = sempBasicAuthClientExecutor.getQueueResponse(queueName);
if (response.content == null) {
throw new IOException("SolaceIO: response from SEMP is empty!");
}
Queue q = mapJsonToClass(response.content, Queue.class);
return q.data().msgSpoolUsage();
return sempBasicAuthClientExecutor.getBacklogBytes(queueName);
}

private void createQueue(String queueName) throws IOException {
Expand All @@ -94,9 +82,4 @@ private void createSubscription(String queueName, String topicName) throws IOExc
LOG.info("SolaceIO.Read: Creating new subscription {} for topic {}.", queueName, topicName);
sempBasicAuthClientExecutor.createSubscriptionResponse(queueName, topicName);
}

private <T> T mapJsonToClass(String content, Class<T> mapSuccessToClass)
throws JsonProcessingException {
return objectMapper.readValue(content, mapSuccessToClass);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@

import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.api.client.http.GenericUrl;
import com.google.api.client.http.HttpContent;
import com.google.api.client.http.HttpHeaders;
Expand All @@ -40,6 +43,7 @@
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.apache.beam.sdk.io.solace.data.Semp.Queue;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.checkerframework.checker.nullness.qual.Nullable;

Expand All @@ -52,7 +56,7 @@
* response is 401 Unauthorized, the client will execute an additional request with Basic Auth
* header to refresh the token.
*/
class SempBasicAuthClientExecutor implements Serializable {
public class SempBasicAuthClientExecutor implements Serializable {
// Every request will be repeated 2 times in case of abnormal connection failures.
private static final int REQUEST_NUM_RETRIES = 2;
private static final Map<CookieManagerKey, CookieManager> COOKIE_MANAGER_MAP =
Expand All @@ -65,8 +69,10 @@ class SempBasicAuthClientExecutor implements Serializable {
private final String password;
private final CookieManagerKey cookieManagerKey;
private final transient HttpRequestFactory requestFactory;
private final ObjectMapper objectMapper =
new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

SempBasicAuthClientExecutor(
public SempBasicAuthClientExecutor(
String host,
String username,
String password,
Expand All @@ -78,7 +84,16 @@ class SempBasicAuthClientExecutor implements Serializable {
this.password = password;
this.requestFactory = httpRequestFactory;
this.cookieManagerKey = new CookieManagerKey(this.baseUrl, this.username);
COOKIE_MANAGER_MAP.putIfAbsent(this.cookieManagerKey, new CookieManager());
COOKIE_MANAGER_MAP.computeIfAbsent(this.cookieManagerKey, key -> new CookieManager());
}

public boolean isQueueNonExclusive(String queueName) throws IOException {
BrokerResponse response = getQueueResponse(queueName);
if (response.content == null) {
throw new IOException("SolaceIO: response from SEMP is empty!");
}
Queue q = mapJsonToClass(response.content, Queue.class);
return q.data().accessType().equals("non-exclusive");
}

private static String getQueueEndpoint(String messageVpn, String queueName)
Expand Down Expand Up @@ -199,6 +214,20 @@ private static String urlEncode(String queueName) throws UnsupportedEncodingExce
return URLEncoder.encode(queueName, StandardCharsets.UTF_8.name());
}

private <T> T mapJsonToClass(String content, Class<T> mapSuccessToClass)
throws JsonProcessingException {
return objectMapper.readValue(content, mapSuccessToClass);
}

public long getBacklogBytes(String queueName) throws IOException {
BrokerResponse response = getQueueResponse(queueName);
if (response.content == null) {
throw new IOException("SolaceIO: response from SEMP is empty!");
}
Queue q = mapJsonToClass(response.content, Queue.class);
return q.data().msgSpoolUsage();
}

private static class CookieManagerKey implements Serializable {
private final String baseUrl;
private final String username;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@
import org.apache.beam.sdk.values.KV;

/**
* This class a pseudo-key with a given cardinality. The downstream steps will use state {@literal
* &} timers to distribute the data and control for the number of parallel workers used for writing.
* This class adds pseudo-key with a given cardinality. The downstream steps will use state
* {@literal &} timers to distribute the data and control for the number of parallel workers used
* for writing.
*/
@Internal
public class AddShardKeyDoFn extends DoFn<Solace.Record, KV<Integer, Solace.Record>> {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.solace.it;

import com.google.api.client.http.HttpRequestFactory;
import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.beam.sdk.io.solace.broker.BasicAuthSempClient;
import org.apache.beam.sdk.io.solace.broker.SempBasicAuthClientExecutor;
import org.apache.beam.sdk.util.SerializableSupplier;

/**
* Example class showing how the {@link BasicAuthSempClient} can be extended or have functionalities
* overridden. In this case, the modified method is {@link
* BasicAuthSempClient#getBacklogBytes(String)}, which queries multiple SEMP endpoints to collect
* accurate backlog metrics. For usage, see {@link SolaceIOMultipleSempIT}.
*/
public class BasicAuthMultipleSempClient extends BasicAuthSempClient {
private final List<SempBasicAuthClientExecutor> sempBacklogBasicAuthClientExecutors;

public BasicAuthMultipleSempClient(
String mainHost,
List<String> backlogHosts,
String username,
String password,
String vpnName,
SerializableSupplier<HttpRequestFactory> httpRequestFactorySupplier) {
super(mainHost, username, password, vpnName, httpRequestFactorySupplier);
sempBacklogBasicAuthClientExecutors =
backlogHosts.stream()
.map(
host ->
new SempBasicAuthClientExecutor(
host, username, password, vpnName, httpRequestFactorySupplier.get()))
.collect(Collectors.toList());
}

@Override
public long getBacklogBytes(String queueName) throws IOException {
long backlog = 0;
for (SempBasicAuthClientExecutor client : sempBacklogBasicAuthClientExecutors) {
backlog += client.getBacklogBytes(queueName);
}
return backlog;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.solace.it;

import com.google.api.client.http.HttpRequestFactory;
import com.google.api.client.http.javanet.NetHttpTransport;
import com.google.auto.value.AutoValue;
import java.util.List;
import org.apache.beam.sdk.io.solace.broker.SempClient;
import org.apache.beam.sdk.io.solace.broker.SempClientFactory;
import org.apache.beam.sdk.util.SerializableSupplier;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;

/**
* Example class showing how to implement a custom {@link SempClientFactory} with custom client. For
* usage, see {@link SolaceIOMultipleSempIT}.
*/
@AutoValue
public abstract class BasicAuthMultipleSempClientFactory implements SempClientFactory {

public abstract String mainHost();

public abstract List<String> backlogHosts();

public abstract String username();

public abstract String password();

public abstract String vpnName();

public abstract @Nullable SerializableSupplier<HttpRequestFactory> httpRequestFactorySupplier();

public static Builder builder() {
return new AutoValue_BasicAuthMultipleSempClientFactory.Builder();
}

@AutoValue.Builder
public abstract static class Builder {
/** Set Solace host, format: [Protocol://]Host[:Port]. */
public abstract Builder mainHost(String host);

public abstract Builder backlogHosts(List<String> hosts);

/** Set Solace username. */
public abstract Builder username(String username);
/** Set Solace password. */
public abstract Builder password(String password);

/** Set Solace vpn name. */
public abstract Builder vpnName(String vpnName);

abstract Builder httpRequestFactorySupplier(
SerializableSupplier<HttpRequestFactory> httpRequestFactorySupplier);

public abstract BasicAuthMultipleSempClientFactory build();
}

@Override
public SempClient create() {
return new BasicAuthMultipleSempClient(
mainHost(),
backlogHosts(),
username(),
password(),
vpnName(),
getHttpRequestFactorySupplier());
}

@SuppressWarnings("return")
private @NonNull SerializableSupplier<HttpRequestFactory> getHttpRequestFactorySupplier() {
SerializableSupplier<HttpRequestFactory> httpRequestSupplier = httpRequestFactorySupplier();
return httpRequestSupplier != null
? httpRequestSupplier
: () -> new NetHttpTransport().createRequestFactory();
}
}
Loading

0 comments on commit 9560fe1

Please sign in to comment.