diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobPlanner.java b/samza-core/src/main/java/org/apache/samza/execution/JobPlanner.java
index 577964e9f1..298a9b5c61 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/JobPlanner.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobPlanner.java
@@ -78,10 +78,8 @@ ExecutionPlan getExecutionPlan(String runId) {
     // TODO: This should all be consolidated with ExecutionPlanner after fixing SAMZA-1811
     // Don't generate any configurations for LegacyTaskApplications
     if (!LegacyTaskApplication.class.isAssignableFrom(appDesc.getAppClass())) {
-      if (userConfig.containsKey(TaskConfig.INPUT_STREAMS)) {
-        LOG.warn("SamzaApplications should not specify task.inputs in configuration. " +
-            "Specify them using InputDescriptors instead. Ignoring configured task.inputs value of " +
-            userConfig.get(TaskConfig.INPUT_STREAMS));
+      // Don't allow overriding task.inputs to a blank string
+      if (StringUtils.isBlank(userConfig.get(TaskConfig.INPUT_STREAMS))) {
         allowedUserConfig.remove(TaskConfig.INPUT_STREAMS);
       }
       generatedConfig.putAll(getGeneratedConfig(runId));
diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
index 2f2e74e22e..42ebf3226c 100644
--- a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
+++ b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
@@ -373,11 +373,15 @@ private <StreamMessageType> void initializeInMemoryInputStream(InMemoryInputDesc
       Map<Integer, Iterable<StreamMessageType>> partitionData) {
     String systemName = descriptor.getSystemName();
     String streamName = (String) descriptor.getPhysicalName().orElse(descriptor.getStreamId());
-    if (configs.containsKey(TaskConfig.INPUT_STREAMS)) {
-      configs.put(TaskConfig.INPUT_STREAMS,
-          configs.get(TaskConfig.INPUT_STREAMS).concat("," + systemName + "." + streamName));
-    } else {
-      configs.put(TaskConfig.INPUT_STREAMS, systemName + "." + streamName);
+    if (this.app instanceof LegacyTaskApplication) {
+      // task.inputs is generated using descriptors for Task/StreamApplication, but needs to be generated here
+      // for legacy applications that only specify task.class.
+      if (configs.containsKey(TaskConfig.INPUT_STREAMS)) {
+        configs.put(TaskConfig.INPUT_STREAMS,
+            configs.get(TaskConfig.INPUT_STREAMS).concat("," + systemName + "." + streamName));
+      } else {
+        configs.put(TaskConfig.INPUT_STREAMS, systemName + "." + streamName);
+      }
     }
     InMemorySystemDescriptor imsd = (InMemorySystemDescriptor) descriptor.getSystemDescriptor();
     imsd.withInMemoryScope(this.inMemoryScope);
diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
index 7ad0632c4e..d234590f64 100644
--- a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
+++ b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
@@ -19,7 +19,6 @@
 
 package org.apache.samza.test.processor;
 
-import com.google.common.base.Joiner;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
@@ -140,7 +139,7 @@ public void setUp() {
     // Set up stream application config map with the given testStreamAppName, testStreamAppId and test kafka system
     // TODO: processorId should typically come up from a processorID generator as processor.id will be deprecated in 0.14.0+
     Map<String, String> configMap =
-        buildStreamApplicationConfigMap(ImmutableList.of(inputKafkaTopic), testStreamAppName, testStreamAppId);
+        buildStreamApplicationConfigMap(testStreamAppName, testStreamAppId);
     configMap.put(JobConfig.PROCESSOR_ID(), PROCESSOR_IDS[0]);
     applicationConfig1 = new ApplicationConfig(new MapConfig(configMap));
     configMap.put(JobConfig.PROCESSOR_ID(), PROCESSOR_IDS[1]);
@@ -189,14 +188,10 @@ private void publishKafkaEvents(String topic, int startIndex, int endIndex, Stri
     }
   }
 
-  private Map<String, String> buildStreamApplicationConfigMap(List<String> inputTopics, String appName, String appId, boolean isBatch) {
-    List<String> inputSystemStreams = inputTopics.stream()
-        .map(topic -> String.format("%s.%s", TestZkLocalApplicationRunner.TEST_SYSTEM, topic))
-        .collect(Collectors.toList());
+  private Map<String, String> buildStreamApplicationConfigMap(String appName, String appId, boolean isBatch) {
     String coordinatorSystemName = "coordinatorSystem";
     Map<String, String> config = new HashMap<>();
     config.put(ZkConfig.ZK_CONSENSUS_TIMEOUT_MS, BARRIER_TIMEOUT_MS);
-    config.put(TaskConfig.INPUT_STREAMS, Joiner.on(',').join(inputSystemStreams));
     config.put(JobConfig.JOB_DEFAULT_SYSTEM(), TestZkLocalApplicationRunner.TEST_SYSTEM);
     config.put(TaskConfig.IGNORED_EXCEPTIONS, "*");
     config.put(ZkConfig.ZK_CONNECT, zkConnect());
@@ -227,8 +222,8 @@ private Map<String, String> buildStreamApplicationConfigMap(List<String> inputTo
     return applicationConfig;
   }
 
-  private Map<String, String> buildStreamApplicationConfigMap(List<String> inputTopics, String appName, String appId) {
-    return buildStreamApplicationConfigMap(inputTopics, appName, appId, false);
+  private Map<String, String> buildStreamApplicationConfigMap(String appName, String appId) {
+    return buildStreamApplicationConfigMap(appName, appId, false);
   }
 
   /**
@@ -533,8 +528,7 @@ public void testRollingUpgradeOfStreamApplicationsShouldGenerateSameJobModel() t
     // Set up kafka topics.
     publishKafkaEvents(inputKafkaTopic, 0, NUM_KAFKA_EVENTS, PROCESSOR_IDS[0]);
 
-    Map<String, String> configMap = buildStreamApplicationConfigMap(
-        ImmutableList.of(inputKafkaTopic), testStreamAppName, testStreamAppId);
+    Map<String, String> configMap = buildStreamApplicationConfigMap(testStreamAppName, testStreamAppId);
     configMap.put(JobConfig.PROCESSOR_ID(), PROCESSOR_IDS[0]);
     Config applicationConfig1 = new MapConfig(configMap);
@@ -607,7 +601,7 @@ public void testRollingUpgradeOfStreamApplicationsShouldGenerateSameJobModel() t
   public void testShouldStopStreamApplicationWhenShutdownTimeOutIsLessThanContainerShutdownTime() throws Exception {
     publishKafkaEvents(inputKafkaTopic, 0, NUM_KAFKA_EVENTS, PROCESSOR_IDS[0]);
 
-    Map<String, String> configMap = buildStreamApplicationConfigMap(ImmutableList.of(inputKafkaTopic), testStreamAppName, testStreamAppId);
+    Map<String, String> configMap = buildStreamApplicationConfigMap(testStreamAppName, testStreamAppId);
     configMap.put(TaskConfig.TASK_SHUTDOWN_MS, "0");
     configMap.put(JobConfig.PROCESSOR_ID(), PROCESSOR_IDS[0]);
@@ -756,7 +750,7 @@ public void testStatefulSamzaApplicationShouldRedistributeInputPartitionsToCorre
     createTopic(statefulInputKafkaTopic, 32, 1);
 
     // Generate configuration for the test.
-    Map<String, String> configMap = buildStreamApplicationConfigMap(ImmutableList.of(statefulInputKafkaTopic), testStreamAppName, testStreamAppId);
+    Map<String, String> configMap = buildStreamApplicationConfigMap(testStreamAppName, testStreamAppId);
     configMap.put(JobConfig.PROCESSOR_ID(), PROCESSOR_IDS[0]);
     Config applicationConfig1 = new ApplicationConfig(new MapConfig(configMap));
@@ -856,8 +850,7 @@ public void testStatefulSamzaApplicationShouldRedistributeInputPartitionsToCorre
     createTopic(statefulInputKafkaTopic2, 32, 1);
 
     // Generate configuration for the test.
-    Map<String, String> configMap = buildStreamApplicationConfigMap(ImmutableList.of(statefulInputKafkaTopic1, statefulInputKafkaTopic2),
-        testStreamAppName, testStreamAppId);
+    Map<String, String> configMap = buildStreamApplicationConfigMap(testStreamAppName, testStreamAppId);
     configMap.put(JobConfig.PROCESSOR_ID(), PROCESSOR_IDS[0]);
     Config applicationConfig1 = new ApplicationConfig(new MapConfig(configMap));
@@ -1002,7 +995,7 @@ public void testAgreeingOnSameRunIdForBatch() throws InterruptedException {
     publishKafkaEvents(inputKafkaTopic, 0, NUM_KAFKA_EVENTS, PROCESSOR_IDS[0]);
 
     Map<String, String> configMap =
-        buildStreamApplicationConfigMap(ImmutableList.of(inputKafkaTopic), testStreamAppName, testStreamAppId, true);
+        buildStreamApplicationConfigMap(testStreamAppName, testStreamAppId, true);
     configMap.put(JobConfig.PROCESSOR_ID(), PROCESSOR_IDS[0]);
     applicationConfig1 = new ApplicationConfig(new MapConfig(configMap));
     configMap.put(JobConfig.PROCESSOR_ID(), PROCESSOR_IDS[1]);
@@ -1054,7 +1047,7 @@ public void testNewProcessorGetsSameRunIdForBatch() throws InterruptedException
     publishKafkaEvents(inputKafkaTopic, 0, NUM_KAFKA_EVENTS, PROCESSOR_IDS[0]);
 
     Map<String, String> configMap =
-        buildStreamApplicationConfigMap(ImmutableList.of(inputKafkaTopic), testStreamAppName, testStreamAppId, true);
+        buildStreamApplicationConfigMap(testStreamAppName, testStreamAppId, true);
     configMap.put(JobConfig.PROCESSOR_ID(), PROCESSOR_IDS[0]);
     applicationConfig1 = new ApplicationConfig(new MapConfig(configMap));
     configMap.put(JobConfig.PROCESSOR_ID(), PROCESSOR_IDS[1]);
@@ -1124,7 +1117,7 @@ public void testAllProcesssorDieNewProcessorGetsNewRunIdForBatch() throws Interr
     publishKafkaEvents(inputKafkaTopic, 0, NUM_KAFKA_EVENTS, PROCESSOR_IDS[0]);
 
     Map<String, String> configMap =
-        buildStreamApplicationConfigMap(ImmutableList.of(inputKafkaTopic), testStreamAppName, testStreamAppId, true);
+        buildStreamApplicationConfigMap(testStreamAppName, testStreamAppId, true);
     configMap.put(JobConfig.PROCESSOR_ID(), PROCESSOR_IDS[0]);
     applicationConfig1 = new ApplicationConfig(new MapConfig(configMap));
     configMap.put(JobConfig.PROCESSOR_ID(), PROCESSOR_IDS[1]);
@@ -1198,7 +1191,7 @@ public void testFirstProcessorDiesButSameRunIdForBatch() throws InterruptedExcep
     publishKafkaEvents(inputKafkaTopic, 0, NUM_KAFKA_EVENTS, PROCESSOR_IDS[0]);
 
     Map<String, String> configMap =
-        buildStreamApplicationConfigMap(ImmutableList.of(inputKafkaTopic), testStreamAppName, testStreamAppId, true);
+        buildStreamApplicationConfigMap(testStreamAppName, testStreamAppId, true);
     configMap.put(JobConfig.PROCESSOR_ID(), PROCESSOR_IDS[0]);
     applicationConfig1 = new ApplicationConfig(new MapConfig(configMap));
     configMap.put(JobConfig.PROCESSOR_ID(), PROCESSOR_IDS[1]);
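
For context on why the hand-written task.inputs entries can be dropped from these tests: after this change, task.inputs is derived from the application's InputDescriptors for non-legacy applications (and only generated by TestRunner for LegacyTaskApplication). Below is a minimal sketch of that descriptor-based style, assuming Samza's 1.x high-level API; the class name, system name, and topic name are illustrative placeholders, not part of this patch.

    import org.apache.samza.application.StreamApplication;
    import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
    import org.apache.samza.operators.MessageStream;
    import org.apache.samza.serializers.StringSerde;
    import org.apache.samza.system.kafka.descriptors.KafkaInputDescriptor;
    import org.apache.samza.system.kafka.descriptors.KafkaSystemDescriptor;

    // Illustrative only: inputs are declared through descriptors, so the planner
    // can generate task.inputs instead of reading it from user configuration.
    public class PageViewFilterApp implements StreamApplication {
      @Override
      public void describe(StreamApplicationDescriptor appDescriptor) {
        // "kafka" and "page-views" are placeholder system/stream names.
        KafkaSystemDescriptor kafkaSystem = new KafkaSystemDescriptor("kafka");
        KafkaInputDescriptor<String> pageViews =
            kafkaSystem.getInputDescriptor("page-views", new StringSerde());

        // Registering the descriptor is what makes the planner aware of the input.
        MessageStream<String> pageViewStream = appDescriptor.getInputStream(pageViews);
        pageViewStream.filter(msg -> !msg.isEmpty());
      }
    }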