diff --git a/.github/trigger_files/beam_PostCommit_Java_Nexmark_Flink.json b/.github/trigger_files/beam_PostCommit_Java_Nexmark_Flink.json new file mode 100644 index 000000000000..531514a72738 --- /dev/null +++ b/.github/trigger_files/beam_PostCommit_Java_Nexmark_Flink.json @@ -0,0 +1,4 @@ +{ + "comment": "Modify this file in a trivial way to cause this test suite to run", + "runFor": "#33146" +} diff --git a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Flink.json b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Flink.json index 9200c368abbe..531514a72738 100644 --- a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Flink.json +++ b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Flink.json @@ -1,5 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run", - "https://github.com/apache/beam/pull/31156": "noting that PR #31156 should run this test", - "https://github.com/apache/beam/pull/32648": "testing addition of Flink 1.19 support" + "runFor": "#33146" } diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateNamespaces.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateNamespaces.java index 6c0ed7740489..a68ab6c913ce 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateNamespaces.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateNamespaces.java @@ -90,8 +90,8 @@ public void appendTo(Appendable sb) throws IOException { /** {@link StateNamespace} that is scoped to a specific window. */ public static class WindowNamespace implements StateNamespace { - private Coder windowCoder; - private W window; + private final Coder windowCoder; + private final W window; private WindowNamespace(Coder windowCoder, W window) { this.windowCoder = windowCoder; diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java index 205270c22332..fe6d628953d5 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java @@ -371,6 +371,7 @@ private void collectGlobalWindowStateDescriptor( private static class FlinkValueState implements ValueState { private final StateNamespace namespace; + private final String namespaceKey; private final String stateId; private final ValueStateDescriptor flinkStateDescriptor; private final KeyedStateBackend flinkStateBackend; @@ -383,6 +384,7 @@ private static class FlinkValueState implements ValueState { SerializablePipelineOptions pipelineOptions) { this.namespace = namespace; + this.namespaceKey = namespace.stringKey(); this.stateId = stateId; this.flinkStateBackend = flinkStateBackend; @@ -394,8 +396,7 @@ private static class FlinkValueState implements ValueState { public void write(T input) { try { flinkStateBackend - .getPartitionedState( - namespace.stringKey(), StringSerializer.INSTANCE, flinkStateDescriptor) + .getPartitionedState(namespaceKey, StringSerializer.INSTANCE, flinkStateDescriptor) .update(input); } catch (Exception e) { throw new RuntimeException("Error updating state.", e); @@ -411,8 +412,7 @@ public ValueState readLater() { public T read() { try { return flinkStateBackend - .getPartitionedState( - namespace.stringKey(), StringSerializer.INSTANCE, flinkStateDescriptor) + .getPartitionedState(namespaceKey, StringSerializer.INSTANCE, flinkStateDescriptor) .value(); } catch (Exception e) { throw new RuntimeException("Error reading state.", e); @@ -423,8 +423,7 @@ public T read() { public void clear() { try { flinkStateBackend - .getPartitionedState( - namespace.stringKey(), StringSerializer.INSTANCE, flinkStateDescriptor) + .getPartitionedState(namespaceKey, StringSerializer.INSTANCE, flinkStateDescriptor) .clear(); } catch (Exception e) { throw new RuntimeException("Error clearing state.", e); @@ -455,6 +454,7 @@ public int hashCode() { private static class FlinkOrderedListState implements OrderedListState { private final StateNamespace namespace; + private final String namespaceKey; private final ListStateDescriptor> flinkStateDescriptor; private final KeyedStateBackend flinkStateBackend; @@ -465,6 +465,7 @@ private static class FlinkOrderedListState implements OrderedListState { Coder coder, SerializablePipelineOptions pipelineOptions) { this.namespace = namespace; + this.namespaceKey = namespace.stringKey(); this.flinkStateBackend = flinkStateBackend; this.flinkStateDescriptor = new ListStateDescriptor<>( @@ -483,7 +484,7 @@ public void clearRange(Instant minTimestamp, Instant limitTimestamp) { try { ListState> partitionedState = flinkStateBackend.getPartitionedState( - namespace.stringKey(), StringSerializer.INSTANCE, flinkStateDescriptor); + namespaceKey, StringSerializer.INSTANCE, flinkStateDescriptor); partitionedState.update(Lists.newArrayList(sortedMap.values())); } catch (Exception e) { throw new RuntimeException("Error adding to bag state.", e); @@ -500,7 +501,7 @@ public void add(TimestampedValue value) { try { ListState> partitionedState = flinkStateBackend.getPartitionedState( - namespace.stringKey(), StringSerializer.INSTANCE, flinkStateDescriptor); + namespaceKey, StringSerializer.INSTANCE, flinkStateDescriptor); partitionedState.add(value); } catch (Exception e) { throw new RuntimeException("Error adding to bag state.", e); @@ -516,7 +517,7 @@ public Boolean read() { Iterable> result = flinkStateBackend .getPartitionedState( - namespace.stringKey(), StringSerializer.INSTANCE, flinkStateDescriptor) + namespaceKey, StringSerializer.INSTANCE, flinkStateDescriptor) .get(); return result == null; } catch (Exception e) { @@ -542,7 +543,7 @@ private SortedMap> readAsMap() { try { ListState> partitionedState = flinkStateBackend.getPartitionedState( - namespace.stringKey(), StringSerializer.INSTANCE, flinkStateDescriptor); + namespaceKey, StringSerializer.INSTANCE, flinkStateDescriptor); listValues = MoreObjects.firstNonNull(partitionedState.get(), Collections.emptyList()); } catch (Exception e) { throw new RuntimeException("Error reading state.", e); @@ -564,8 +565,7 @@ public GroupingState, Iterable>> readLat public void clear() { try { flinkStateBackend - .getPartitionedState( - namespace.stringKey(), StringSerializer.INSTANCE, flinkStateDescriptor) + .getPartitionedState(namespaceKey, StringSerializer.INSTANCE, flinkStateDescriptor) .clear(); } catch (Exception e) { throw new RuntimeException("Error clearing state.", e); @@ -576,6 +576,7 @@ public void clear() { private static class FlinkBagState implements BagState { private final StateNamespace namespace; + private final String namespaceKey; private final String stateId; private final ListStateDescriptor flinkStateDescriptor; private final KeyedStateBackend flinkStateBackend; @@ -589,6 +590,7 @@ private static class FlinkBagState implements BagState { SerializablePipelineOptions pipelineOptions) { this.namespace = namespace; + this.namespaceKey = namespace.stringKey(); this.stateId = stateId; this.flinkStateBackend = flinkStateBackend; this.storesVoidValues = coder instanceof VoidCoder; @@ -601,7 +603,7 @@ public void add(T input) { try { ListState partitionedState = flinkStateBackend.getPartitionedState( - namespace.stringKey(), StringSerializer.INSTANCE, flinkStateDescriptor); + namespaceKey, StringSerializer.INSTANCE, flinkStateDescriptor); if (storesVoidValues) { Preconditions.checkState(input == null, "Expected to a null value but was: %s", input); // Flink does not allow storing null values @@ -625,7 +627,7 @@ public Iterable read() { try { ListState partitionedState = flinkStateBackend.getPartitionedState( - namespace.stringKey(), StringSerializer.INSTANCE, flinkStateDescriptor); + namespaceKey, StringSerializer.INSTANCE, flinkStateDescriptor); Iterable result = partitionedState.get(); if (storesVoidValues) { return () -> { @@ -662,7 +664,7 @@ public Boolean read() { Iterable result = flinkStateBackend .getPartitionedState( - namespace.stringKey(), StringSerializer.INSTANCE, flinkStateDescriptor) + namespaceKey, StringSerializer.INSTANCE, flinkStateDescriptor) .get(); return result == null; } catch (Exception e) { @@ -681,8 +683,7 @@ public ReadableState readLater() { public void clear() { try { flinkStateBackend - .getPartitionedState( - namespace.stringKey(), StringSerializer.INSTANCE, flinkStateDescriptor) + .getPartitionedState(namespaceKey, StringSerializer.INSTANCE, flinkStateDescriptor) .clear(); } catch (Exception e) { throw new RuntimeException("Error clearing state.", e); @@ -715,6 +716,7 @@ private static class FlinkCombiningState implements CombiningState { private final StateNamespace namespace; + private final String namespaceKey; private final String stateId; private final Combine.CombineFn combineFn; private final ValueStateDescriptor flinkStateDescriptor; @@ -729,6 +731,7 @@ private static class FlinkCombiningState SerializablePipelineOptions pipelineOptions) { this.namespace = namespace; + this.namespaceKey = namespace.stringKey(); this.stateId = stateId; this.combineFn = combineFn; this.flinkStateBackend = flinkStateBackend; @@ -748,7 +751,7 @@ public void add(InputT value) { try { org.apache.flink.api.common.state.ValueState state = flinkStateBackend.getPartitionedState( - namespace.stringKey(), StringSerializer.INSTANCE, flinkStateDescriptor); + namespaceKey, StringSerializer.INSTANCE, flinkStateDescriptor); AccumT current = state.value(); if (current == null) { @@ -766,7 +769,7 @@ public void addAccum(AccumT accum) { try { org.apache.flink.api.common.state.ValueState state = flinkStateBackend.getPartitionedState( - namespace.stringKey(), StringSerializer.INSTANCE, flinkStateDescriptor); + namespaceKey, StringSerializer.INSTANCE, flinkStateDescriptor); AccumT current = state.value(); if (current == null) { @@ -785,8 +788,7 @@ public AccumT getAccum() { try { AccumT accum = flinkStateBackend - .getPartitionedState( - namespace.stringKey(), StringSerializer.INSTANCE, flinkStateDescriptor) + .getPartitionedState(namespaceKey, StringSerializer.INSTANCE, flinkStateDescriptor) .value(); return accum != null ? accum : combineFn.createAccumulator(); } catch (Exception e) { @@ -804,7 +806,7 @@ public OutputT read() { try { org.apache.flink.api.common.state.ValueState state = flinkStateBackend.getPartitionedState( - namespace.stringKey(), StringSerializer.INSTANCE, flinkStateDescriptor); + namespaceKey, StringSerializer.INSTANCE, flinkStateDescriptor); AccumT accum = state.value(); if (accum != null) { @@ -825,7 +827,7 @@ public Boolean read() { try { return flinkStateBackend .getPartitionedState( - namespace.stringKey(), StringSerializer.INSTANCE, flinkStateDescriptor) + namespaceKey, StringSerializer.INSTANCE, flinkStateDescriptor) .value() == null; } catch (Exception e) { @@ -844,8 +846,7 @@ public ReadableState readLater() { public void clear() { try { flinkStateBackend - .getPartitionedState( - namespace.stringKey(), StringSerializer.INSTANCE, flinkStateDescriptor) + .getPartitionedState(namespaceKey, StringSerializer.INSTANCE, flinkStateDescriptor) .clear(); } catch (Exception e) { throw new RuntimeException("Error clearing state.", e); @@ -878,6 +879,7 @@ private static class FlinkCombiningStateWithContext implements CombiningState { private final StateNamespace namespace; + private final String namespaceKey; private final String stateId; private final CombineWithContext.CombineFnWithContext combineFn; private final ValueStateDescriptor flinkStateDescriptor; @@ -894,6 +896,7 @@ private static class FlinkCombiningStateWithContext SerializablePipelineOptions pipelineOptions) { this.namespace = namespace; + this.namespaceKey = namespace.stringKey(); this.stateId = stateId; this.combineFn = combineFn; this.flinkStateBackend = flinkStateBackend; @@ -914,7 +917,7 @@ public void add(InputT value) { try { org.apache.flink.api.common.state.ValueState state = flinkStateBackend.getPartitionedState( - namespace.stringKey(), StringSerializer.INSTANCE, flinkStateDescriptor); + namespaceKey, StringSerializer.INSTANCE, flinkStateDescriptor); AccumT current = state.value(); if (current == null) { @@ -932,7 +935,7 @@ public void addAccum(AccumT accum) { try { org.apache.flink.api.common.state.ValueState state = flinkStateBackend.getPartitionedState( - namespace.stringKey(), StringSerializer.INSTANCE, flinkStateDescriptor); + namespaceKey, StringSerializer.INSTANCE, flinkStateDescriptor); AccumT current = state.value(); if (current == null) { @@ -951,8 +954,7 @@ public AccumT getAccum() { try { AccumT accum = flinkStateBackend - .getPartitionedState( - namespace.stringKey(), StringSerializer.INSTANCE, flinkStateDescriptor) + .getPartitionedState(namespaceKey, StringSerializer.INSTANCE, flinkStateDescriptor) .value(); return accum != null ? accum : combineFn.createAccumulator(context); } catch (Exception e) { @@ -970,7 +972,7 @@ public OutputT read() { try { org.apache.flink.api.common.state.ValueState state = flinkStateBackend.getPartitionedState( - namespace.stringKey(), StringSerializer.INSTANCE, flinkStateDescriptor); + namespaceKey, StringSerializer.INSTANCE, flinkStateDescriptor); AccumT accum = state.value(); if (accum != null) { @@ -991,7 +993,7 @@ public Boolean read() { try { return flinkStateBackend .getPartitionedState( - namespace.stringKey(), StringSerializer.INSTANCE, flinkStateDescriptor) + namespaceKey, StringSerializer.INSTANCE, flinkStateDescriptor) .value() == null; } catch (Exception e) { @@ -1010,8 +1012,7 @@ public ReadableState readLater() { public void clear() { try { flinkStateBackend - .getPartitionedState( - namespace.stringKey(), StringSerializer.INSTANCE, flinkStateDescriptor) + .getPartitionedState(namespaceKey, StringSerializer.INSTANCE, flinkStateDescriptor) .clear(); } catch (Exception e) { throw new RuntimeException("Error clearing state.", e); @@ -1168,6 +1169,7 @@ public int hashCode() { private static class FlinkMapState implements MapState { private final StateNamespace namespace; + private final String namespaceKey; private final String stateId; private final MapStateDescriptor flinkStateDescriptor; private final KeyedStateBackend flinkStateBackend; @@ -1180,6 +1182,7 @@ private static class FlinkMapState implements MapState mapValueCoder, SerializablePipelineOptions pipelineOptions) { this.namespace = namespace; + this.namespaceKey = namespace.stringKey(); this.stateId = stateId; this.flinkStateBackend = flinkStateBackend; this.flinkStateDescriptor = @@ -1204,7 +1207,7 @@ public ReadableState get(final KeyT input) { ValueT value = flinkStateBackend .getPartitionedState( - namespace.stringKey(), StringSerializer.INSTANCE, flinkStateDescriptor) + namespaceKey, StringSerializer.INSTANCE, flinkStateDescriptor) .get(key); return (value != null) ? value : defaultValue; } catch (Exception e) { @@ -1223,8 +1226,7 @@ public ReadableState get(final KeyT input) { public void put(KeyT key, ValueT value) { try { flinkStateBackend - .getPartitionedState( - namespace.stringKey(), StringSerializer.INSTANCE, flinkStateDescriptor) + .getPartitionedState(namespaceKey, StringSerializer.INSTANCE, flinkStateDescriptor) .put(key, value); } catch (Exception e) { throw new RuntimeException("Error put kv to state.", e); @@ -1235,17 +1237,12 @@ public void put(KeyT key, ValueT value) { public ReadableState computeIfAbsent( final KeyT key, Function mappingFunction) { try { - ValueT current = - flinkStateBackend - .getPartitionedState( - namespace.stringKey(), StringSerializer.INSTANCE, flinkStateDescriptor) - .get(key); - + org.apache.flink.api.common.state.MapState state = + flinkStateBackend.getPartitionedState( + namespaceKey, StringSerializer.INSTANCE, flinkStateDescriptor); + ValueT current = state.get(key); if (current == null) { - flinkStateBackend - .getPartitionedState( - namespace.stringKey(), StringSerializer.INSTANCE, flinkStateDescriptor) - .put(key, mappingFunction.apply(key)); + state.put(key, mappingFunction.apply(key)); } return ReadableStates.immediate(current); } catch (Exception e) { @@ -1257,8 +1254,7 @@ public ReadableState computeIfAbsent( public void remove(KeyT key) { try { flinkStateBackend - .getPartitionedState( - namespace.stringKey(), StringSerializer.INSTANCE, flinkStateDescriptor) + .getPartitionedState(namespaceKey, StringSerializer.INSTANCE, flinkStateDescriptor) .remove(key); } catch (Exception e) { throw new RuntimeException("Error remove map state key.", e); @@ -1274,7 +1270,7 @@ public Iterable read() { Iterable result = flinkStateBackend .getPartitionedState( - namespace.stringKey(), StringSerializer.INSTANCE, flinkStateDescriptor) + namespaceKey, StringSerializer.INSTANCE, flinkStateDescriptor) .keys(); return result != null ? ImmutableList.copyOf(result) : Collections.emptyList(); } catch (Exception e) { @@ -1298,7 +1294,7 @@ public Iterable read() { Iterable result = flinkStateBackend .getPartitionedState( - namespace.stringKey(), StringSerializer.INSTANCE, flinkStateDescriptor) + namespaceKey, StringSerializer.INSTANCE, flinkStateDescriptor) .values(); return result != null ? ImmutableList.copyOf(result) : Collections.emptyList(); } catch (Exception e) { @@ -1322,7 +1318,7 @@ public Iterable> read() { Iterable> result = flinkStateBackend .getPartitionedState( - namespace.stringKey(), StringSerializer.INSTANCE, flinkStateDescriptor) + namespaceKey, StringSerializer.INSTANCE, flinkStateDescriptor) .entries(); return result != null ? ImmutableList.copyOf(result) : Collections.emptyList(); } catch (Exception e) { @@ -1360,8 +1356,7 @@ public ReadableState>> readLater() { public void clear() { try { flinkStateBackend - .getPartitionedState( - namespace.stringKey(), StringSerializer.INSTANCE, flinkStateDescriptor) + .getPartitionedState(namespaceKey, StringSerializer.INSTANCE, flinkStateDescriptor) .clear(); } catch (Exception e) { throw new RuntimeException("Error clearing state.", e); @@ -1393,6 +1388,7 @@ public int hashCode() { private static class FlinkSetState implements SetState { private final StateNamespace namespace; + private final String namespaceKey; private final String stateId; private final MapStateDescriptor flinkStateDescriptor; private final KeyedStateBackend flinkStateBackend; @@ -1404,6 +1400,7 @@ private static class FlinkSetState implements SetState { Coder coder, SerializablePipelineOptions pipelineOptions) { this.namespace = namespace; + this.namespaceKey = namespace.stringKey(); this.stateId = stateId; this.flinkStateBackend = flinkStateBackend; this.flinkStateDescriptor = @@ -1418,8 +1415,7 @@ public ReadableState contains(final T t) { try { Boolean result = flinkStateBackend - .getPartitionedState( - namespace.stringKey(), StringSerializer.INSTANCE, flinkStateDescriptor) + .getPartitionedState(namespaceKey, StringSerializer.INSTANCE, flinkStateDescriptor) .get(t); return ReadableStates.immediate(result != null && result); } catch (Exception e) { @@ -1432,7 +1428,7 @@ public ReadableState addIfAbsent(final T t) { try { org.apache.flink.api.common.state.MapState state = flinkStateBackend.getPartitionedState( - namespace.stringKey(), StringSerializer.INSTANCE, flinkStateDescriptor); + namespaceKey, StringSerializer.INSTANCE, flinkStateDescriptor); boolean alreadyContained = state.contains(t); if (!alreadyContained) { state.put(t, true); @@ -1447,8 +1443,7 @@ public ReadableState addIfAbsent(final T t) { public void remove(T t) { try { flinkStateBackend - .getPartitionedState( - namespace.stringKey(), StringSerializer.INSTANCE, flinkStateDescriptor) + .getPartitionedState(namespaceKey, StringSerializer.INSTANCE, flinkStateDescriptor) .remove(t); } catch (Exception e) { throw new RuntimeException("Error remove value to state.", e); @@ -1464,8 +1459,7 @@ public SetState readLater() { public void add(T value) { try { flinkStateBackend - .getPartitionedState( - namespace.stringKey(), StringSerializer.INSTANCE, flinkStateDescriptor) + .getPartitionedState(namespaceKey, StringSerializer.INSTANCE, flinkStateDescriptor) .put(value, true); } catch (Exception e) { throw new RuntimeException("Error add value to state.", e); @@ -1481,7 +1475,7 @@ public Boolean read() { Iterable result = flinkStateBackend .getPartitionedState( - namespace.stringKey(), StringSerializer.INSTANCE, flinkStateDescriptor) + namespaceKey, StringSerializer.INSTANCE, flinkStateDescriptor) .keys(); return result == null || Iterables.isEmpty(result); } catch (Exception e) { @@ -1501,8 +1495,7 @@ public Iterable read() { try { Iterable result = flinkStateBackend - .getPartitionedState( - namespace.stringKey(), StringSerializer.INSTANCE, flinkStateDescriptor) + .getPartitionedState(namespaceKey, StringSerializer.INSTANCE, flinkStateDescriptor) .keys(); return result != null ? ImmutableList.copyOf(result) : Collections.emptyList(); } catch (Exception e) { @@ -1514,8 +1507,7 @@ public Iterable read() { public void clear() { try { flinkStateBackend - .getPartitionedState( - namespace.stringKey(), StringSerializer.INSTANCE, flinkStateDescriptor) + .getPartitionedState(namespaceKey, StringSerializer.INSTANCE, flinkStateDescriptor) .clear(); } catch (Exception e) { throw new RuntimeException("Error clearing state.", e);