Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SAMZA-2233: Allow overriding descriptor generated task.inputs using configs #1065

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,8 @@ ExecutionPlan getExecutionPlan(String runId) {
// TODO: This should all be consolidated with ExecutionPlanner after fixing SAMZA-1811
// Don't generate any configurations for LegacyTaskApplications
if (!LegacyTaskApplication.class.isAssignableFrom(appDesc.getAppClass())) {
if (userConfig.containsKey(TaskConfig.INPUT_STREAMS)) {
LOG.warn("SamzaApplications should not specify task.inputs in configuration. " +
"Specify them using InputDescriptors instead. Ignoring configured task.inputs value of " +
userConfig.get(TaskConfig.INPUT_STREAMS));
// Don't allow overriding task.inputs to a blank string
if (StringUtils.isBlank(userConfig.get(TaskConfig.INPUT_STREAMS))) {
allowedUserConfig.remove(TaskConfig.INPUT_STREAMS);
}
generatedConfig.putAll(getGeneratedConfig(runId));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -373,11 +373,15 @@ private <StreamMessageType> void initializeInMemoryInputStream(InMemoryInputDesc
Map<Integer, Iterable<StreamMessageType>> partitionData) {
String systemName = descriptor.getSystemName();
String streamName = (String) descriptor.getPhysicalName().orElse(descriptor.getStreamId());
if (configs.containsKey(TaskConfig.INPUT_STREAMS)) {
configs.put(TaskConfig.INPUT_STREAMS,
configs.get(TaskConfig.INPUT_STREAMS).concat("," + systemName + "." + streamName));
} else {
configs.put(TaskConfig.INPUT_STREAMS, systemName + "." + streamName);
if (this.app instanceof LegacyTaskApplication) {
// task.inputs is generated using descriptors for Task/StreamApplication, but needs to be generated here
// for legacy applications that only specify task.class.
if (configs.containsKey(TaskConfig.INPUT_STREAMS)) {
configs.put(TaskConfig.INPUT_STREAMS,
configs.get(TaskConfig.INPUT_STREAMS).concat("," + systemName + "." + streamName));
} else {
configs.put(TaskConfig.INPUT_STREAMS, systemName + "." + streamName);
}
}
InMemorySystemDescriptor imsd = (InMemorySystemDescriptor) descriptor.getSystemDescriptor();
imsd.withInMemoryScope(this.inMemoryScope);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

package org.apache.samza.test.processor;

import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
Expand Down Expand Up @@ -140,7 +139,7 @@ public void setUp() {
// Set up stream application config map with the given testStreamAppName, testStreamAppId and test kafka system
// TODO: processorId should typically come up from a processorID generator as processor.id will be deprecated in 0.14.0+
Map<String, String> configMap =
buildStreamApplicationConfigMap(ImmutableList.of(inputKafkaTopic), testStreamAppName, testStreamAppId);
buildStreamApplicationConfigMap(testStreamAppName, testStreamAppId);
configMap.put(JobConfig.PROCESSOR_ID(), PROCESSOR_IDS[0]);
applicationConfig1 = new ApplicationConfig(new MapConfig(configMap));
configMap.put(JobConfig.PROCESSOR_ID(), PROCESSOR_IDS[1]);
Expand Down Expand Up @@ -189,14 +188,10 @@ private void publishKafkaEvents(String topic, int startIndex, int endIndex, Stri
}
}

private Map<String, String> buildStreamApplicationConfigMap(List<String> inputTopics, String appName, String appId, boolean isBatch) {
List<String> inputSystemStreams = inputTopics.stream()
.map(topic -> String.format("%s.%s", TestZkLocalApplicationRunner.TEST_SYSTEM, topic))
.collect(Collectors.toList());
private Map<String, String> buildStreamApplicationConfigMap(String appName, String appId, boolean isBatch) {
String coordinatorSystemName = "coordinatorSystem";
Map<String, String> config = new HashMap<>();
config.put(ZkConfig.ZK_CONSENSUS_TIMEOUT_MS, BARRIER_TIMEOUT_MS);
config.put(TaskConfig.INPUT_STREAMS, Joiner.on(',').join(inputSystemStreams));
config.put(JobConfig.JOB_DEFAULT_SYSTEM(), TestZkLocalApplicationRunner.TEST_SYSTEM);
config.put(TaskConfig.IGNORED_EXCEPTIONS, "*");
config.put(ZkConfig.ZK_CONNECT, zkConnect());
Expand Down Expand Up @@ -227,8 +222,8 @@ private Map<String, String> buildStreamApplicationConfigMap(List<String> inputTo
return applicationConfig;
}

private Map<String, String> buildStreamApplicationConfigMap(List<String> inputTopics, String appName, String appId) {
return buildStreamApplicationConfigMap(inputTopics, appName, appId, false);
private Map<String, String> buildStreamApplicationConfigMap(String appName, String appId) {
return buildStreamApplicationConfigMap(appName, appId, false);
}

/**
Expand Down Expand Up @@ -533,8 +528,7 @@ public void testRollingUpgradeOfStreamApplicationsShouldGenerateSameJobModel() t
// Set up kafka topics.
publishKafkaEvents(inputKafkaTopic, 0, NUM_KAFKA_EVENTS, PROCESSOR_IDS[0]);

Map<String, String> configMap = buildStreamApplicationConfigMap(
ImmutableList.of(inputKafkaTopic), testStreamAppName, testStreamAppId);
Map<String, String> configMap = buildStreamApplicationConfigMap(testStreamAppName, testStreamAppId);

configMap.put(JobConfig.PROCESSOR_ID(), PROCESSOR_IDS[0]);
Config applicationConfig1 = new MapConfig(configMap);
Expand Down Expand Up @@ -607,7 +601,7 @@ public void testRollingUpgradeOfStreamApplicationsShouldGenerateSameJobModel() t
public void testShouldStopStreamApplicationWhenShutdownTimeOutIsLessThanContainerShutdownTime() throws Exception {
publishKafkaEvents(inputKafkaTopic, 0, NUM_KAFKA_EVENTS, PROCESSOR_IDS[0]);

Map<String, String> configMap = buildStreamApplicationConfigMap(ImmutableList.of(inputKafkaTopic), testStreamAppName, testStreamAppId);
Map<String, String> configMap = buildStreamApplicationConfigMap(testStreamAppName, testStreamAppId);
configMap.put(TaskConfig.TASK_SHUTDOWN_MS, "0");

configMap.put(JobConfig.PROCESSOR_ID(), PROCESSOR_IDS[0]);
Expand Down Expand Up @@ -756,7 +750,7 @@ public void testStatefulSamzaApplicationShouldRedistributeInputPartitionsToCorre
createTopic(statefulInputKafkaTopic, 32, 1);

// Generate configuration for the test.
Map<String, String> configMap = buildStreamApplicationConfigMap(ImmutableList.of(statefulInputKafkaTopic), testStreamAppName, testStreamAppId);
Map<String, String> configMap = buildStreamApplicationConfigMap(testStreamAppName, testStreamAppId);
configMap.put(JobConfig.PROCESSOR_ID(), PROCESSOR_IDS[0]);
Config applicationConfig1 = new ApplicationConfig(new MapConfig(configMap));

Expand Down Expand Up @@ -856,8 +850,7 @@ public void testStatefulSamzaApplicationShouldRedistributeInputPartitionsToCorre
createTopic(statefulInputKafkaTopic2, 32, 1);

// Generate configuration for the test.
Map<String, String> configMap = buildStreamApplicationConfigMap(ImmutableList.of(statefulInputKafkaTopic1, statefulInputKafkaTopic2),
testStreamAppName, testStreamAppId);
Map<String, String> configMap = buildStreamApplicationConfigMap(testStreamAppName, testStreamAppId);
configMap.put(JobConfig.PROCESSOR_ID(), PROCESSOR_IDS[0]);
Config applicationConfig1 = new ApplicationConfig(new MapConfig(configMap));

Expand Down Expand Up @@ -1002,7 +995,7 @@ public void testAgreeingOnSameRunIdForBatch() throws InterruptedException {
publishKafkaEvents(inputKafkaTopic, 0, NUM_KAFKA_EVENTS, PROCESSOR_IDS[0]);

Map<String, String> configMap =
buildStreamApplicationConfigMap(ImmutableList.of(inputKafkaTopic), testStreamAppName, testStreamAppId, true);
buildStreamApplicationConfigMap(testStreamAppName, testStreamAppId, true);
configMap.put(JobConfig.PROCESSOR_ID(), PROCESSOR_IDS[0]);
applicationConfig1 = new ApplicationConfig(new MapConfig(configMap));
configMap.put(JobConfig.PROCESSOR_ID(), PROCESSOR_IDS[1]);
Expand Down Expand Up @@ -1054,7 +1047,7 @@ public void testNewProcessorGetsSameRunIdForBatch() throws InterruptedException
publishKafkaEvents(inputKafkaTopic, 0, NUM_KAFKA_EVENTS, PROCESSOR_IDS[0]);

Map<String, String> configMap =
buildStreamApplicationConfigMap(ImmutableList.of(inputKafkaTopic), testStreamAppName, testStreamAppId, true);
buildStreamApplicationConfigMap(testStreamAppName, testStreamAppId, true);
configMap.put(JobConfig.PROCESSOR_ID(), PROCESSOR_IDS[0]);
applicationConfig1 = new ApplicationConfig(new MapConfig(configMap));
configMap.put(JobConfig.PROCESSOR_ID(), PROCESSOR_IDS[1]);
Expand Down Expand Up @@ -1124,7 +1117,7 @@ public void testAllProcesssorDieNewProcessorGetsNewRunIdForBatch() throws Interr
publishKafkaEvents(inputKafkaTopic, 0, NUM_KAFKA_EVENTS, PROCESSOR_IDS[0]);

Map<String, String> configMap =
buildStreamApplicationConfigMap(ImmutableList.of(inputKafkaTopic), testStreamAppName, testStreamAppId, true);
buildStreamApplicationConfigMap(testStreamAppName, testStreamAppId, true);
configMap.put(JobConfig.PROCESSOR_ID(), PROCESSOR_IDS[0]);
applicationConfig1 = new ApplicationConfig(new MapConfig(configMap));
configMap.put(JobConfig.PROCESSOR_ID(), PROCESSOR_IDS[1]);
Expand Down Expand Up @@ -1198,7 +1191,7 @@ public void testFirstProcessorDiesButSameRunIdForBatch() throws InterruptedExcep
publishKafkaEvents(inputKafkaTopic, 0, NUM_KAFKA_EVENTS, PROCESSOR_IDS[0]);

Map<String, String> configMap =
buildStreamApplicationConfigMap(ImmutableList.of(inputKafkaTopic), testStreamAppName, testStreamAppId, true);
buildStreamApplicationConfigMap(testStreamAppName, testStreamAppId, true);
configMap.put(JobConfig.PROCESSOR_ID(), PROCESSOR_IDS[0]);
applicationConfig1 = new ApplicationConfig(new MapConfig(configMap));
configMap.put(JobConfig.PROCESSOR_ID(), PROCESSOR_IDS[1]);
Expand Down