SAMZA-2444: JobModel save in CoordinatorStreamStore resulting flush for each message #1259
@bharathkk and @lakshmi-manasa-g can you take a look? |
Now that I've looked at the coordinator stream impl for the metadata store, it seems we shouldn't couple flush with put/putall/delete/deleteall. It should be done as a separate call at the end of updates. The current impl is hideous and can cause further perf problems down the road. |
@xinyuiscool Do you mean we should remove |
Yes, I think that's a typical store api. Not sure why it was doing all the flushes before. @bharathkk might have some context about it, but he is on vacation. |
@lakshmi-manasa-g Thanks for the review. After a discussion with @xinyuiscool and @prateekm, we decided to switch to another solution, which will move the |
Signed-off-by: Alan Zhang <[email protected]>
Alan, PR #1125 seems to be solving a similar problem. Can you check how the problem / solution in that PR relates to this one? |
@prateekm If we solve the issue with this PR, later we can close it. |
1. Batch write task partition assignments information to metadata store. 2. Batch write task container information to metadata store. Signed-off-by: Alan Zhang <[email protected]>
The changes are done and I have updated the details in the PR's description. Please help review, thanks! |
@alnzng here is the PR for context - #1112 |
thank you for addressing the comments.
@xinyuiscool @prateekm |
LGTM. Thanks for the quick fix!
Symptom
When a Samza job creates a large number of tasks/partitions, the AM can take a long time to save the metadata in a single run, which may cause a timeout exception.
We observed that for a Samza job with over 37k tasks/partitions, it takes the AM over 10 minutes to save the metadata in a run.
Cause
JobModelManager uses CoordinatorStreamStore.put() to save job metadata, and put() flushes for each message. The flush operation is expensive, especially when the remote server is suffering from performance issues.
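To illustrate why a flush per message hurts at this scale, here is a minimal sketch that only counts flush calls. CountingStore and FlushCostSketch are hypothetical names for this example, not Samza classes, and the flush body stands in for what would be an expensive remote round trip in a real store.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch, not Samza code: compares flush counts for the two designs.
class FlushCostSketch {
    // Writes taskCount entries, then flushes once, and reports the total flush count.
    static int saveAll(CountingStore store, int taskCount) {
        for (int i = 0; i < taskCount; i++) {
            store.put("task-" + i, new byte[0]);
        }
        store.flush(); // the caller flushes once at the end of the updates
        return store.flushCount;
    }

    public static void main(String[] args) {
        int perMessage = saveAll(new CountingStore(true), 37_000);
        int batched = saveAll(new CountingStore(false), 37_000);
        System.out.println("flush-on-put flushes: " + perMessage);
        System.out.println("flush-at-end flushes: " + batched);
    }
}

// Hypothetical toy store that counts flushes instead of contacting a remote server.
class CountingStore {
    private final Map<String, byte[]> buffer = new LinkedHashMap<>();
    private final boolean flushOnPut;
    int flushCount = 0;

    CountingStore(boolean flushOnPut) {
        this.flushOnPut = flushOnPut;
    }

    void put(String key, byte[] value) {
        buffer.put(key, value);
        if (flushOnPut) {
            flush(); // models the behavior described above: one flush per message
        }
    }

    void flush() {
        flushCount++; // stands in for an expensive remote round trip
    }
}
```

With flush coupled to put, the number of flushes grows linearly with the number of tasks; with a single flush at the end, it stays constant.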
Changes
1. Remove flush from the put/putAll/delete functions in CoordinatorStreamStore.
2. Call flush after calling put/putAll/delete in related classes.
Tests
API Changes
1. Replace writeTaskPartitionAssignment with the new batch write method writeTaskPartitionAssignments in TaskPartitionAssignmentManager.
2. Replace writeTaskContainerMapping with the new batch write method writeTaskContainerMappings in TaskAssignmentManager.
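As a rough sketch of the shape of such a batch write method, the example below writes a whole map with one putAll and one flush. SimpleStore, InMemoryStore, and MappingWriter are hypothetical stand-ins, and the Map<String, String> parameter is an assumption made for illustration; the actual signatures in TaskAssignmentManager and MetadataStore may differ.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of a batch write method; not the actual Samza implementation.
class BatchWriteSketch {
    // Assumed minimal store interface for this example only.
    interface SimpleStore {
        void putAll(Map<String, String> entries);
        void flush();
    }

    // In-memory store that records how many times flush was called.
    static class InMemoryStore implements SimpleStore {
        final Map<String, String> data = new HashMap<>();
        int flushCount = 0;

        @Override
        public void putAll(Map<String, String> entries) {
            data.putAll(entries);
        }

        @Override
        public void flush() {
            flushCount++;
        }
    }

    // Hypothetical manager: the method name mirrors the PR, the signature is assumed.
    static class MappingWriter {
        private final SimpleStore store;

        MappingWriter(SimpleStore store) {
            this.store = store;
        }

        void writeTaskContainerMappings(Map<String, String> taskToContainer) {
            store.putAll(taskToContainer); // one batched write for all mappings
            store.flush();                 // one explicit flush after the batch
        }
    }

    public static void main(String[] args) {
        InMemoryStore store = new InMemoryStore();
        Map<String, String> mappings = new HashMap<>();
        mappings.put("task-0", "container-0");
        mappings.put("task-1", "container-0");
        mappings.put("task-2", "container-1");
        new MappingWriter(store).writeTaskContainerMappings(mappings);
        System.out.println("entries=" + store.data.size() + ", flushes=" + store.flushCount);
    }
}
```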
Upgrade Instructions
None
Usage Instructions
None