feat: Capability in dagger to consume from ACL enabled kafka clusters (#195)

* feat: added source kafka props to consume data from ACL enabled kafka

* fix: revert version bump

* fix: added SASL_JAAS_CONFIG config key

* feat: docs update for JAAS config

* revert: local.properties changes

* fix: added security page

* fix: added security page

* fix: security page link update

* feat: added basic validation for supported kafka acl configuration

* fix: added missing security page in sidebar

Co-authored-by: Mayank Rai <[email protected]>
mayankrai09 authored Oct 17, 2022
1 parent 3243b39 commit 8f32ecc
Showing 11 changed files with 391 additions and 1 deletion.
StreamConfig.java (io.odpf.dagger.core.source.config)
@@ -1,6 +1,8 @@
package io.odpf.dagger.core.source.config;

import com.google.gson.annotations.JsonAdapter;
import io.odpf.dagger.core.source.config.adapter.DaggerSASLMechanismAdaptor;
import io.odpf.dagger.core.source.config.adapter.DaggerSecurityProtocolAdaptor;
import io.odpf.dagger.core.source.config.adapter.FileDateRangeAdaptor;
import io.odpf.dagger.core.source.config.adapter.SourceParquetFilePathsAdapter;
import io.odpf.dagger.core.source.config.models.SourceDetails;
@@ -66,6 +68,20 @@ public class StreamConfig {
@Getter
private String bootstrapServers;

@SerializedName(SOURCE_KAFKA_CONSUMER_CONFIG_SECURITY_PROTOCOL_KEY)
@Getter
@JsonAdapter(value = DaggerSecurityProtocolAdaptor.class)
private String securityProtocol;

@SerializedName(SOURCE_KAFKA_CONSUMER_CONFIG_SASL_MECHANISM_KEY)
@JsonAdapter(value = DaggerSASLMechanismAdaptor.class)
@Getter
private String saslMechanism;

@SerializedName(SOURCE_KAFKA_CONSUMER_CONFIG_SASL_JAAS_CONFIG_KEY)
@Getter
private String saslJaasConfig;

@SerializedName(STREAM_INPUT_STREAM_NAME_KEY)
@Getter
private String kafkaName;
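The three new fields are deserialized from each stream's JSON definition and, as the tests added later in this commit assert via getKafkaProps, surface as the standard Kafka consumer keys security.protocol, sasl.mechanism and sasl.jaas.config. A minimal sketch of that mapping follows; the helper class and method names are illustrative assumptions, and the real implementation may derive these from the config key names rather than explicit getter calls.

```java
import java.util.Properties;

// Sketch only: shows how the new StreamConfig getters translate into Kafka consumer properties.
// Functionally this is what StreamConfig.getKafkaProps(configuration) produces, per the tests below.
class KafkaSecurityPropsSketch {
    static void addSecurityProps(StreamConfig streamConfig, Properties kafkaProps) {
        if (streamConfig.getSecurityProtocol() != null) {
            kafkaProps.setProperty("security.protocol", streamConfig.getSecurityProtocol());
        }
        if (streamConfig.getSaslMechanism() != null) {
            kafkaProps.setProperty("sasl.mechanism", streamConfig.getSaslMechanism());
        }
        if (streamConfig.getSaslJaasConfig() != null) {
            kafkaProps.setProperty("sasl.jaas.config", streamConfig.getSaslJaasConfig());
        }
    }
}
```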
New file: DaggerSASLMechanismAdaptor.java (io.odpf.dagger.core.source.config.adapter)
@@ -0,0 +1,31 @@
package io.odpf.dagger.core.source.config.adapter;

import com.google.gson.TypeAdapter;
import com.google.gson.stream.JsonReader;
import com.google.gson.stream.JsonWriter;
import io.odpf.dagger.core.exception.InvalidConfigurationException;
import io.odpf.dagger.core.utils.Constants;

import java.io.IOException;
import java.util.Arrays;

public class DaggerSASLMechanismAdaptor extends TypeAdapter<String> {
@Override
public void write(JsonWriter jsonWriter, String value) throws IOException {
if (value == null) {
jsonWriter.nullValue();
return;
}
jsonWriter.value(value);
}

@Override
public String read(JsonReader jsonReader) throws IOException {
String saslMechanism = jsonReader.nextString();
if (Arrays.stream(Constants.SUPPORTED_SOURCE_KAFKA_CONSUMER_CONFIG_SASL_MECHANISM).anyMatch(saslMechanism::equals)) {
return saslMechanism;
} else {
throw new InvalidConfigurationException(String.format("Configured wrong SOURCE_KAFKA_CONSUMER_CONFIG_SASL_MECHANISM; supported values are %s", Arrays.toString(Constants.SUPPORTED_SOURCE_KAFKA_CONSUMER_CONFIG_SASL_MECHANISM)));
}
}
}
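Because the adaptor is attached with a field-level @JsonAdapter annotation, the check runs as soon as Gson deserializes the STREAMS JSON. A small, self-contained sketch of that behaviour is below; the SaslMechanismDemo and SaslHolder classes are hypothetical and exist only for illustration.

```java
import com.google.gson.Gson;
import com.google.gson.annotations.JsonAdapter;
import com.google.gson.annotations.SerializedName;
import io.odpf.dagger.core.source.config.adapter.DaggerSASLMechanismAdaptor;

public class SaslMechanismDemo {
    // Mirrors how StreamConfig wires the adaptor onto its saslMechanism field.
    static class SaslHolder {
        @SerializedName("SOURCE_KAFKA_CONSUMER_CONFIG_SASL_MECHANISM")
        @JsonAdapter(DaggerSASLMechanismAdaptor.class)
        String saslMechanism;
    }

    public static void main(String[] args) {
        Gson gson = new Gson();

        // A supported mechanism deserializes normally.
        SaslHolder ok = gson.fromJson(
                "{\"SOURCE_KAFKA_CONSUMER_CONFIG_SASL_MECHANISM\":\"SCRAM-SHA-512\"}", SaslHolder.class);
        System.out.println(ok.saslMechanism); // prints SCRAM-SHA-512

        // An unsupported mechanism makes read(...) throw InvalidConfigurationException.
        gson.fromJson(
                "{\"SOURCE_KAFKA_CONSUMER_CONFIG_SASL_MECHANISM\":\"GSSAPI\"}", SaslHolder.class);
    }
}
```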
New file: DaggerSecurityProtocolAdaptor.java (io.odpf.dagger.core.source.config.adapter)
@@ -0,0 +1,31 @@
package io.odpf.dagger.core.source.config.adapter;

import com.google.gson.TypeAdapter;
import com.google.gson.stream.JsonReader;
import com.google.gson.stream.JsonWriter;
import io.odpf.dagger.core.exception.InvalidConfigurationException;
import io.odpf.dagger.core.utils.Constants;

import java.io.IOException;
import java.util.Arrays;

public class DaggerSecurityProtocolAdaptor extends TypeAdapter<String> {
@Override
public void write(JsonWriter jsonWriter, String value) throws IOException {
if (value == null) {
jsonWriter.nullValue();
return;
}
jsonWriter.value(value);
}

@Override
public String read(JsonReader jsonReader) throws IOException {
String securityProtocol = jsonReader.nextString();
if (Arrays.stream(Constants.SUPPORTED_SOURCE_KAFKA_CONSUMER_CONFIG_SECURITY_PROTOCOL).anyMatch(securityProtocol::equals)) {
return securityProtocol;
} else {
throw new InvalidConfigurationException(String.format("Configured wrong SOURCE_KAFKA_CONSUMER_CONFIG_SECURITY_PROTOCOL; supported values are %s", Arrays.toString(Constants.SUPPORTED_SOURCE_KAFKA_CONSUMER_CONFIG_SECURITY_PROTOCOL)));
}
}
}
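Both adaptors validate only on the read path; write(...) simply echoes the value back (emitting a JSON null when the field is unset), so serialization behaviour is untouched and an unsupported value is rejected once, at the point where the STREAMS JSON is parsed.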
Constants.java (io.odpf.dagger.core.utils)
@@ -123,7 +123,9 @@ public class Constants {
public static final String SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_COMMIT_ENABLE_KEY = "SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_COMMIT_ENABLE";
public static final String SOURCE_KAFKA_CONSUMER_CONFIG_GROUP_ID_KEY = "SOURCE_KAFKA_CONSUMER_CONFIG_GROUP_ID";
public static final String SOURCE_KAFKA_CONSUMER_CONFIG_BOOTSTRAP_SERVERS_KEY = "SOURCE_KAFKA_CONSUMER_CONFIG_BOOTSTRAP_SERVERS";

public static final String SOURCE_KAFKA_CONSUMER_CONFIG_SECURITY_PROTOCOL_KEY = "SOURCE_KAFKA_CONSUMER_CONFIG_SECURITY_PROTOCOL";
public static final String SOURCE_KAFKA_CONSUMER_CONFIG_SASL_MECHANISM_KEY = "SOURCE_KAFKA_CONSUMER_CONFIG_SASL_MECHANISM";
public static final String SOURCE_KAFKA_CONSUMER_CONFIG_SASL_JAAS_CONFIG_KEY = "SOURCE_KAFKA_CONSUMER_CONFIG_SASL_JAAS_CONFIG";

public static final String METRIC_TELEMETRY_ENABLE_KEY = "METRIC_TELEMETRY_ENABLE";
public static final boolean METRIC_TELEMETRY_ENABLE_VALUE_DEFAULT = true;
@@ -172,4 +174,7 @@ public enum ExternalPostProcessorVariableType { REQUEST_VARIABLES, HEADER_VARIAB
// Comma separated error types
public static final String SINK_ERROR_TYPES_FOR_FAILURE = "SINK_ERROR_TYPES_FOR_FAILURE";
public static final String SINK_ERROR_TYPES_FOR_FAILURE_DEFAULT = "";

public static final String[] SUPPORTED_SOURCE_KAFKA_CONSUMER_CONFIG_SECURITY_PROTOCOL = {"SASL_PLAINTEXT", "SASL_SSL"};
public static final String[] SUPPORTED_SOURCE_KAFKA_CONSUMER_CONFIG_SASL_MECHANISM = {"PLAIN", "SCRAM-SHA-256", "SCRAM-SHA-512"};
}
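Only the SASL-based protocols and the PLAIN and SCRAM mechanisms listed here pass validation; any other value, for example GSSAPI or an SSL-only protocol, is rejected by the adaptors above.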
StreamConfig tests
@@ -114,6 +114,58 @@ public void shouldParseKafkaProperties() {
assertEquals(properties, streamConfigs[0].getKafkaProps(configuration));
}

@Test
public void shouldParseKafkaPropertiesWithSASLConfig() {
when(configuration.getString(INPUT_STREAMS, "")).thenReturn("[ { \"SOURCE_KAFKA_TOPIC_NAMES\": \"test-topic\", \"INPUT_SCHEMA_TABLE\": \"data_stream\", \"INPUT_SCHEMA_PROTO_CLASS\": \"com.tests.TestMessage\", \"INPUT_SCHEMA_EVENT_TIMESTAMP_FIELD_INDEX\": \"41\", \"SOURCE_KAFKA_CONSUMER_CONFIG_BOOTSTRAP_SERVERS\": \"localhost:9092\",\"SOURCE_KAFKA_CONSUMER_CONFIG_SECURITY_PROTOCOL\": \"SASL_PLAINTEXT\",\"SOURCE_KAFKA_CONSUMER_CONFIG_SASL_MECHANISM\":\"SCRAM-SHA-512\",\"SOURCE_KAFKA_CONSUMER_CONFIG_SASL_JAAS_CONFIG\":\"org.apache.kafka.common.security.scram.ScramLoginModule required username=\\\"username\\\" password=\\\"password\\\";\", \"SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_COMMIT_ENABLE\": \"\", \"SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_OFFSET_RESET\": \"latest\", \"SOURCE_KAFKA_CONSUMER_CONFIG_GROUP_ID\": \"dummy-consumer-group\", \"SOURCE_KAFKA_NAME\": \"local-kafka-stream\" } ]");
when(configuration.getBoolean(SOURCE_KAFKA_CONSUME_LARGE_MESSAGE_ENABLE_KEY, SOURCE_KAFKA_CONSUME_LARGE_MESSAGE_ENABLE_DEFAULT)).thenReturn(false);
StreamConfig[] streamConfigs = StreamConfig.parse(configuration);

HashMap<String, String> kafkaPropMap = new HashMap<>();
kafkaPropMap.put("group.id", "dummy-consumer-group");
kafkaPropMap.put("bootstrap.servers", "localhost:9092");
kafkaPropMap.put("auto.offset.reset", "latest");
kafkaPropMap.put("auto.commit.enable", "");
kafkaPropMap.put("sasl.mechanism", "SCRAM-SHA-512");
kafkaPropMap.put("security.protocol", "SASL_PLAINTEXT");
kafkaPropMap.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"username\" password=\"password\";");

Properties properties = new Properties();
properties.putAll(kafkaPropMap);

assertEquals(properties, streamConfigs[0].getKafkaProps(configuration));
}

@Test
public void shouldParseMultipleStreamsFromStreamConfigWithSASLConfig() {
when(configuration.getString(INPUT_STREAMS, "")).thenReturn("[ { \"SOURCE_KAFKA_TOPIC_NAMES\": \"test-topic\", \"INPUT_SCHEMA_TABLE\": \"data_stream\", \"INPUT_SCHEMA_PROTO_CLASS\": \"com.tests.TestMessage\", \"INPUT_SCHEMA_EVENT_TIMESTAMP_FIELD_INDEX\": \"41\", \"SOURCE_KAFKA_CONSUMER_CONFIG_BOOTSTRAP_SERVERS\": \"localhost:9092\",\"SOURCE_KAFKA_CONSUMER_CONFIG_SECURITY_PROTOCOL\": \"SASL_PLAINTEXT\",\"SOURCE_KAFKA_CONSUMER_CONFIG_SASL_MECHANISM\":\"SCRAM-SHA-512\",\"SOURCE_KAFKA_CONSUMER_CONFIG_SASL_JAAS_CONFIG\":\"org.apache.kafka.common.security.scram.ScramLoginModule required username=\\\"username\\\" password=\\\"password\\\";\", \"SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_COMMIT_ENABLE\": \"false\", \"SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_OFFSET_RESET\": \"latest\", \"SOURCE_KAFKA_CONSUMER_CONFIG_GROUP_ID\": \"dummy-consumer-group\", \"SOURCE_KAFKA_NAME\": \"local-kafka-stream\" }, {\"INPUT_SCHEMA_TABLE\": \"data_stream_1\", \"SOURCE_KAFKA_TOPIC_NAMES\": \"test-topic\", \"INPUT_DATATYPE\": \"JSON\", \"INPUT_SCHEMA_JSON_SCHEMA\": \"{ \\\"$schema\\\": \\\"https://json-schema.org/draft/2020-12/schema\\\", \\\"$id\\\": \\\"https://example.com/product.schema.json\\\", \\\"title\\\": \\\"Product\\\", \\\"description\\\": \\\"A product from Acme's catalog\\\", \\\"type\\\": \\\"object\\\", \\\"properties\\\": { \\\"id\\\": { \\\"description\\\": \\\"The unique identifier for a product\\\", \\\"type\\\": \\\"string\\\" }, \\\"time\\\": { \\\"description\\\": \\\"event timestamp of the event\\\", \\\"type\\\": \\\"string\\\", \\\"format\\\" : \\\"date-time\\\" } }, \\\"required\\\": [ \\\"id\\\", \\\"time\\\" ] }\", \"INPUT_SCHEMA_EVENT_TIMESTAMP_FIELD_INDEX\": \"41\", \"SOURCE_KAFKA_CONSUMER_CONFIG_BOOTSTRAP_SERVERS\": \"localhost:9092\", \"SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_COMMIT_ENABLE\": \"true\", \"SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_OFFSET_RESET\": \"latest\", \"SOURCE_KAFKA_CONSUMER_CONFIG_GROUP_ID\": \"dummy-consumer-group\", \"SOURCE_KAFKA_NAME\": \"local-kafka-stream\" } ]");
StreamConfig[] streamConfigs = StreamConfig.parse(configuration);

assertEquals(2, streamConfigs.length);

StreamConfig currConfig = streamConfigs[0];
assertEquals("false", currConfig.getAutoCommitEnable());
assertEquals("latest", currConfig.getAutoOffsetReset());
assertEquals("PROTO", currConfig.getDataType());
assertEquals("dummy-consumer-group", currConfig.getConsumerGroupId());
assertEquals("41", currConfig.getEventTimestampFieldIndex());
assertEquals("test-topic", currConfig.getKafkaTopicNames());
assertEquals("data_stream", currConfig.getSchemaTable());
assertEquals("local-kafka-stream", currConfig.getKafkaName());
assertEquals("SCRAM-SHA-512", currConfig.getSaslMechanism());
assertEquals("SASL_PLAINTEXT", currConfig.getSecurityProtocol());
assertEquals("org.apache.kafka.common.security.scram.ScramLoginModule required username=\"username\" password=\"password\";", currConfig.getSaslJaasConfig());

StreamConfig currConfigNext = streamConfigs[1];
assertEquals("true", currConfigNext.getAutoCommitEnable());
assertEquals("latest", currConfigNext.getAutoOffsetReset());
assertEquals("JSON", currConfigNext.getDataType());
assertEquals("dummy-consumer-group", currConfigNext.getConsumerGroupId());
assertEquals("41", currConfigNext.getEventTimestampFieldIndex());
assertEquals("test-topic", currConfigNext.getKafkaTopicNames());
assertEquals("data_stream_1", currConfigNext.getSchemaTable());
assertEquals("local-kafka-stream", currConfigNext.getKafkaName());
}

@Test
public void shouldAddAdditionalKafkaConfigToKafkaProperties() {
when(configuration.getString(INPUT_STREAMS, "")).thenReturn("[ { \"SOURCE_KAFKA_TOPIC_NAMES\": \"test-topic\", \"INPUT_SCHEMA_TABLE\": \"data_stream\", \"INPUT_SCHEMA_PROTO_CLASS\": \"com.tests.TestMessage\", \"INPUT_SCHEMA_EVENT_TIMESTAMP_FIELD_INDEX\": \"41\", \"SOURCE_KAFKA_CONSUMER_CONFIG_BOOTSTRAP_SERVERS\": \"localhost:9092\", \"SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_COMMIT_ENABLE\": \"\", \"SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_OFFSET_RESET\": \"latest\", \"SOURCE_KAFKA_CONSUMER_CONFIG_GROUP_ID\": \"dummy-consumer-group\", \"SOURCE_KAFKA_NAME\": \"local-kafka-stream\" } ]");
New file: DaggerSASLMechanismAdaptorTest.java (io.odpf.dagger.core.source.config.adapter)
@@ -0,0 +1,37 @@
package io.odpf.dagger.core.source.config.adapter;

import com.google.gson.stream.JsonReader;
import io.odpf.dagger.core.exception.InvalidConfigurationException;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import java.io.IOException;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
import static org.mockito.Mockito.when;
import static org.mockito.MockitoAnnotations.initMocks;

public class DaggerSASLMechanismAdaptorTest {
@Mock
private JsonReader jsonReader;

@Before
public void setup() {
initMocks(this);
}

@Test
public void shouldAcceptConfiguredValue() throws IOException {
when(jsonReader.nextString()).thenReturn("SCRAM-SHA-512");
DaggerSASLMechanismAdaptor daggerSASLMechanismAdaptor = new DaggerSASLMechanismAdaptor();
String saslMechanism = daggerSASLMechanismAdaptor.read(jsonReader);
assertEquals("SCRAM-SHA-512", saslMechanism);
}

@Test
public void shouldNotAcceptConfiguredValue() throws IOException {
when(jsonReader.nextString()).thenReturn("SCRAMSHA512");
DaggerSASLMechanismAdaptor daggerSASLMechanismAdaptor = new DaggerSASLMechanismAdaptor();
assertThrows(InvalidConfigurationException.class, () -> daggerSASLMechanismAdaptor.read(jsonReader));
}
}
New file: DaggerSecurityProtocolAdaptorTest.java (io.odpf.dagger.core.source.config.adapter)
@@ -0,0 +1,41 @@
package io.odpf.dagger.core.source.config.adapter;

import com.google.gson.stream.JsonReader;
import io.odpf.dagger.core.exception.InvalidConfigurationException;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;

import java.io.IOException;


import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
import static org.mockito.Mockito.when;
import static org.mockito.MockitoAnnotations.initMocks;

public class DaggerSecurityProtocolAdaptorTest {

@Mock
private JsonReader jsonReader;

@Before
public void setup() {
initMocks(this);
}

@Test
public void shouldAcceptConfiguredValue() throws IOException {
when(jsonReader.nextString()).thenReturn("SASL_PLAINTEXT");
DaggerSecurityProtocolAdaptor daggerSecurityProtocolAdaptor = new DaggerSecurityProtocolAdaptor();
String securityProtocol = daggerSecurityProtocolAdaptor.read(jsonReader);
assertEquals("SASL_PLAINTEXT", securityProtocol);
}

@Test
public void shouldNotAcceptConfiguredValue() throws IOException {
when(jsonReader.nextString()).thenReturn("SASLPLAINTEXT");
DaggerSecurityProtocolAdaptor daggerSecurityProtocolAdaptor = new DaggerSecurityProtocolAdaptor();
assertThrows(InvalidConfigurationException.class, () -> daggerSecurityProtocolAdaptor.read(jsonReader));
}
}
4 changes: 4 additions & 0 deletions docs/docs/advance/overview.md
@@ -21,3 +21,7 @@ Longbow+ is an enhanced version of longbow. It has additional support for comple
### [DARTS](./DARTS.md)

DARTS allows you to join streaming data with a reference data store. It supports reference data stores in the form of a list or a <key, value> map, and exposes the reference table through UDFs that can be used in the SQL query. Currently, only GCS is supported as a reference data source.

### [Security](./security.md)

Enables secure data consumption from an ACL-enabled Kafka source using SASL (Simple Authentication and Security Layer) authentication.
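As an illustration (an abridged, hypothetical stream entry, not the contents of the security page added by this commit), an ACL-enabled source carries the three new keys alongside the usual consumer settings; note that the quotes inside the JAAS line have to be escaped when the value is embedded in the STREAMS JSON, as the tests above do.

```json
[
  {
    "SOURCE_KAFKA_TOPIC_NAMES": "test-topic",
    "SOURCE_KAFKA_CONSUMER_CONFIG_BOOTSTRAP_SERVERS": "localhost:9092",
    "SOURCE_KAFKA_CONSUMER_CONFIG_GROUP_ID": "dummy-consumer-group",
    "SOURCE_KAFKA_CONSUMER_CONFIG_SECURITY_PROTOCOL": "SASL_PLAINTEXT",
    "SOURCE_KAFKA_CONSUMER_CONFIG_SASL_MECHANISM": "SCRAM-SHA-512",
    "SOURCE_KAFKA_CONSUMER_CONFIG_SASL_JAAS_CONFIG": "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"username\" password=\"password\";",
    "SOURCE_KAFKA_NAME": "local-kafka-stream"
  }
]
```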