[SPARK-25299] Add AttemptId to Shuffle API #574
Conversation
This reverts commit 16caee4.
Builds off #569. I could also make a PR that works off the spark-25299 branch, but it would produce a lot of merge conflicts. @mccheah @ifilonenko for initial review.
@@ -217,7 +217,7 @@ void closeAndWriteOutput() throws IOException {
   final SpillInfo[] spills = sorter.closeAndGetSpills();
   sorter = null;
   final ShuffleMapOutputWriter mapWriter = shuffleWriteSupport
-      .createMapOutputWriter(shuffleId, mapId, partitioner.numPartitions());
+      .createMapOutputWriter(shuffleId, mapId, partitioner.numPartitions(), taskContext.attemptNumber());
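The diff threads an attempt identifier through the pluggable shuffle-write seam. A minimal sketch of that seam, assuming names from the call site above (`ShuffleWriteSupport`, `ShuffleMapOutputWriter`, `createMapOutputWriter`); everything beyond that one signature, including the toy `LoggingWriteSupport` implementation, is hypothetical:

```java
import java.util.ArrayList;
import java.util.List;

public class MapOutputWriterSketch {
    // Stand-in for the writer the plugin hands back; partition-level
    // writers would hang off this in the real API.
    interface ShuffleMapOutputWriter { }

    interface ShuffleWriteSupport {
        // The PR adds the attempt id as a fourth argument so a writer
        // implementation can keep outputs of concurrent attempts apart.
        ShuffleMapOutputWriter createMapOutputWriter(
                int shuffleId, int mapId, int numPartitions, int attemptNumber);
    }

    // Toy implementation that just records which attempt asked for a writer.
    static final class LoggingWriteSupport implements ShuffleWriteSupport {
        final List<String> log = new ArrayList<>();

        @Override
        public ShuffleMapOutputWriter createMapOutputWriter(
                int shuffleId, int mapId, int numPartitions, int attemptNumber) {
            log.add("shuffle=" + shuffleId + " map=" + mapId
                    + " attempt=" + attemptNumber);
            return new ShuffleMapOutputWriter() { };
        }
    }

    public static void main(String[] args) {
        LoggingWriteSupport support = new LoggingWriteSupport();
        support.createMapOutputWriter(0, 7, 200, 1);
        System.out.println(support.log.get(0));
    }
}
```

Without the extra parameter, a plugin that writes one file per (shuffleId, mapId) pair has no way to distinguish two live attempts of the same map task.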
I think this has to be taskContext.taskAttemptId() to deal with multiple retries of the same stage. attemptNumber() handles speculative execution, but if a zombie taskset is still running a shuffle map task when another taskset is launched for the same stage, both tasks will have attemptNumber() == 0.
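The collision the reviewer describes can be sketched concretely. The `TaskInfo` class and path-building helpers below are hypothetical stand-ins for the relevant TaskContext fields: attemptNumber restarts at 0 for each taskset, while taskAttemptId is unique across the whole application:

```java
public class AttemptIdCollision {
    // Minimal stand-in for the TaskContext fields the reviewer contrasts.
    static final class TaskInfo {
        final int stageId;
        final int attemptNumber;   // per-taskset index; restarts at 0
        final long taskAttemptId;  // unique across the application

        TaskInfo(int stageId, int attemptNumber, long taskAttemptId) {
            this.stageId = stageId;
            this.attemptNumber = attemptNumber;
            this.taskAttemptId = taskAttemptId;
        }

        // Hypothetical output names a shuffle plugin might derive.
        String pathByAttemptNumber() {
            return "shuffle_" + stageId + "_map0_attempt" + attemptNumber;
        }

        String pathByTaskAttemptId() {
            return "shuffle_" + stageId + "_map0_task" + taskAttemptId;
        }
    }

    public static void main(String[] args) {
        // Zombie taskset still running the map task for stage 3...
        TaskInfo zombie = new TaskInfo(3, 0, 41L);
        // ...while a retried taskset runs the same map task again.
        TaskInfo retry = new TaskInfo(3, 0, 57L);

        // Both attempts carry attemptNumber == 0, so paths keyed on it collide:
        System.out.println(
                zombie.pathByAttemptNumber().equals(retry.pathByAttemptNumber()));
        // Paths keyed on the unique taskAttemptId stay distinct:
        System.out.println(
                zombie.pathByTaskAttemptId().equals(retry.pathByTaskAttemptId()));
    }
}
```

Running this prints `true` then `false`: the attemptNumber-based names clash while the taskAttemptId-based names do not, which is exactly the reviewer's argument for using taskAttemptId().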