Skip to content

Commit

Permalink
need different way to reduce references
Browse files Browse the repository at this point in the history
  • Loading branch information
sbSteveK committed Sep 8, 2023
1 parent 4b43246 commit 0de2e90
Show file tree
Hide file tree
Showing 2 changed files with 148 additions and 140 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -135,109 +135,114 @@ protected void setConnectionMessageTransformer(Consumer<MqttMessage> connectionM
MqttClientConnectionFixture() {
}

boolean connectDirectWithConfig(TlsContext tlsContext, String endpoint, int port, String username, String password, HttpProxyOptions httpProxyOptions, EventLoopGroup elg, HostResolver hr, ClientBootstrap bootstrap)
boolean connectDirectWithConfig(TlsContext tlsContext, String endpoint, int port, String username, String password, HttpProxyOptions httpProxyOptions)
{
try {
return connectDirectWithConfigThrows(tlsContext, endpoint, port, username, password, httpProxyOptions, elg, hr, bootstrap);
return connectDirectWithConfigThrows(tlsContext, endpoint, port, username, password, httpProxyOptions);
} catch (Exception ex) {
fail("Exception during connect: " + ex.toString());
}
return false;
}

boolean connectDirectWithConfigThrows(TlsContext tlsContext, String endpoint, int port, String username, String password, HttpProxyOptions httpProxyOptions, EventLoopGroup elg, HostResolver hr, ClientBootstrap bootstrap) throws Exception
boolean connectDirectWithConfigThrows(TlsContext tlsContext, String endpoint, int port, String username, String password, HttpProxyOptions httpProxyOptions) throws Exception
{
// Connection callback events
MqttClientConnectionEvents events = new MqttClientConnectionEvents() {
@Override
public void onConnectionResumed(boolean sessionPresent) {
System.out.println("Connection resumed");
}
try(EventLoopGroup elg = new EventLoopGroup(1);
HostResolver hr = new HostResolver(elg);
ClientBootstrap bootstrap = new ClientBootstrap(elg, hr);) {

// Connection callback events
MqttClientConnectionEvents events = new MqttClientConnectionEvents() {
@Override
public void onConnectionResumed(boolean sessionPresent) {
System.out.println("Connection resumed");
}

@Override
public void onConnectionInterrupted(int errorCode) {
if (!disconnecting) {
System.out.println(
"Connection interrupted: error: " + errorCode + " " + CRT.awsErrorString(errorCode));
@Override
public void onConnectionInterrupted(int errorCode) {
if (!disconnecting) {
System.out.println(
"Connection interrupted: error: " + errorCode + " " + CRT.awsErrorString(errorCode));
}
}
}

@Override
public void onConnectionFailure(OnConnectionFailureReturn data) {
System.out.println("Connection failed with error: " + data.getErrorCode() + " " + CRT.awsErrorString(data.getErrorCode()));
onConnectionFailureFuture.complete(data);
}
@Override
public void onConnectionFailure(OnConnectionFailureReturn data) {
System.out.println("Connection failed with error: " + data.getErrorCode() + " " + CRT.awsErrorString(data.getErrorCode()));
onConnectionFailureFuture.complete(data);
}

@Override
public void onConnectionSuccess(OnConnectionSuccessReturn data) {
System.out.println("Connection success. Session present: " + data.getSessionPresent());
onConnectionSuccessFuture.complete(data);
}
@Override
public void onConnectionSuccess(OnConnectionSuccessReturn data) {
System.out.println("Connection success. Session present: " + data.getSessionPresent());
onConnectionSuccessFuture.complete(data);
}

@Override
public void onConnectionClosed(OnConnectionClosedReturn data) {
System.out.println("Connection disconnected successfully");
onConnectionClosedFuture.complete(data);
}
};
@Override
public void onConnectionClosed(OnConnectionClosedReturn data) {
System.out.println("Connection disconnected successfully");
onConnectionClosedFuture.complete(data);
}
};

// Default settings
boolean cleanSession = true; // only true is supported right now
int keepAliveSecs = 0;
int protocolOperationTimeout = 60000;
String clientId = TEST_CLIENTID + (UUID.randomUUID()).toString();
// Default settings
boolean cleanSession = true; // only true is supported right now
int keepAliveSecs = 0;
int protocolOperationTimeout = 60000;
String clientId = TEST_CLIENTID + (UUID.randomUUID()).toString();

try (MqttConnectionConfig config = new MqttConnectionConfig()) {
try (MqttConnectionConfig config = new MqttConnectionConfig()) {

MqttClient client = null;
if (tlsContext != null)
{
client = new MqttClient(bootstrap, tlsContext);
}
else
{
client = new MqttClient(bootstrap);
}
MqttClient client = null;
if (tlsContext != null)
{
client = new MqttClient(bootstrap, tlsContext);
}
else
{
client = new MqttClient(bootstrap);
}

config.setMqttClient(client);
config.setClientId(clientId);
config.setEndpoint(endpoint);
config.setPort(port);
config.setCleanSession(cleanSession);
config.setKeepAliveSecs(keepAliveSecs);
config.setProtocolOperationTimeoutMs(protocolOperationTimeout);
config.setConnectionCallbacks(events);

if (httpProxyOptions != null) {
config.setHttpProxyOptions(httpProxyOptions);
}
if (username != null) {
config.setUsername(username);
}
if (password != null)
{
config.setPassword(password);
}
config.setMqttClient(client);
config.setClientId(clientId);
config.setEndpoint(endpoint);
config.setPort(port);
config.setCleanSession(cleanSession);
config.setKeepAliveSecs(keepAliveSecs);
config.setProtocolOperationTimeoutMs(protocolOperationTimeout);
config.setConnectionCallbacks(events);

if (httpProxyOptions != null) {
config.setHttpProxyOptions(httpProxyOptions);
}
if (username != null) {
config.setUsername(username);
}
if (password != null)
{
config.setPassword(password);
}

if (connectionConfigTransformer != null) {
connectionConfigTransformer.accept(config);
}
if (connectionConfigTransformer != null) {
connectionConfigTransformer.accept(config);
}

try {
connection = new MqttClientConnection(config);
if (connectionMessageTransfomer != null) {
connection.onMessage(connectionMessageTransfomer);
try {
connection = new MqttClientConnection(config);
if (connectionMessageTransfomer != null) {
connection.onMessage(connectionMessageTransfomer);
}
CompletableFuture<Boolean> connected = connection.connect();
connected.get();
} finally {
client.close();
}
CompletableFuture<Boolean> connected = connection.connect();
connected.get();
} finally {
client.close();
return true;
}
return true;
}
}

boolean connectWebsocketsWithCredentialsProvider(CredentialsProvider credentialsProvider, String endpoint, int port, TlsContext tlsContext, String username, String password, HttpProxyOptions httpProxyOptions, EventLoopGroup elg, HostResolver hr, ClientBootstrap bootstrap)
boolean connectWebsocketsWithCredentialsProvider(CredentialsProvider credentialsProvider, String endpoint, int port, TlsContext tlsContext, String username, String password, HttpProxyOptions httpProxyOptions)
{
// Return result
boolean result = false;
Expand Down Expand Up @@ -281,71 +286,74 @@ public void onConnectionClosed(OnConnectionClosedReturn data) {
}
};

try (EventLoopGroup elg = new EventLoopGroup(1);
HostResolver hr = new HostResolver(elg);
ClientBootstrap bootstrap = new ClientBootstrap(elg, hr);)
{
try (MqttConnectionConfig config = new MqttConnectionConfig();
AwsSigningConfig signingConfig = new AwsSigningConfig();) {

MqttClient client = null;
if (tlsContext != null)
{
client = new MqttClient(bootstrap, tlsContext);
}
else
{
client = new MqttClient(bootstrap);
}

try (MqttConnectionConfig config = new MqttConnectionConfig();
AwsSigningConfig signingConfig = new AwsSigningConfig();) {

MqttClient client = null;
if (tlsContext != null)
{
client = new MqttClient(bootstrap, tlsContext);
}
else
{
client = new MqttClient(bootstrap);
}

config.setMqttClient(client);
config.setClientId(clientId);
config.setEndpoint(endpoint);
config.setPort(port);
config.setCleanSession(cleanSession);
config.setKeepAliveSecs(keepAliveSecs);
config.setProtocolOperationTimeoutMs(protocolOperationTimeout);
config.setUseWebsockets(true);
config.setConnectionCallbacks(events);

if (username != null) {
config.setUsername(username);
}
if (password != null) {
config.setPassword(password);
}
if (httpProxyOptions != null) {
config.setHttpProxyOptions(httpProxyOptions);
}
config.setMqttClient(client);
config.setClientId(clientId);
config.setEndpoint(endpoint);
config.setPort(port);
config.setCleanSession(cleanSession);
config.setKeepAliveSecs(keepAliveSecs);
config.setProtocolOperationTimeoutMs(protocolOperationTimeout);
config.setUseWebsockets(true);
config.setConnectionCallbacks(events);

if (username != null) {
config.setUsername(username);
}
if (password != null) {
config.setPassword(password);
}
if (httpProxyOptions != null) {
config.setHttpProxyOptions(httpProxyOptions);
}

if (connectionConfigTransformer != null) {
connectionConfigTransformer.accept(config);
}
if (connectionConfigTransformer != null) {
connectionConfigTransformer.accept(config);
}

// Make the websocket transformer
if (credentialsProvider != null) {
signingConfig.setAlgorithm(AwsSigningConfig.AwsSigningAlgorithm.SIGV4);
// NOTE: Missing a credentials provider gives a non-helpful error. This needs to be changed in Java V2...
signingConfig.setCredentialsProvider(credentialsProvider);
}
signingConfig.setSignatureType(AwsSigningConfig.AwsSignatureType.HTTP_REQUEST_VIA_QUERY_PARAMS);
signingConfig.setRegion(TEST_REGION);
signingConfig.setService("iotdevicegateway");
signingConfig.setOmitSessionToken(true);
try (MqttClientConnectionSigv4HandshakeTransformer transformer = new MqttClientConnectionSigv4HandshakeTransformer(signingConfig);)
{
config.setWebsocketHandshakeTransform(transformer);
connection = new MqttClientConnection(config);
if (connectionMessageTransfomer != null) {
connection.onMessage(connectionMessageTransfomer);
// Make the websocket transformer
if (credentialsProvider != null) {
signingConfig.setAlgorithm(AwsSigningConfig.AwsSigningAlgorithm.SIGV4);
// NOTE: Missing a credentials provider gives a non-helpful error. This needs to be changed in Java V2...
signingConfig.setCredentialsProvider(credentialsProvider);
}
CompletableFuture<Boolean> connected = connection.connect();
connected.get();
result = true;
}
client.close();
signingConfig.setSignatureType(AwsSigningConfig.AwsSignatureType.HTTP_REQUEST_VIA_QUERY_PARAMS);
signingConfig.setRegion(TEST_REGION);
signingConfig.setService("iotdevicegateway");
signingConfig.setOmitSessionToken(true);
try (MqttClientConnectionSigv4HandshakeTransformer transformer = new MqttClientConnectionSigv4HandshakeTransformer(signingConfig);)
{
config.setWebsocketHandshakeTransform(transformer);
connection = new MqttClientConnection(config);
if (connectionMessageTransfomer != null) {
connection.onMessage(connectionMessageTransfomer);
}
CompletableFuture<Boolean> connected = connection.connect();
connected.get();
result = true;
}
client.close();

} catch (Exception ex) {
fail("Exception during connect: " + ex.toString());
} catch (Exception ex) {
fail("Exception during connect: " + ex.toString());
}
}

return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ public void ConnWS_Cred_UC2()
try (TlsContextOptions tlsOptions = TlsContextOptions.createDefaultClient();
TlsContext tlsContext = new TlsContext(tlsOptions);
CredentialsProvider provider = builder.build();) {
connectWebsocketsWithCredentialsProvider(provider, AWS_TEST_MQTT311_IOT_CORE_HOST, 443, tlsContext, null, null, null, elg, hr, bootstrap);
connectWebsocketsWithCredentialsProvider(provider, AWS_TEST_MQTT311_IOT_CORE_HOST, 443, tlsContext, null, null, null);
disconnect();
close();
}
Expand Down Expand Up @@ -235,7 +235,7 @@ public void ConnWS_Cred_UC3()
try (TlsContextOptions tlsOptions = TlsContextOptions.createDefaultClient();
TlsContext tlsContext = new TlsContext(tlsOptions);
CredentialsProvider provider = builder.build();) {
connectWebsocketsWithCredentialsProvider(provider, AWS_TEST_MQTT311_IOT_CORE_HOST, 443, tlsContext, null, null, null, elg, hr, bootstrap);
connectWebsocketsWithCredentialsProvider(provider, AWS_TEST_MQTT311_IOT_CORE_HOST, 443, tlsContext, null, null, null);
disconnect();
close();
}
Expand Down Expand Up @@ -268,7 +268,7 @@ public void ConnWS_Cred_UC4()
try (TlsContextOptions tlsOptions = TlsContextOptions.createDefaultClient();
TlsContext tlsContext = new TlsContext(tlsOptions);
CredentialsProvider provider = builder.build();) {
connectWebsocketsWithCredentialsProvider(provider, AWS_TEST_MQTT311_IOT_CORE_HOST, 443, tlsContext, null, null, null, elg, hr, bootstrap);
connectWebsocketsWithCredentialsProvider(provider, AWS_TEST_MQTT311_IOT_CORE_HOST, 443, tlsContext, null, null, null);
disconnect();
close();
}
Expand Down

0 comments on commit 0de2e90

Please sign in to comment.