[SPARK-25299] Shuffle locations api #517
Conversation
core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
@squito @vanzin for review. This one is pretty critical - we're dealing with pretty sensitive code paths here. I took a look at the various places where Spark deals with the block manager ID on a map status. There are some assumptions that Spark makes about the availability of shuffle files based on the host, which are a bit harder to reason about given that we've added the possibility of storing in remote locations. Would appreciate another set of eyes taking a closer look; I'll do some more digging in the meantime.
core/src/main/java/org/apache/spark/api/shuffle/MapShuffleLocations.java
package org.apache.spark.api.shuffle;

/**
 * Marker interface representing a location of a shuffle block. Implementations of shuffle readers
To be even more specific, it's not the location of a shuffle block, but a location from which to retrieve shuffle blocks. Trying to figure out how to word this so that people don't get this confused with a ShuffleBlockId. Maybe you can say "a location of a shuffle block manager" instead to differentiate?
It's not always the location of a shuffle block manager though because I can choose a server that's running just a file store for example.
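For readers following along, here is a minimal Scala sketch of the API shape being discussed (the real interfaces in this PR are Java files under org.apache.spark.api.shuffle; getLocationForBlock is referenced later in this thread, and the exact signatures may differ):

// Illustrative sketch only; the actual interfaces live in the Java API module of this PR.
trait ShuffleLocation extends java.io.Serializable

trait MapShuffleLocations extends java.io.Serializable {
  // Resolves the location from which a given reduce partition's block should be fetched.
  def getLocationForBlock(reduceId: Int): ShuffleLocation
}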
core/src/main/java/org/apache/spark/shuffle/sort/DefaultMapShuffleLocations.java
I'm not sure if there are tests to add - the existing tests cover this stuff pretty comprehensively, apart from the serialization hiccup I mentioned earlier.
Just a superficial review so far
> There are some assumptions that Spark makes about the availability of shuffle files based on the host, which are a bit harder to reason about given that we've added the possibility of storing in remote locations.
yeah, that part concerns me a lot. The scheduler has this assumption baked in pretty tightly. There are a couple of configs to control behavior with the external shuffle service, but we'd need to make that more general. The old behavior is definitely wrong for some of the distributed stores we're thinking of. The plugin needs to have some control, while also not being exposed to all the ugly details of the scheduler.
That might be orthogonal to this change, but it's something we need to be thinking about ...
@@ -282,7 +283,7 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging

   // For testing
   def getMapSizesByExecutorId(shuffleId: Int, reduceId: Int)
-      : Iterator[(BlockManagerId, Seq[(BlockId, Long)])] = {
+      : Iterator[(MapShuffleLocations, Seq[(BlockId, Long)])] = {
all of these methods should probably be renamed to getMapSizesByShuffleLocation ... or is the idea that when you use a plugin, you won't touch MapOutputTracker at all, and these changes are just necessary bookkeeping?
The read code will still ask MapOutputTracker for blocks by location. In the case that the plugin isn't using locations, it's just going to get back an iterator with a single element: the null location and the Seq containing all of the blocks. We then need to convert whatever iterator we get back into an appropriate request to the reader plugin. If we like, I can write a proof-of-concept PR against this code on the read side, because it's a little difficult to reason about this concretely otherwise.
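To make that read-side flow concrete, here is a rough, hypothetical sketch of converting the MapOutputTracker iterator into reader requests, including the single-null-location case. ShuffleReaderPlugin, fetchBlocks, and dispatchToReader are illustrative names, not part of this PR:

import org.apache.spark.api.shuffle.MapShuffleLocations
import org.apache.spark.storage.BlockId

trait ShuffleReaderPlugin {
  def fetchBlocks(location: Option[MapShuffleLocations], blocks: Seq[(BlockId, Long)]): Unit
}

def dispatchToReader(
    blocksByLocation: Iterator[(MapShuffleLocations, Seq[(BlockId, Long)])],
    reader: ShuffleReaderPlugin): Unit = {
  blocksByLocation.foreach { case (locs, blocks) =>
    // A plugin that ignores locations sees exactly one entry whose location is null,
    // paired with the Seq of all blocks for the requested partitions.
    reader.fetchBlocks(Option(locs), blocks)
  }
}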
import org.roaringbitmap.RoaringBitmap
import scala.collection.mutable
old ordering was correct
/**
 * A [[MapStatus]] implementation that tracks the size of each block. Size for each block is
 * represented using a single byte.
 *
- * @param loc location where the task is being executed.
+ * @param mapShuffleLocs location where the task is being executed.
the loc parameter is still there. Also, I don't think mapShuffleLocs is really where the task is being executed -- it's more like where the output should be fetched from, right?
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag
import scala.util.control.NonFatal
same
import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito._
import scala.collection.mutable.ArrayBuffer
same
-    mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
+    mapStatus = MapStatus$.MODULE$.apply(
+        blockManager.shuffleServerId(),
+        DefaultMapShuffleLocations.get(blockManager.shuffleServerId()),
I'm a little confused by the changes to the writers -- I guess for now you're just hard-coding it, and later on there will be a place for plugins to do something different here?
Yes that's correct, this is just a placeholder.
But how much control do we expect the plugins to need? How important is locality going to be in most cases? If we're fetching from a distributed system, does it matter which node is running the read task?
I'm also curious as to which distributed stores are being referred to here. Specifically those where locality is a more sensitive matter, I presume.
Actually I don't really care about locality so much with shuffles (I'm not convinced locality ever really mattered for shuffles, and it will be even less common and harder to make it work with a shuffle store plugin). My concern is how shuffle fetch failures are treated by the scheduler. Currently, Spark treats one fetch failure as an indication that all shuffle output on that executor (or host, if using the external shuffle service) is gone. As an extreme example of how we could really mess this up with a distributed store, imagine writing that data to HDFS and using the namenode as the host id. Suppose you had a fetch failure due to some random / transient issue (HDFS is a little overloaded so there is a timeout, or it's really a problem on the receiving end but it looks like a fetch failure, etc.). If you didn't change any of the failure handling logic in the scheduler, it would mark everything with the same host as missing (which would be all of the shuffle data from the entire app, since the same namenode would be used for everything). (The TODO there is also telling for how even the existing code isn't really making great decisions here, and this might be a good time to introduce an abstraction here anyway ...)
The locations API wouldn't be used for a DFS implementation of the plugin though, right? The location can instead be derived from the block identifier (shuffle id + map id + reduce id). So the plugin doesn't actually have to store shuffle locations on the driver at all. And even if such a plugin did store shuffle locations in the driver, I think what we'll end up doing is invalidating shuffle locations that are really still available in the remote storage, so we lose some work unnecessarily. Basically we would possibly remove map outputs that are still perfectly usable, since the availability of the shuffle blocks is no longer tied to the availability of the shuffle server / executor that wrote those blocks. But in any case we only end up recomputing work we didn't have to. Though I'd concede that such rewrites are more expensive when writing to remote storage.
I'm not claiming your implementation is broken -- I'm letting you know things I'm concerned about, especially as I've only seen a partial implementation so far. That HDFS example was extreme; we probably wouldn't do that, but I do think there are corner cases for failure handling we need to think through. Does each plugin get to choose how many failures lead to recomputation? Are there a few high-level options (remove data on first failure / never remove / remove only the one failed block, nothing else)?
Technically yes, but I think in practice that could easily end up being a complete barrier to adoption as the performance penalty will be huge. Keep in mind early implementations will probably be prone to timeouts etc. manifesting as fetch failures as we work out the kinks.
I see the concern. But a more optimistic perspective is that the number of fetch failures could also decrease, due to the readers fetching data from a more resilient store. I'd expect the average failure rate of reading shuffle data from distributed stores to be lower than the failure rate of readers fetching data from shuffle servers and other executors - which is a core goal of the project itself. Nevertheless, I agree that we also have a responsibility to build better failure tolerance into Spark proper, given that we don't assume plugins are built resiliently. Where should we start looking to trace the logic for how Spark handles this now?
You can probably start from tracing FetchFailedException. It's mainly handled in the DAGScheduler, but also has implications on other things like the OutputCommitCoordinator, and the handling of multiple conflicting writes to the shuffle store, e.g. SPARK-25341 & SPARK-25342.
core/src/main/java/org/apache/spark/api/shuffle/MapShuffleLocations.java
//
// If more similar cases arise, consider adding a serialization API for these shuffle locations.
private val DEFAULT_MAP_SHUFFLE_LOCATIONS_ID: Byte = 0
private val NON_DEFAULT_MAP_SHUFFLE_LOCATIONS_ID: Byte = 1
why don't you use a boolean here instead of a byte?
If we add more built-in shuffle locations we might want to have more specialized serialization. Hope we don't have to do that, though.
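As an illustration of that trade-off, here is a sketch of how the byte tag could dispatch serialization, leaving room for future built-in IDs that a boolean could not accommodate. This is not the exact MapStatus code; null handling is omitted and the protocol is illustrative:

import java.io.ObjectOutput
import org.apache.spark.api.shuffle.MapShuffleLocations
import org.apache.spark.shuffle.sort.DefaultMapShuffleLocations

object ShuffleLocationsSerde {
  private val DEFAULT_MAP_SHUFFLE_LOCATIONS_ID: Byte = 0
  private val NON_DEFAULT_MAP_SHUFFLE_LOCATIONS_ID: Byte = 1
  // A hypothetical future built-in implementation could claim ID 2 without breaking the format.

  def write(out: ObjectOutput, locs: MapShuffleLocations): Unit = locs match {
    case default: DefaultMapShuffleLocations =>
      out.writeByte(DEFAULT_MAP_SHUFFLE_LOCATIONS_ID)
      default.getBlockManagerId.writeExternal(out) // compact, well-known encoding
    case other =>
      out.writeByte(NON_DEFAULT_MAP_SHUFFLE_LOCATIONS_ID)
      out.writeObject(other) // fall back to generic object serialization
  }
}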
-      mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId, startPartition, endPartition),
+      mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId, startPartition, endPartition)
+        .map { case (loc, blocks) =>
+          require(
nit: I think this might be clearer as a case statement that throws if it's not one of the cases (which, at this time, would only be DefaultMapShuffleLocations)
We can't do that because DefaultMapShuffleLocations is not a case class and we can't pattern match against it.
not true -- you can pattern match to do the equivalent of an isInstanceOf / asInstanceOf check; you just don't get to directly put the fields in the pattern (unless you write an unapply() method, which we shy away from in Spark)
scala> class Foo(val x: Int)
defined class Foo
scala> val x: Any = new Foo(1)
x: Any = Foo@228cea97
scala> x match {
| case f: Foo => f.x
| }
res2: Int = 1
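Applied to the reader snippet above, that suggestion might look roughly like this (a sketch, not the final code):

import org.apache.spark.SparkException
import org.apache.spark.api.shuffle.MapShuffleLocations
import org.apache.spark.shuffle.sort.DefaultMapShuffleLocations
import org.apache.spark.storage.BlockManagerId

// Match on the location type instead of require(), failing clearly on unknown implementations.
def toBlockManagerId(loc: MapShuffleLocations): BlockManagerId = loc match {
  case default: DefaultMapShuffleLocations => default.getBlockManagerId
  case other => throw new SparkException(
    s"The built-in shuffle reader only supports DefaultMapShuffleLocations, got ${other.getClass}")
}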
@@ -871,9 +873,9 @@ private[spark] object MapOutputTracker extends Logging {
      shuffleId: Int,
      startPartition: Int,
      endPartition: Int,
-      statuses: Array[MapStatus]): Iterator[(BlockManagerId, Seq[(BlockId, Long)])] = {
+      statuses: Array[MapStatus]): Iterator[(MapShuffleLocations, Seq[(BlockId, Long)])] = {
I was starting the Reader code and I was thinking this could easily return Iterator[(ShuffleLocation, Seq[(BlockId, Long)])] instead of Iterator[(MapShuffleLocations, Seq[(BlockId, Long)])]. It seems strange to pass back the MapShuffleLocations for an entire map stage when the BlockId already specifies both the mapId and reduceId. So I think it would make things clearer here, and avoid an extra step in the reader, if we just did splitsByAddress.getOrElseUpdate(status.mapShuffleLocations.getLocationForBlock(part), ListBuffer()) on line 888.
I looked at this function, and it's only being used by the reader, which would only need the ShuffleLocation and not the MapShuffleLocations, and by the tests. In the tests, they're used for verifying that the BlockManagerId is correct, which seems like the wrong thing to use, because other implementations of MapShuffleLocations that are not the default implementation aren't necessarily going to be referencing the BlockManagerId. Perhaps it's better to just write a separate function for retrieving the BlockManagerId directly for the tests.
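A sketch of the suggested restructuring (variable names approximate the real convertMapStatuses; ShuffleLocation's package is assumed to sit alongside MapShuffleLocations):

import scala.collection.mutable.{HashMap, ListBuffer}
import org.apache.spark.api.shuffle.ShuffleLocation
import org.apache.spark.scheduler.MapStatus
import org.apache.spark.storage.{BlockId, ShuffleBlockId}

def convertMapStatusesSketch(
    shuffleId: Int,
    part: Int,
    statuses: Array[MapStatus]): Iterator[(ShuffleLocation, Seq[(BlockId, Long)])] = {
  val splitsByAddress = new HashMap[ShuffleLocation, ListBuffer[(BlockId, Long)]]
  statuses.zipWithIndex.foreach { case (status, mapId) =>
    val size = status.getSizeForBlock(part)
    if (size != 0) {
      // Group by the per-reduce ShuffleLocation rather than the whole MapShuffleLocations.
      splitsByAddress.getOrElseUpdate(
        status.mapShuffleLocations.getLocationForBlock(part), ListBuffer()) +=
          ((ShuffleBlockId(shuffleId, mapId, part), size))
    }
  }
  splitsByAddress.iterator
}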
if (mapShuffleLocs != null) {
  out.writeBoolean(true)
  if (mapShuffleLocs.isInstanceOf[DefaultMapShuffleLocations]
      && mapShuffleLocs.asInstanceOf[DefaultMapShuffleLocations].getBlockManagerId == loc) {
As discussed offline, let's make mapShuffleLocs.asInstanceOf[DefaultMapShuffleLocations].getBlockManagerId == loc an assert since it should never happen
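A compact sketch of that suggestion (the surrounding writeExternal protocol is elided; names follow the snippet above):

import java.io.ObjectOutput
import org.apache.spark.shuffle.sort.DefaultMapShuffleLocations
import org.apache.spark.storage.BlockManagerId

def writeDefaultLocs(
    out: ObjectOutput,
    loc: BlockManagerId,
    mapShuffleLocs: DefaultMapShuffleLocations): Unit = {
  // A mismatch would indicate a bug in the writer, so fail loudly rather than branching on it.
  assert(mapShuffleLocs.getBlockManagerId == loc,
    "DefaultMapShuffleLocations should wrap the same BlockManagerId stored on the MapStatus")
  // The BlockManagerId is then written only once and shared on deserialization.
  loc.writeExternal(out)
}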
I don't see any red flags, but to be honest I don't really see how the pieces fit together from this yet.
I mentioned these inline, but my two architectural concerns are
- handling fetch failures appropriately for each shuffle store
- having a way to move blocks to the shuffle store asynchronously
private static final LoadingCache<BlockManagerId, DefaultMapShuffleLocations>
    DEFAULT_SHUFFLE_LOCATIONS_CACHE =
        CacheBuilder.newBuilder()
            .maximumSize(10000)
not related to your change -- I'm realizing that the cache for the BlockManagerId might perform terribly with dynamic allocation & the external shuffle service on a large cluster. The BlockManagerId contains the executorId (even if you don't care about it for shuffle blocks w/ the external shuffle service), and I could see large apps using over 10K executors.
You might at least want to make the size a shared constant, so it's easier to see that these caches are related, if we ever change one of them.
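For example, a tiny sketch of the shared-constant suggestion (the object and constant names are illustrative, not from the PR):

object ShuffleLocationCacheSizes {
  // Shared by BlockManagerId's cache and DefaultMapShuffleLocations' cache so they stay in sync.
  val CACHED_LOCATIONS_MAX_SIZE: Int = 10000
}

// Usage in the Guava cache shown above would then be:
//   CacheBuilder.newBuilder().maximumSize(ShuffleLocationCacheSizes.CACHED_LOCATIONS_MAX_SIZE)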
@@ -56,11 +67,31 @@ private[spark] object MapStatus {
     .map(_.conf.get(config.SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS))
     .getOrElse(config.SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS.defaultValue.get)

  // A temporary concession to the fact that we only expect implementations of shuffle provided by
  // Spark to be storing shuffle locations in the driver, meaning we want to introduce as little
  // serialization overhead as possible in such default cases.
Is that true? I thought the version Ilan was working on would also store shuffle locations in the driver.
I broadly think of 3 different classes of shuffle storage:
- on the local node, as Spark does now
- in some external service, which manages its own metadata (so retrieving a block requires at least two hops -- one to get the real location, one to get the data from its actual source)
- in an external service, which is distributed, but still relies on the driver to manage metadata. E.g. each shuffle writer randomly chooses one of N shuffle store servers to write data to, and then tells the driver it wrote data to server X. Server X never tells server Y about the data it has, though they're both part of the same shuffle store service, so you couldn't ask server Y for that data.
We should also think about how this would work with the async-upload case, like that example we saw using alluxio. How and when would the additional block locations get reported back?
> Is that true? I thought the version Ilan was working on would also store shuffle locations in the driver.
Correct, but that would also be bundled in the Spark repository, so this can also optimize for that in serialization.
> We should also think about how this would work with the async-upload case, like that example we saw using alluxio. How and when would the additional block locations get reported back?
Here's a hypothetical way to support that. You have a notion of primary and secondary locations. The primary location can be the block manager running the task (so the local disk of the mapper, as it is now). The secondary locations can be remote storage file servers, such as shuffle service nodes or a distributed file system like Alluxio. The MapShuffleLocations contains both the primary and secondary locations.
On the read side, check if one can read from the primary location first. If that fails, attempt to read from secondary locations. If that doesn't work either, it means that both the primary location went down and the mapper went down before it could finish writing to the secondary location. In that case you lost the map task output and we have to recompute (as we would have to assuming there was no external shuffle service at all, for example).
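A hypothetical Scala sketch of that primary/secondary idea (none of these names exist in the PR; ShuffleLocation's package is assumed):

import org.apache.spark.api.shuffle.{MapShuffleLocations, ShuffleLocation}

class PrimarySecondaryMapShuffleLocations(
    primary: ShuffleLocation,           // e.g. the block manager of the mapper's executor
    secondaries: Seq[ShuffleLocation])  // e.g. remote store endpoints the output is uploaded to
  extends MapShuffleLocations {

  // Readers try the primary first ...
  override def getLocationForBlock(reduceId: Int): ShuffleLocation = primary

  // ... and fall back to these on a fetch failure; only if all fail is the map output recomputed.
  def fallbackLocationsForBlock(reduceId: Int): Seq[ShuffleLocation] = secondaries
}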
I like your proposal for the async upload (@vanzin I think you'll be interested in this too).
> Is that true? I thought the version Ilan was working on would also store shuffle locations in the driver.
> Correct, but that would also be bundled in the Spark repository, so this can also optimize for that in serialization.
but does that mean that every plugin that works that way needs to live in the Spark repo?
And really, I'm not talking so much about whether the code lives inside the Spark repo or outside -- I'm wondering whether the API has the right hooks to support this, and doesn't require extra hacks that live outside of the API.
Anyway, that might not even matter that much for this bit of code right here; again, maybe I need to see how it looks together, but that comment in the code confused me about the goal.
The idea is that if you're not part of the repository, your serialization will result in calling the default Java serializer - which will still functionally work but might lead to less than desired performance, because, well, it's Java serialization. I also think implementing KryoSerializable and registering the class would improve performance here, but that requires some advanced knowledge on the part of the developer.
If we anticipate needing the most optimal serialization performance most of the time, we should introduce a shuffle locations serialization API and/or make the serialization protocol more explicit.
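A sketch of what that could look like for a third-party plugin; the class, its field, and the single-endpoint behavior are hypothetical, while the Kryo interfaces and the MapShuffleLocations package come from Kryo and this PR respectively:

import com.esotericsoftware.kryo.{Kryo, KryoSerializable}
import com.esotericsoftware.kryo.io.{Input, Output}
import org.apache.spark.api.shuffle.{MapShuffleLocations, ShuffleLocation}

// A single-endpoint store: every reduce partition is fetched from the same URI.
class RemoteStoreShuffleLocations(var serverUri: String)
  extends MapShuffleLocations with ShuffleLocation with KryoSerializable {

  def this() = this(null) // no-arg constructor for deserialization

  override def getLocationForBlock(reduceId: Int): ShuffleLocation = this

  // Kryo calls these instead of falling back to Java serialization.
  override def write(kryo: Kryo, output: Output): Unit = output.writeString(serverUri)
  override def read(kryo: Kryo, input: Input): Unit = { serverUri = input.readString() }
}

Registering the class up front (e.g. kryo.register(classOf[RemoteStoreShuffleLocations]) in a KryoRegistrator) additionally avoids writing the fully qualified class name with every record.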
> On the read side, check if one can read from the primary location first. If that fails, attempt to read from secondary locations.
I think at some point you'll need to make the secondary location the default. If the executor is gone, you don't want to pay the price of trying to read from it just to fail and go to the right place.
When I (briefly) thought about this, my idea was to send an updated MapStatus to the driver with the new location once the upload is done; that would override the existing mapping. To the best of my understanding, any tasks trying to read from the executor directly could then fail, and new ones would be started with the updated location for the block. There's most probably stuff missing in my brainstorm (e.g. the failure I described shouldn't be counted towards the failure limit, and this would probably be a special fetch failure that should not cause recomputation, things like that).
Anyway, long way to say that the idea is ok but will need some tweaking to avoid issues when executors go away.
I mentioned an idea to support async upload in #517 (comment).
Again, perhaps I'm too optimistic, but isn't this also the case in the current world, where if one loses an external shuffle service, we lose the shuffle blocks for all the executors that ran on that host? Or to put it another way, perhaps the way to think about the fault tolerance problem is considering whether we're any worse off than how fault tolerance was handled before. And given that this solution provides the option to use resilient distributed storage that handles its own fault tolerance, I'd say what we have here is strictly better than what we had before.
yeah, I agree with that, and I think I'm making a slightly different point. I'm saying that if you intend to get those advantages, I think you will need to change some of the scheduler logic, so that plugins get a bit of control over failure handling. Yes, it's fine if the shuffle store can still lose data in some situations, as Spark can already handle that. But if the expectation is that it will lose data far less often, then the failure handling should be different. E.g. maybe you expect that a fetch failure is really just from an overloaded service, so rather than assuming all data is lost, you should retry (+ backoff or throttling or scaling the service or ...)
BTW I took a quick look at the code and it seems ok, but I'm kinda digging into some other stuff so I don't really have that much time for a detailed review...
Merging this now. We'll address some of the scheduler failure handling in the reader PR.
Implements the shuffle locations API as part of SPARK-25299.

This adds an additional field to all `MapStatus` objects: a `MapShuffleLocations` that indicates where a task's map output is stored. This module is optional and implementations of the pluggable shuffle writers and readers can ignore it accordingly.

This API is designed with the use case in mind of future plugin implementations desiring to have the driver store metadata about where shuffle blocks are stored.

There are a few caveats to this design:

- We originally wanted to remove the `BlockManagerId` from `MapStatus` entirely and replace it with this object. However, doing this proves to be very difficult, as many places use the block manager ID for other kinds of shuffle data bookkeeping. As a result, we concede to storing the block manager ID redundantly here. However, the overhead should be minimal: because we cache block manager ids and default map shuffle locations, the two fields in `MapStatus` should point to the same object on the heap. Thus we add `O(M)` storage overhead on the driver, where for each map status we're storing an additional pointer to the same on-heap object. We will run benchmarks against the TPC-DS workload to see if there are significant performance repercussions for this implementation.

- `KryoSerializer` expects `CompressedMapStatus` and `HighlyCompressedMapStatus` to be serialized via reflection, so originally all fields of these classes needed to be registered with Kryo. However, the `MapShuffleLocations` is now pluggable. We think however that previously Kryo was defaulting to Java serialization anyways, so we now just explicitly tell Kryo to use `ExternalizableSerializer` to deal with these objects. There's a small hack in the serialization protocol that attempts to avoid serializing the same `BlockManagerId` twice in the case that the map shuffle locations is a `DefaultMapShuffleLocations`.