Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: dynamic header capability for http post-processor. #134

Merged
merged 8 commits into from
Apr 25, 2022
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import io.odpf.dagger.core.processors.ColumnNameManager;
import io.odpf.dagger.core.processors.types.SourceConfig;
import io.odpf.dagger.core.utils.Constants.VariableType;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.types.Row;

Expand Down Expand Up @@ -63,19 +64,20 @@ public EndpointHandler(SourceConfig sourceConfig,
}

/**
* Get endpoint or query variables values.
* Get dynamic header variables values.
prakharmathur82 marked this conversation as resolved.
Show resolved Hide resolved
*
* @param rowManager the row manager
* @param variableType the variable type
* @parm variables the variable list
* @param resultFuture the result future
* @return the array object
*/
public Object[] getEndpointOrQueryVariablesValues(RowManager rowManager, ResultFuture<Row> resultFuture) {
String queryVariables = sourceConfig.getVariables();
prakharmathur82 marked this conversation as resolved.
Show resolved Hide resolved
if (StringUtils.isEmpty(queryVariables)) {
public Object[] getVariablesValue(RowManager rowManager, VariableType variableType, String variables, ResultFuture<Row> resultFuture) {
if (StringUtils.isEmpty(variables)) {
return new Object[0];
}

String[] requiredInputColumns = queryVariables.split(",");
String[] requiredInputColumns = variables.split(",");
ArrayList<Object> inputColumnValues = new ArrayList<>();
if (descriptorMap == null) {
descriptorMap = createDescriptorMap(requiredInputColumns, inputProtoClasses, resultFuture);
Expand All @@ -84,7 +86,7 @@ public Object[] getEndpointOrQueryVariablesValues(RowManager rowManager, ResultF
for (String inputColumnName : requiredInputColumns) {
int inputColumnIndex = columnNameManager.getInputIndex(inputColumnName);
if (inputColumnIndex == -1) {
throw new InvalidConfigurationException(String.format("Column '%s' not found as configured in the endpoint/query variable", inputColumnName));
throw new InvalidConfigurationException(String.format("Column '%s' not found as configured in the '%s' variable", inputColumnName, variableType));
}

Descriptors.FieldDescriptor fieldDescriptor = descriptorMap.get(inputColumnName);
Expand All @@ -105,11 +107,12 @@ public Object[] getEndpointOrQueryVariablesValues(RowManager rowManager, ResultF
*
* @param resultFuture the result future
* @param rowManager the row manager
* @param endpointVariablesValues the endpoint variables values
* @param variables the request/header variables
* @param variablesValue the variables value
* @return the boolean
*/
public boolean isQueryInvalid(ResultFuture<Row> resultFuture, RowManager rowManager, Object[] endpointVariablesValues) {
if (!StringUtils.isEmpty(sourceConfig.getVariables()) && (Arrays.asList(endpointVariablesValues).isEmpty() || Arrays.stream(endpointVariablesValues).allMatch(""::equals))) {
public boolean isQueryInvalid(ResultFuture<Row> resultFuture, RowManager rowManager, String variables, Object[] variablesValue) {
if (!StringUtils.isEmpty(variables) && (Arrays.asList(variablesValue).isEmpty() || Arrays.stream(variablesValue).allMatch(""::equals))) {
LOGGER.warn("Could not populate any request variable. Skipping external calls");
meterStatsManager.markEvent(ExternalSourceAspects.EMPTY_INPUT);
resultFuture.complete(singleton(rowManager.getAll()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
import io.odpf.dagger.core.processors.external.AsyncConnector;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.types.Row;

import io.odpf.dagger.core.utils.Constants;
import io.odpf.dagger.core.utils.Constants.VariableType;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
Expand Down Expand Up @@ -79,8 +79,8 @@ protected void createClient() {
protected void process(Row input, ResultFuture<Row> resultFuture) {
RowManager rowManager = new RowManager(input);
Object[] endpointVariablesValues = getEndpointHandler()
.getEndpointOrQueryVariablesValues(rowManager, resultFuture);
if (getEndpointHandler().isQueryInvalid(resultFuture, rowManager, endpointVariablesValues)) {
.getVariablesValue(rowManager, VariableType.ENDPOINT_VARIABLE, esSourceConfig.getVariables(), resultFuture);
if (getEndpointHandler().isQueryInvalid(resultFuture, rowManager, esSourceConfig.getVariables(), endpointVariablesValues)) {
return;
}
String esEndpoint = String.format(esSourceConfig.getPattern(), endpointVariablesValues);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.google.protobuf.Descriptors;
import com.google.protobuf.DynamicMessage;
import io.odpf.dagger.core.utils.Constants;
import io.odpf.dagger.core.utils.Constants.VariableType;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -90,8 +91,8 @@ protected void process(Row input, ResultFuture<Row> resultFuture) throws Excepti
RowManager rowManager = new RowManager(input);

Object[] requestVariablesValues = getEndpointHandler()
.getEndpointOrQueryVariablesValues(rowManager, resultFuture);
if (getEndpointHandler().isQueryInvalid(resultFuture, rowManager, requestVariablesValues)) {
.getVariablesValue(rowManager, VariableType.REQUEST_VARIABLES, grpcSourceConfig.getVariables(), resultFuture);
if (getEndpointHandler().isQueryInvalid(resultFuture, rowManager, grpcSourceConfig.getVariables(), requestVariablesValues)) {
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.apache.flink.types.Row;

import io.odpf.dagger.core.utils.Constants;
import io.odpf.dagger.core.utils.Constants.VariableType;
import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.BoundRequestBuilder;
import org.slf4j.Logger;
Expand Down Expand Up @@ -94,12 +95,14 @@ protected void process(Row input, ResultFuture<Row> resultFuture) {
RowManager rowManager = new RowManager(input);

Object[] requestVariablesValues = getEndpointHandler()
.getEndpointOrQueryVariablesValues(rowManager, resultFuture);
if (getEndpointHandler().isQueryInvalid(resultFuture, rowManager, requestVariablesValues)) {
.getVariablesValue(rowManager, VariableType.REQUEST_VARIABLES, httpSourceConfig.getRequestVariables(), resultFuture);
Object[] dynamicHeaderVariablesValues = getEndpointHandler()
.getVariablesValue(rowManager, VariableType.HEADER_VARIABLES, httpSourceConfig.getHeaderVariables(), resultFuture);
if (getEndpointHandler().isQueryInvalid(resultFuture, rowManager, httpSourceConfig.getRequestVariables(), requestVariablesValues) || getEndpointHandler().isQueryInvalid(resultFuture, rowManager, httpSourceConfig.getHeaderVariables(), dynamicHeaderVariablesValues)) {
return;
}

BoundRequestBuilder request = HttpRequestFactory.createRequest(httpSourceConfig, httpClient, requestVariablesValues);
BoundRequestBuilder request = HttpRequestFactory.createRequest(httpSourceConfig, httpClient, requestVariablesValues, dynamicHeaderVariablesValues);
HttpResponseHandler httpResponseHandler = new HttpResponseHandler(httpSourceConfig, getMeterStatsManager(),
rowManager, getColumnNameManager(), getOutputDescriptor(resultFuture), resultFuture, getErrorReporter(), new PostResponseTelemetry());
httpResponseHandler.startTimer();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ public class HttpSourceConfig implements Serializable, SourceConfig {
private String verb;
private String requestPattern;
private String requestVariables;
private String headerPattern;
private String headerVariables;
private String streamTimeout;
private String connectTimeout;
private boolean failOnErrors;
Expand All @@ -40,21 +42,25 @@ public class HttpSourceConfig implements Serializable, SourceConfig {
* @param verb the verb
* @param requestPattern the request pattern
* @param requestVariables the request variables
* @param headerPattern the dynamic header pattern
* @param headerVariables the header variables
* @param streamTimeout the stream timeout
* @param connectTimeout the connect timeout
* @param failOnErrors the fail on errors
* @param type the type
* @param capacity the capacity
* @param headers the headers
* @param headers the static headers
* @param outputMapping the output mapping
* @param metricId the metric id
* @param retainResponseType the retain response type
*/
public HttpSourceConfig(String endpoint, String verb, String requestPattern, String requestVariables, String streamTimeout, String connectTimeout, boolean failOnErrors, String type, String capacity, Map<String, String> headers, Map<String, OutputMapping> outputMapping, String metricId, boolean retainResponseType) {
public HttpSourceConfig(String endpoint, String verb, String requestPattern, String requestVariables, String headerPattern, String headerVariables, String streamTimeout, String connectTimeout, boolean failOnErrors, String type, String capacity, Map<String, String> headers, Map<String, OutputMapping> outputMapping, String metricId, boolean retainResponseType) {
this.endpoint = endpoint;
this.verb = verb;
this.requestPattern = requestPattern;
this.requestVariables = requestVariables;
this.headerPattern = headerPattern;
this.headerVariables = headerVariables;
this.streamTimeout = streamTimeout;
this.connectTimeout = connectTimeout;
this.failOnErrors = failOnErrors;
Expand Down Expand Up @@ -102,6 +108,24 @@ public String getRequestVariables() {
return requestVariables;
}

/**
* Gets header pattern.
*
* @return the header pattern
*/
public String getHeaderPattern() {
return headerPattern;
}

/**
* Gets header Variable.
*
* @return the header Variable
*/
public String getHeaderVariables() {
return headerVariables;
}

@Override
public String getPattern() {
return requestPattern;
Expand Down Expand Up @@ -201,18 +225,19 @@ public boolean isRetainResponseType() {

@Override
public boolean equals(Object o) {
System.out.println("testing");
prakharmathur82 marked this conversation as resolved.
Show resolved Hide resolved
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
HttpSourceConfig that = (HttpSourceConfig) o;
return failOnErrors == that.failOnErrors && retainResponseType == that.retainResponseType && Objects.equals(endpoint, that.endpoint) && Objects.equals(verb, that.verb) && Objects.equals(requestPattern, that.requestPattern) && Objects.equals(requestVariables, that.requestVariables) && Objects.equals(streamTimeout, that.streamTimeout) && Objects.equals(connectTimeout, that.connectTimeout) && Objects.equals(type, that.type) && Objects.equals(capacity, that.capacity) && Objects.equals(headers, that.headers) && Objects.equals(outputMapping, that.outputMapping) && Objects.equals(metricId, that.metricId);
return failOnErrors == that.failOnErrors && retainResponseType == that.retainResponseType && Objects.equals(endpoint, that.endpoint) && Objects.equals(verb, that.verb) && Objects.equals(requestPattern, that.requestPattern) && Objects.equals(requestVariables, that.requestVariables) && Objects.equals(headerPattern, that.headerPattern) && Objects.equals(headerVariables, that.headerVariables) && Objects.equals(streamTimeout, that.streamTimeout) && Objects.equals(connectTimeout, that.connectTimeout) && Objects.equals(type, that.type) && Objects.equals(capacity, that.capacity) && Objects.equals(headers, that.headers) && Objects.equals(outputMapping, that.outputMapping) && Objects.equals(metricId, that.metricId);
}

@Override
public int hashCode() {
return Objects.hash(endpoint, verb, requestPattern, requestVariables, streamTimeout, connectTimeout, failOnErrors, type, capacity, headers, outputMapping, metricId, retainResponseType);
return Objects.hash(endpoint, verb, requestPattern, requestVariables, headerPattern, headerVariables, streamTimeout, connectTimeout, failOnErrors, type, capacity, headers, outputMapping, metricId, retainResponseType);
}
}
Original file line number Diff line number Diff line change
@@ -1,16 +1,22 @@
package io.odpf.dagger.core.processors.external.http.request;

import com.google.gson.Gson;
import io.netty.util.internal.StringUtil;
import io.odpf.dagger.core.processors.external.http.HttpSourceConfig;
import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.BoundRequestBuilder;

import java.util.HashMap;
import java.util.Map;

/**
* The Http get request handler.
*/
public class HttpGetRequestHandler implements HttpRequestHandler {
private HttpSourceConfig httpSourceConfig;
private AsyncHttpClient httpClient;
private Object[] requestVariablesValues;
private Object[] dynamicHeaderVariablesValues;

/**
* Instantiates a new Http get request handler.
Expand All @@ -19,10 +25,11 @@ public class HttpGetRequestHandler implements HttpRequestHandler {
* @param httpClient the http client
* @param requestVariablesValues the request variables values
*/
public HttpGetRequestHandler(HttpSourceConfig httpSourceConfig, AsyncHttpClient httpClient, Object[] requestVariablesValues) {
public HttpGetRequestHandler(HttpSourceConfig httpSourceConfig, AsyncHttpClient httpClient, Object[] requestVariablesValues, Object[] dynamicHeaderVariablesValues) {
this.httpSourceConfig = httpSourceConfig;
this.httpClient = httpClient;
this.requestVariablesValues = requestVariablesValues;
this.dynamicHeaderVariablesValues = dynamicHeaderVariablesValues;
}

@Override
Expand All @@ -31,7 +38,12 @@ public BoundRequestBuilder create() {
String endpoint = httpSourceConfig.getEndpoint();
String requestEndpoint = endpoint + endpointPath;
BoundRequestBuilder getRequest = httpClient.prepareGet(requestEndpoint);
return addHeaders(getRequest, httpSourceConfig.getHeaders());
Map<String, String> headers = httpSourceConfig.getHeaders();
if (!StringUtil.isNullOrEmpty(httpSourceConfig.getHeaderPattern())) {
String dynamicHeader = String.format(httpSourceConfig.getHeaderPattern(), dynamicHeaderVariablesValues);
headers.putAll(new Gson().fromJson(dynamicHeader, HashMap.class));
}
return addHeaders(getRequest, headers);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,28 +1,34 @@
package io.odpf.dagger.core.processors.external.http.request;

import com.google.gson.Gson;
import io.netty.util.internal.StringUtil;
import io.odpf.dagger.core.processors.external.http.HttpSourceConfig;
import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.BoundRequestBuilder;

import java.util.HashMap;
import java.util.Map;

/**
* The Http post request handler.
*/
public class HttpPostRequestHandler implements HttpRequestHandler {
private HttpSourceConfig httpSourceConfig;
private AsyncHttpClient httpClient;
private Object[] requestVariablesValues;

private Object[] dynamicHeaderVariablesValues;
/**
* Instantiates a new Http post request handler.
*
* @param httpSourceConfig the http source config
* @param httpClient the http client
* @param requestVariablesValues the request variables values
*/
public HttpPostRequestHandler(HttpSourceConfig httpSourceConfig, AsyncHttpClient httpClient, Object[] requestVariablesValues) {
public HttpPostRequestHandler(HttpSourceConfig httpSourceConfig, AsyncHttpClient httpClient, Object[] requestVariablesValues, Object[] dynamicHeaderVariablesValues) {
this.httpSourceConfig = httpSourceConfig;
this.httpClient = httpClient;
this.requestVariablesValues = requestVariablesValues;
this.dynamicHeaderVariablesValues = dynamicHeaderVariablesValues;
}

@Override
Expand All @@ -32,7 +38,12 @@ public BoundRequestBuilder create() {
BoundRequestBuilder postRequest = httpClient
.preparePost(endpoint)
.setBody(requestBody);
return addHeaders(postRequest, httpSourceConfig.getHeaders());
Map<String, String> headers = httpSourceConfig.getHeaders();
if (!StringUtil.isNullOrEmpty(httpSourceConfig.getHeaderPattern())) {
String dynamicHeader = String.format(httpSourceConfig.getHeaderPattern(), dynamicHeaderVariablesValues);
headers.putAll(new Gson().fromJson(dynamicHeader, HashMap.class));
}
return addHeaders(postRequest, headers);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ public class HttpRequestFactory {
* @param requestVariablesValues the request variables values
* @return the bound request builder
*/
public static BoundRequestBuilder createRequest(HttpSourceConfig httpSourceConfig, AsyncHttpClient httpClient, Object[] requestVariablesValues) {
public static BoundRequestBuilder createRequest(HttpSourceConfig httpSourceConfig, AsyncHttpClient httpClient, Object[] requestVariablesValues, Object[] headerVariablesValues) {

ArrayList<HttpRequestHandler> httpRequestHandlers = new ArrayList<>();
httpRequestHandlers.add(new HttpPostRequestHandler(httpSourceConfig, httpClient, requestVariablesValues));
httpRequestHandlers.add(new HttpGetRequestHandler(httpSourceConfig, httpClient, requestVariablesValues));
httpRequestHandlers.add(new HttpPostRequestHandler(httpSourceConfig, httpClient, requestVariablesValues, headerVariablesValues));
httpRequestHandlers.add(new HttpGetRequestHandler(httpSourceConfig, httpClient, requestVariablesValues, headerVariablesValues));

HttpRequestHandler httpRequestHandler = httpRequestHandlers
.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import io.odpf.dagger.core.processors.external.AsyncConnector;
import io.odpf.dagger.core.metrics.aspects.ExternalSourceAspects;
import io.odpf.dagger.core.utils.Constants;
import io.odpf.dagger.core.utils.Constants.VariableType;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.pgclient.PgConnectOptions;
Expand Down Expand Up @@ -89,8 +90,8 @@ public void process(Row input, ResultFuture<Row> resultFuture) {
RowManager rowManager = new RowManager(input);

Object[] queryVariablesValues = getEndpointHandler()
.getEndpointOrQueryVariablesValues(rowManager, resultFuture);
if (getEndpointHandler().isQueryInvalid(resultFuture, rowManager, queryVariablesValues)) {
.getVariablesValue(rowManager, VariableType.QUERY_VARIABLES, pgSourceConfig.getVariables(), resultFuture);
if (getEndpointHandler().isQueryInvalid(resultFuture, rowManager, pgSourceConfig.getVariables(), queryVariablesValues)) {
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,4 +147,5 @@ public class Constants {

public static final long MAX_EVENT_LOOP_EXECUTE_TIME_DEFAULT = 10000;
public static final int LONGBOW_OUTPUT_ADDITIONAL_ARITY = 3;
public enum VariableType { REQUEST_VARIABLES, HEADER_VARIABLES, QUERY_VARIABLES, ENDPOINT_VARIABLE };
prakharmathur82 marked this conversation as resolved.
Show resolved Hide resolved
}
Loading