-
Notifications
You must be signed in to change notification settings - Fork 4.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add MultimapState API #23491
Add MultimapState API #23491
Conversation
Codecov Report
@@ Coverage Diff @@
## master #23491 +/- ##
==========================================
- Coverage 73.33% 72.62% -0.72%
==========================================
Files 719 736 +17
Lines 95792 97267 +1475
==========================================
+ Hits 70250 70638 +388
- Misses 24231 25318 +1087
Partials 1311 1311
Flags with carried forward coverage won't be shown. Click here to find out more.
📣 We’re building smart automated test selection to slash your CI/CD build times. Learn more |
6618e8a
to
7dcb816
Compare
Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control |
7dcb816
to
306db0f
Compare
runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java
Outdated
Show resolved
Hide resolved
|
||
@Override | ||
public void clear() { | ||
contents = ArrayListMultimap.create(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why not just clear the instantiated contents?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See
beam/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java
Line 411 in af6a04c
// Even though we're clearing we can't remove this from the in-memory state map, since |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should create a new container instead of clear()
, this is also how Bag/Map/Set do clear()
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The +1 was for your response that we need to keep stuff consistent for past reads that were returned.
runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java
Outdated
Show resolved
Hide resolved
runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java
Outdated
Show resolved
Hide resolved
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
Outdated
Show resolved
Hide resolved
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiStateAccessor.java
Outdated
Show resolved
Hide resolved
sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, added some minor comments.
sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateSpecs.java
Outdated
Show resolved
Hide resolved
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiStateAccessor.java
Outdated
Show resolved
Hide resolved
Gentle ping for review. |
runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java
Outdated
Show resolved
Hide resolved
sdks/java/core/src/main/java/org/apache/beam/sdk/state/MultimapState.java
Outdated
Show resolved
Hide resolved
/** Returns an {@link Iterable} over the keys contained in this multimap. */ | ||
ReadableState<Iterable<K>> keys(); | ||
|
||
/** Returns an {@link Iterable} over all key-values pairs contained in this multimap. */ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
/** Returns an {@link Iterable} over all key-values pairs contained in this multimap. */ | |
/** Returns an {@link Iterable} over all key-value pairs contained in this multimap. */ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What I meant is in each pair there may be multiple values, WDYT?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I missed that we are returning Entry<K, Iterable<V>>
which is why I suggested the swap to key-value
over key-values
.
Note that other map APIs use Entry<K, V>
, see:
https://guava.dev/releases/23.0/api/docs/com/google/common/collect/ArrayListMultimap.html#entries--
https://commons.apache.org/proper/commons-collections/apidocs/org/apache/commons/collections4/MultiValuedMap.html#entries--
It would likely make sense to match what existing common libraries do on this front so we are idiomatic for Java users.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed, done.
sdks/java/core/src/main/java/org/apache/beam/sdk/state/MultimapState.java
Outdated
Show resolved
Hide resolved
sdks/java/core/src/main/java/org/apache/beam/sdk/state/MultimapState.java
Outdated
Show resolved
Hide resolved
/** Returns an {@link Iterable} over the keys contained in this multimap. */ | ||
ReadableState<Iterable<K>> keys(); | ||
|
||
/** Returns an {@link Iterable} over all key-values pairs contained in this multimap. */ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I missed that we are returning Entry<K, Iterable<V>>
which is why I suggested the swap to key-value
over key-values
.
Note that other map APIs use Entry<K, V>
, see:
https://guava.dev/releases/23.0/api/docs/com/google/common/collect/ArrayListMultimap.html#entries--
https://commons.apache.org/proper/commons-collections/apidocs/org/apache/commons/collections4/MultiValuedMap.html#entries--
It would likely make sense to match what existing common libraries do on this front so we are idiomatic for Java users.
|
||
@Override | ||
public void clear() { | ||
contents = ArrayListMultimap.create(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1
|
||
@Override | ||
public ReadableState<Iterable<V>> get(K key) { | ||
return CollectionViewState.of(contents.get(keyCoder.structuralValue(key))); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This returns a stale view in this order of operations:
mm.put("A", 1);
ReadableState<Iterable<V>> iter = mm.get("A");
mm.clear();
iter.read(); <-- will contain "1"
I believe there are other combinations where you won't get what you want. Please ensure that ParDoTest covers:
get
before and after remove/put/clear.containsKey
before and after remove/put/clearentries
before and after remove/put/clearisEmpty
before and after remove/put/clearkeys
fore and after remove/put/clear
Currently I see your covering entries before and after put/remove.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done, updated ParDoTest to include those as well.
sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateSpecs.java
Outdated
Show resolved
Hide resolved
sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateSpecs.java
Outdated
Show resolved
Hide resolved
sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesMultimapState.java
Outdated
Show resolved
Hide resolved
Co-authored-by: Lukasz Cwik <[email protected]>
gentle ping |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please add coverage for get/containsKey for the structural multimap test and then LGTM
|
||
@Override | ||
public void clear() { | ||
contents = ArrayListMultimap.create(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The +1 was for your response that we need to keep stuff consistent for past reads that were returned.
assertEquals(4, Iterables.size(entries)); | ||
assertEquals(5, Iterables.size(entriesView.read())); | ||
assertEquals(5, Iterables.size(state.entries().read())); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please add coverage for containsKey/get since these accessors are important and need to do the structural key conversion.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done. Can you help merge the PR if LGTY? thanks!
…ting structural values.
Run Java PreCommit |
Run Java_Spark3_Versions PreCommit |
…work(that input elements are processed in the same order as in Create.of) is made.
The unit test pipeline
.apply(
Create.of(
KV.of("hello", KV.of("a", 97)), KV.of("hello", KV.of("a", 97)),
KV.of("hello", KV.of("a", 98)), KV.of("hello", KV.of("b", 33))))
.apply(ParDo.of(fn)); |
Run Java_GCP_IO_Direct PreCommit |
Run Java_Examples_Dataflow_Java11 PreCommit |
* Add Multimap API * Update based on commit * Fix format * Fix typo and adopt comment suggestion * Fix spotless check * Update based on comments * Change Multimap in memory implementation to use structural value for keys. * Address comments * Apply suggestions from code review Co-authored-by: Lukasz Cwik <[email protected]> * Update MultimapState.entries() to return Entry<K, V> instead of Entry<K, Iterable<V>> * Update MultimapState inmemory runner impl and added more thorough unit tests * In MultimapState ParDoTest also test against get/containsKey when testing structural values. * Fix flaky unit test, in which incorrect assumptions of the test framework(that input elements are processed in the same order as in Create.of) is made. Co-authored-by: Lukasz Cwik <[email protected]>
Adds the API and and in-memory implementation for multimap state. This is the first PR for #22831. Doc - Beam MultimapState.
#23492 is the PR for the windmill impl.
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
R: @username
).addresses #123
), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, commentfixes #<ISSUE NUMBER>
instead.CHANGES.md
with noteworthy changes.See the Contributor Guide for more tips on how to make review process smoother.
To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI.