Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: dynamic header capability for http post-processor. #134

Merged
merged 8 commits into from
Apr 25, 2022
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.odpf.dagger.core.processors.common;

import io.netty.util.internal.StringUtil;
import io.odpf.dagger.core.processors.ColumnNameManager;
import io.odpf.dagger.core.processors.types.SourceConfig;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
Expand Down Expand Up @@ -63,19 +64,19 @@ public EndpointHandler(SourceConfig sourceConfig,
}

/**
* Get endpoint or query variables values.
* Get dynamic header variables values.
prakharmathur82 marked this conversation as resolved.
Show resolved Hide resolved
*
* @param rowManager the row manager
* @parm variables the variable list
* @param resultFuture the result future
* @return the array object
*/
public Object[] getEndpointOrQueryVariablesValues(RowManager rowManager, ResultFuture<Row> resultFuture) {
String queryVariables = sourceConfig.getVariables();
prakharmathur82 marked this conversation as resolved.
Show resolved Hide resolved
if (StringUtils.isEmpty(queryVariables)) {
public Object[] getVariablesValue(RowManager rowManager, String variables, ResultFuture<Row> resultFuture) {
if (StringUtils.isEmpty(variables)) {
return new Object[0];
}

String[] requiredInputColumns = queryVariables.split(",");
String[] requiredInputColumns = variables.split(",");
ArrayList<Object> inputColumnValues = new ArrayList<>();
if (descriptorMap == null) {
descriptorMap = createDescriptorMap(requiredInputColumns, inputProtoClasses, resultFuture);
Expand Down Expand Up @@ -105,11 +106,12 @@ public Object[] getEndpointOrQueryVariablesValues(RowManager rowManager, ResultF
*
* @param resultFuture the result future
* @param rowManager the row manager
* @param endpointVariablesValues the endpoint variables values
* @param variables the request/header variables
* @param variablesValue the variables value
* @return the boolean
*/
public boolean isQueryInvalid(ResultFuture<Row> resultFuture, RowManager rowManager, Object[] endpointVariablesValues) {
if (!StringUtils.isEmpty(sourceConfig.getVariables()) && (Arrays.asList(endpointVariablesValues).isEmpty() || Arrays.stream(endpointVariablesValues).allMatch(""::equals))) {
public boolean isQueryInvalid(ResultFuture<Row> resultFuture, RowManager rowManager, String variables, Object[] variablesValue) {
if (!StringUtils.isEmpty(variables) && (Arrays.asList(variablesValue).isEmpty() || Arrays.stream(variablesValue).allMatch(""::equals))) {
LOGGER.warn("Could not populate any request variable. Skipping external calls");
meterStatsManager.markEvent(ExternalSourceAspects.EMPTY_INPUT);
resultFuture.complete(singleton(rowManager.getAll()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@ protected void createClient() {
protected void process(Row input, ResultFuture<Row> resultFuture) {
RowManager rowManager = new RowManager(input);
Object[] endpointVariablesValues = getEndpointHandler()
.getEndpointOrQueryVariablesValues(rowManager, resultFuture);
if (getEndpointHandler().isQueryInvalid(resultFuture, rowManager, endpointVariablesValues)) {
.getVariablesValue(rowManager, esSourceConfig.getVariables(), resultFuture);
if (getEndpointHandler().isQueryInvalid(resultFuture, rowManager, esSourceConfig.getVariables(), endpointVariablesValues)) {
return;
}
String esEndpoint = String.format(esSourceConfig.getPattern(), endpointVariablesValues);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ protected void process(Row input, ResultFuture<Row> resultFuture) throws Excepti
RowManager rowManager = new RowManager(input);

Object[] requestVariablesValues = getEndpointHandler()
.getEndpointOrQueryVariablesValues(rowManager, resultFuture);
if (getEndpointHandler().isQueryInvalid(resultFuture, rowManager, requestVariablesValues)) {
.getVariablesValue(rowManager, grpcSourceConfig.getVariables(), resultFuture);
if (getEndpointHandler().isQueryInvalid(resultFuture, rowManager, grpcSourceConfig.getVariables(), requestVariablesValues)) {
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,14 @@ protected void process(Row input, ResultFuture<Row> resultFuture) {
RowManager rowManager = new RowManager(input);

Object[] requestVariablesValues = getEndpointHandler()
.getEndpointOrQueryVariablesValues(rowManager, resultFuture);
if (getEndpointHandler().isQueryInvalid(resultFuture, rowManager, requestVariablesValues)) {
.getVariablesValue(rowManager, httpSourceConfig.getRequestVariables(), resultFuture);
Object[] dynamicHeaderVariablesValues = getEndpointHandler()
.getVariablesValue(rowManager, httpSourceConfig.getHeaderVariables(), resultFuture);
if (getEndpointHandler().isQueryInvalid(resultFuture, rowManager, httpSourceConfig.getRequestVariables(), requestVariablesValues) || getEndpointHandler().isQueryInvalid(resultFuture, rowManager, httpSourceConfig.getHeaderVariables(), dynamicHeaderVariablesValues)) {
return;
}

BoundRequestBuilder request = HttpRequestFactory.createRequest(httpSourceConfig, httpClient, requestVariablesValues);
BoundRequestBuilder request = HttpRequestFactory.createRequest(httpSourceConfig, httpClient, requestVariablesValues, dynamicHeaderVariablesValues);
HttpResponseHandler httpResponseHandler = new HttpResponseHandler(httpSourceConfig, getMeterStatsManager(),
rowManager, getColumnNameManager(), getOutputDescriptor(resultFuture), resultFuture, getErrorReporter(), new PostResponseTelemetry());
httpResponseHandler.startTimer();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ public class HttpSourceConfig implements Serializable, SourceConfig {
private String verb;
private String requestPattern;
private String requestVariables;
private String headerPattern;
private String headerVariables;
private String streamTimeout;
private String connectTimeout;
private boolean failOnErrors;
Expand All @@ -40,21 +42,25 @@ public class HttpSourceConfig implements Serializable, SourceConfig {
* @param verb the verb
* @param requestPattern the request pattern
* @param requestVariables the request variables
* @param headerPattern the dynamic header pattern
* @param headerVariables the header variables
* @param streamTimeout the stream timeout
* @param connectTimeout the connect timeout
* @param failOnErrors the fail on errors
* @param type the type
* @param capacity the capacity
* @param headers the headers
* @param headers the static headers
* @param outputMapping the output mapping
* @param metricId the metric id
* @param retainResponseType the retain response type
*/
public HttpSourceConfig(String endpoint, String verb, String requestPattern, String requestVariables, String streamTimeout, String connectTimeout, boolean failOnErrors, String type, String capacity, Map<String, String> headers, Map<String, OutputMapping> outputMapping, String metricId, boolean retainResponseType) {
public HttpSourceConfig(String endpoint, String verb, String requestPattern, String requestVariables, String headerPattern, String headerVariables, String streamTimeout, String connectTimeout, boolean failOnErrors, String type, String capacity, Map<String, String> headers, Map<String, OutputMapping> outputMapping, String metricId, boolean retainResponseType) {
this.endpoint = endpoint;
this.verb = verb;
this.requestPattern = requestPattern;
this.requestVariables = requestVariables;
this.headerPattern = headerPattern;
this.headerVariables = headerVariables;
this.streamTimeout = streamTimeout;
this.connectTimeout = connectTimeout;
this.failOnErrors = failOnErrors;
Expand Down Expand Up @@ -102,6 +108,24 @@ public String getRequestVariables() {
return requestVariables;
}

/**
* Gets header pattern.
*
* @return the header pattern
*/
public String getHeaderPattern() {
return headerPattern;
}

/**
* Gets header Variable.
*
* @return the header Variable
*/
public String getHeaderVariables() {
return headerVariables;
}

@Override
public String getPattern() {
return requestPattern;
Expand Down Expand Up @@ -201,18 +225,19 @@ public boolean isRetainResponseType() {

@Override
public boolean equals(Object o) {
System.out.println("testing");
prakharmathur82 marked this conversation as resolved.
Show resolved Hide resolved
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
HttpSourceConfig that = (HttpSourceConfig) o;
return failOnErrors == that.failOnErrors && retainResponseType == that.retainResponseType && Objects.equals(endpoint, that.endpoint) && Objects.equals(verb, that.verb) && Objects.equals(requestPattern, that.requestPattern) && Objects.equals(requestVariables, that.requestVariables) && Objects.equals(streamTimeout, that.streamTimeout) && Objects.equals(connectTimeout, that.connectTimeout) && Objects.equals(type, that.type) && Objects.equals(capacity, that.capacity) && Objects.equals(headers, that.headers) && Objects.equals(outputMapping, that.outputMapping) && Objects.equals(metricId, that.metricId);
return failOnErrors == that.failOnErrors && retainResponseType == that.retainResponseType && Objects.equals(endpoint, that.endpoint) && Objects.equals(verb, that.verb) && Objects.equals(requestPattern, that.requestPattern) && Objects.equals(requestVariables, that.requestVariables) && Objects.equals(headerPattern, that.headerPattern) && Objects.equals(headerVariables, that.headerVariables) && Objects.equals(streamTimeout, that.streamTimeout) && Objects.equals(connectTimeout, that.connectTimeout) && Objects.equals(type, that.type) && Objects.equals(capacity, that.capacity) && Objects.equals(headers, that.headers) && Objects.equals(outputMapping, that.outputMapping) && Objects.equals(metricId, that.metricId);
}

@Override
public int hashCode() {
return Objects.hash(endpoint, verb, requestPattern, requestVariables, streamTimeout, connectTimeout, failOnErrors, type, capacity, headers, outputMapping, metricId, retainResponseType);
return Objects.hash(endpoint, verb, requestPattern, requestVariables, headerPattern, headerVariables, streamTimeout, connectTimeout, failOnErrors, type, capacity, headers, outputMapping, metricId, retainResponseType);
}
}
Original file line number Diff line number Diff line change
@@ -1,16 +1,21 @@
package io.odpf.dagger.core.processors.external.http.request;

import com.google.gson.Gson;
import io.netty.util.internal.StringUtil;
import io.odpf.dagger.core.processors.external.http.HttpSourceConfig;
import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.BoundRequestBuilder;

import java.util.HashMap;

/**
* The Http get request handler.
*/
public class HttpGetRequestHandler implements HttpRequestHandler {
private HttpSourceConfig httpSourceConfig;
private AsyncHttpClient httpClient;
private Object[] requestVariablesValues;
private Object[] dynamicHeaderVariablesValues;

/**
* Instantiates a new Http get request handler.
Expand All @@ -19,10 +24,11 @@ public class HttpGetRequestHandler implements HttpRequestHandler {
* @param httpClient the http client
* @param requestVariablesValues the request variables values
*/
public HttpGetRequestHandler(HttpSourceConfig httpSourceConfig, AsyncHttpClient httpClient, Object[] requestVariablesValues) {
public HttpGetRequestHandler(HttpSourceConfig httpSourceConfig, AsyncHttpClient httpClient, Object[] requestVariablesValues, Object[] dynamicHeaderVariablesValues) {
this.httpSourceConfig = httpSourceConfig;
this.httpClient = httpClient;
this.requestVariablesValues = requestVariablesValues;
this.dynamicHeaderVariablesValues = dynamicHeaderVariablesValues;
}

@Override
Expand All @@ -31,6 +37,11 @@ public BoundRequestBuilder create() {
String endpoint = httpSourceConfig.getEndpoint();
String requestEndpoint = endpoint + endpointPath;
BoundRequestBuilder getRequest = httpClient.prepareGet(requestEndpoint);
if (!StringUtil.isNullOrEmpty(httpSourceConfig.getHeaderPattern())) {
String dynamicHeader = String.format(httpSourceConfig.getHeaderPattern(), dynamicHeaderVariablesValues);
HashMap<String, String> dynamicHeaderMap = new Gson().fromJson(dynamicHeader, HashMap.class);
getRequest = addHeaders(getRequest, dynamicHeaderMap);
prakharmathur82 marked this conversation as resolved.
Show resolved Hide resolved
}
return addHeaders(getRequest, httpSourceConfig.getHeaders());
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,28 +1,33 @@
package io.odpf.dagger.core.processors.external.http.request;

import com.google.gson.Gson;
import io.netty.util.internal.StringUtil;
import io.odpf.dagger.core.processors.external.http.HttpSourceConfig;
import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.BoundRequestBuilder;

import java.util.HashMap;

/**
* The Http post request handler.
*/
public class HttpPostRequestHandler implements HttpRequestHandler {
private HttpSourceConfig httpSourceConfig;
private AsyncHttpClient httpClient;
private Object[] requestVariablesValues;

private Object[] dynamicHeaderVariablesValues;
/**
* Instantiates a new Http post request handler.
*
* @param httpSourceConfig the http source config
* @param httpClient the http client
* @param requestVariablesValues the request variables values
*/
public HttpPostRequestHandler(HttpSourceConfig httpSourceConfig, AsyncHttpClient httpClient, Object[] requestVariablesValues) {
public HttpPostRequestHandler(HttpSourceConfig httpSourceConfig, AsyncHttpClient httpClient, Object[] requestVariablesValues, Object[] dynamicHeaderVariablesValues) {
this.httpSourceConfig = httpSourceConfig;
this.httpClient = httpClient;
this.requestVariablesValues = requestVariablesValues;
this.dynamicHeaderVariablesValues = dynamicHeaderVariablesValues;
}

@Override
Expand All @@ -32,6 +37,11 @@ public BoundRequestBuilder create() {
BoundRequestBuilder postRequest = httpClient
.preparePost(endpoint)
.setBody(requestBody);
if (!StringUtil.isNullOrEmpty(httpSourceConfig.getHeaderPattern())) {
String dynamicHeader = String.format(httpSourceConfig.getHeaderPattern(), dynamicHeaderVariablesValues);
HashMap<String, String> dynamicHeaderMap = new Gson().fromJson(dynamicHeader, HashMap.class);
postRequest = addHeaders(postRequest, dynamicHeaderMap);
}
return addHeaders(postRequest, httpSourceConfig.getHeaders());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ public class HttpRequestFactory {
* @param requestVariablesValues the request variables values
* @return the bound request builder
*/
public static BoundRequestBuilder createRequest(HttpSourceConfig httpSourceConfig, AsyncHttpClient httpClient, Object[] requestVariablesValues) {
public static BoundRequestBuilder createRequest(HttpSourceConfig httpSourceConfig, AsyncHttpClient httpClient, Object[] requestVariablesValues, Object[] headerVariablesValues) {

ArrayList<HttpRequestHandler> httpRequestHandlers = new ArrayList<>();
httpRequestHandlers.add(new HttpPostRequestHandler(httpSourceConfig, httpClient, requestVariablesValues));
httpRequestHandlers.add(new HttpGetRequestHandler(httpSourceConfig, httpClient, requestVariablesValues));
httpRequestHandlers.add(new HttpPostRequestHandler(httpSourceConfig, httpClient, requestVariablesValues, headerVariablesValues));
httpRequestHandlers.add(new HttpGetRequestHandler(httpSourceConfig, httpClient, requestVariablesValues, headerVariablesValues));

HttpRequestHandler httpRequestHandler = httpRequestHandlers
.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,8 @@ public void process(Row input, ResultFuture<Row> resultFuture) {
RowManager rowManager = new RowManager(input);

Object[] queryVariablesValues = getEndpointHandler()
.getEndpointOrQueryVariablesValues(rowManager, resultFuture);
if (getEndpointHandler().isQueryInvalid(resultFuture, rowManager, queryVariablesValues)) {
.getVariablesValue(rowManager, pgSourceConfig.getVariables(), resultFuture);
if (getEndpointHandler().isQueryInvalid(resultFuture, rowManager, pgSourceConfig.getVariables(), queryVariablesValues)) {
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public void shouldReturnHttpExternalSourceConfig() {
outputMapping = new OutputMapping("$.data.tensor.values[0]");
outputMappings.put("surge_factor", outputMapping);

HttpSourceConfig httpSourceConfig = new HttpSourceConfig("http://localhost:8000", "post", null, null, "5000", "5000", true, null, null, headerMap, outputMappings, null, false);
HttpSourceConfig httpSourceConfig = new HttpSourceConfig("http://localhost:8000", "post", null, null, null, null, "5000", "5000", true, null, null, headerMap, outputMappings, null, false);

assertEquals(httpSourceConfig, defaultPostProcessorConfig.getExternalSource().getHttpConfig().get(0));
}
Expand Down Expand Up @@ -120,7 +120,7 @@ public void shouldBeEmptyWhenNoneOfTheConfigsExist() {
@Test
public void shouldNotBeEmptyWhenExternalSourceHasHttpConfigExist() {
ArrayList<HttpSourceConfig> http = new ArrayList<>();
http.add(new HttpSourceConfig("", "", "", "", "", "", false, "", "", new HashMap<>(), new HashMap<>(), "metricId_01", false));
http.add(new HttpSourceConfig("", "", "", "", "", "", "", "", false, "", "", new HashMap<>(), new HashMap<>(), "metricId_01", false));
ArrayList<EsSourceConfig> es = new ArrayList<>();
ArrayList<PgSourceConfig> pg = new ArrayList<>();
ExternalSourceConfig externalSourceConfig = new ExternalSourceConfig(http, es, pg, new ArrayList<>());
Expand Down
Loading