[SPARK-25299] Driver lifecycle api (#533)
Introduce driver shuffle lifecycle APIs
yifeih committed Sep 17, 2019
1 parent 3fc52b5 commit 0ac7fac
Showing 11 changed files with 241 additions and 6 deletions.
33 changes: 33 additions & 0 deletions core/src/main/java/org/apache/spark/api/shuffle/ShuffleDriverComponents.java
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.api.shuffle;

import java.io.IOException;
import java.util.Map;

public interface ShuffleDriverComponents {

/**
* @return additional SparkConf values necessary for the executors.
*/
Map<String, String> initializeApplication();

void cleanupApplication() throws IOException;

void removeShuffleData(int shuffleId, boolean blocking) throws IOException;
}
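
For orientation, here is a minimal sketch of what a plugin author's implementation of this interface could look like, written in Scala against the Java API the same way the test suite later in this diff is. The class name, config key, and endpoint value are illustrative, not part of this commit:

import java.util

import org.apache.spark.api.shuffle.ShuffleDriverComponents

// Hypothetical plugin that points executors at an external shuffle store.
// Keys returned from initializeApplication() are re-published to executors
// under the "spark.shuffle.plugin." prefix (see the SparkContext change below).
class ExampleDriverComponents extends ShuffleDriverComponents {
  override def initializeApplication(): util.Map[String, String] =
    util.Collections.singletonMap("storage.endpoint", "shuffle-store.example:7070")

  override def cleanupApplication(): Unit = {
    // Tear down any application-scoped state held by the external store.
  }

  override def removeShuffleData(shuffleId: Int, blocking: Boolean): Unit = {
    // Delete every block written for shuffleId; honoring `blocking` is left
    // to the implementation.
  }
}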
core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleDataIO.java
@@ -18,8 +18,15 @@
package org.apache.spark.shuffle.sort.io;

import org.apache.spark.SparkConf;
import org.apache.spark.api.shuffle.ShuffleDataIO;
import org.apache.spark.api.shuffle.ShuffleDriverComponents;
import org.apache.spark.api.shuffle.ShuffleExecutorComponents;
import org.apache.spark.shuffle.sort.lifecycle.DefaultShuffleDriverComponents;

/**
* Implementation of the {@link ShuffleDataIO} plugin system that replicates the local shuffle
@@ -37,4 +44,9 @@ public LocalDiskShuffleDataIO(SparkConf sparkConf) {
public ShuffleExecutorComponents executor() {
return new LocalDiskShuffleExecutorComponents(sparkConf);
}

@Override
public ShuffleDriverComponents driver() {
return new DefaultShuffleDriverComponents();
}
}
core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleExecutorComponents.java
@@ -29,7 +29,13 @@
import org.apache.spark.shuffle.api.SingleSpillShuffleMapOutputWriter;
import org.apache.spark.storage.BlockManager;

import java.util.Map;

public class LocalDiskShuffleExecutorComponents implements ShuffleExecutorComponents {

private final SparkConf sparkConf;
private BlockManager blockManager;
@@ -50,7 +56,7 @@ public LocalDiskShuffleExecutorComponents(
}

@Override
public void initializeExecutor(String appId, String execId) {
public void initializeExecutor(String appId, String execId, Map<String, String> extraConfigs) {
blockManager = SparkEnv.get().blockManager();
if (blockManager == null) {
throw new IllegalStateException("No blockManager available from the SparkEnv.");
54 changes: 54 additions & 0 deletions core/src/main/java/org/apache/spark/shuffle/sort/lifecycle/DefaultShuffleDriverComponents.java
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.shuffle.sort.lifecycle;

import com.google.common.collect.ImmutableMap;
import org.apache.spark.SparkEnv;
import org.apache.spark.api.shuffle.ShuffleDriverComponents;
import org.apache.spark.storage.BlockManagerMaster;

import java.io.IOException;
import java.util.Map;

public class DefaultShuffleDriverComponents implements ShuffleDriverComponents {

private BlockManagerMaster blockManagerMaster;

@Override
public Map<String, String> initializeApplication() {
blockManagerMaster = SparkEnv.get().blockManager().master();
return ImmutableMap.of();
}

@Override
public void cleanupApplication() {
// do nothing
}

@Override
public void removeShuffleData(int shuffleId, boolean blocking) throws IOException {
checkInitialized();
blockManagerMaster.removeShuffle(shuffleId, blocking);
}

private void checkInitialized() {
if (blockManagerMaster == null) {
throw new IllegalStateException("Driver components must be initialized before using");
}
}
}
8 changes: 5 additions & 3 deletions core/src/main/scala/org/apache/spark/ContextCleaner.scala
@@ -23,6 +23,7 @@ import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue, Scheduled

import scala.collection.JavaConverters._

import org.apache.spark.api.shuffle.ShuffleDriverComponents
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
@@ -58,7 +59,9 @@ private class CleanupTaskWeakReference(
* to be processed when the associated object goes out of scope of the application. Actual
* cleanup is performed in a separate daemon thread.
*/
private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
private[spark] class ContextCleaner(
sc: SparkContext,
shuffleDriverComponents: ShuffleDriverComponents) extends Logging {

/**
* A buffer to ensure that `CleanupTaskWeakReference`s are not garbage collected as long as they
@@ -221,7 +224,7 @@
try {
logDebug("Cleaning shuffle " + shuffleId)
mapOutputTrackerMaster.unregisterShuffle(shuffleId)
blockManagerMaster.removeShuffle(shuffleId, blocking)
shuffleDriverComponents.removeShuffleData(shuffleId, blocking)
listeners.asScala.foreach(_.shuffleCleaned(shuffleId))
logDebug("Cleaned shuffle " + shuffleId)
} catch {
@@ -269,7 +272,6 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
}
}

private def blockManagerMaster = sc.env.blockManager.master
private def broadcastManager = sc.env.broadcastManager
private def mapOutputTrackerMaster = sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
}
30 changes: 30 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -39,6 +39,12 @@ import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat, Job => NewHad
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.api.shuffle.{ShuffleDataIO, ShuffleDriverComponents}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
import org.apache.spark.deploy.StandaloneResourceUtils._
@@ -216,7 +222,11 @@
private var _shutdownHookRef: AnyRef = _
private var _statusStore: AppStatusStore = _
private var _heartbeater: Heartbeater = _
private var _resources: scala.collection.immutable.Map[String, ResourceInformation] = _
private var _shuffleDriverComponents: ShuffleDriverComponents = _

/* ------------------------------------------------------------------------------------- *
| Accessors and public fields. These provide access to the internal state of the |
@@ -524,6 +534,14 @@
executorEnvs ++= _conf.getExecutorEnv
executorEnvs("SPARK_USER") = sparkUser

val configuredPluginClasses = conf.get(SHUFFLE_IO_PLUGIN_CLASS)
val maybeIO = Utils.loadExtensions(
classOf[ShuffleDataIO], Seq(configuredPluginClasses), conf)
require(maybeIO.size == 1, s"Failed to load plugins of type $configuredPluginClasses")
_shuffleDriverComponents = maybeIO.head.driver()
_shuffleDriverComponents.initializeApplication().asScala.foreach {
case (k, v) => _conf.set(ShuffleDataIO.SHUFFLE_SPARK_CONF_PREFIX + k, v) }

// We need to register "HeartbeatReceiver" before "createTaskScheduler" because Executor will
// retrieve "HeartbeatReceiver" in the constructor. (SPARK-6640)
_heartbeatReceiver = env.rpcEnv.setupEndpoint(
@@ -598,6 +616,17 @@
}
_executorAllocationManager.foreach(_.start())

_cleaner =
if (_conf.get(CLEANER_REFERENCE_TRACKING)) {
Some(new ContextCleaner(this, _shuffleDriverComponents))
} else {
None
}
_cleaner.foreach(_.start())

setupAndStartListenerBus()
postEnvironmentUpdate()
postApplicationStart()
@@ -1974,6 +2003,7 @@
}
_heartbeater = null
}
_shuffleDriverComponents.cleanupApplication()
if (env != null && _heartbeatReceiver != null) {
Utils.tryLogNonFatalError {
env.rpcEnv.stop(_heartbeatReceiver)
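
Stepping back from the hunks above: the values returned by initializeApplication() are folded into the application's SparkConf under ShuffleDataIO.SHUFFLE_SPARK_CONF_PREFIX, so they travel to executors with the rest of the conf and plugins never have to namespace their own keys. A small sketch of that transformation, with made-up values:

import org.apache.spark.SparkConf
import org.apache.spark.api.shuffle.ShuffleDataIO

// Illustrative only: the plugin key is namespaced before it is stored, so
// plugin settings cannot collide with regular Spark configuration.
val conf = new SparkConf()
conf.set(ShuffleDataIO.SHUFFLE_SPARK_CONF_PREFIX + "storage.endpoint", "shuffle-store.example:7070")
assert(conf.get("spark.shuffle.plugin.storage.endpoint") == "shuffle-store.example:7070")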
core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
@@ -19,6 +19,8 @@ package org.apache.spark.shuffle.sort

import java.util.concurrent.ConcurrentHashMap

import scala.collection.JavaConverters._

import org.apache.spark._
import org.apache.spark.internal.{config, Logging}
import org.apache.spark.shuffle._
@@ -220,7 +222,12 @@
classOf[ShuffleDataIO], Seq(configuredPluginClasses), conf)
require(maybeIO.size == 1, s"Failed to load plugins of type $configuredPluginClasses")
val executorComponents = maybeIO.head.executor()
executorComponents.initializeExecutor(conf.getAppId, SparkEnv.get.executorId)
val extraConfigs = conf.getAllWithPrefix(ShuffleDataIO.SHUFFLE_SPARK_CONF_PREFIX)
.toMap
executorComponents.initializeExecutor(
conf.getAppId,
SparkEnv.get.executorId,
extraConfigs.asJava)
executorComponents
}
}
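
SortShuffleManager above reverses that transformation on the executor side: getAllWithPrefix strips the prefix off again, so initializeExecutor receives the plugin's original key names. A matching sketch, using the same made-up values as before:

import org.apache.spark.SparkConf
import org.apache.spark.api.shuffle.ShuffleDataIO

val conf = new SparkConf()
  .set(ShuffleDataIO.SHUFFLE_SPARK_CONF_PREFIX + "storage.endpoint", "shuffle-store.example:7070")

// getAllWithPrefix returns the matching entries with the prefix stripped.
val extraConfigs = conf.getAllWithPrefix(ShuffleDataIO.SHUFFLE_SPARK_CONF_PREFIX).toMap
assert(extraConfigs("storage.endpoint") == "shuffle-store.example:7070")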
core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala
@@ -210,7 +210,8 @@ class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext {
/**
* A special [[ContextCleaner]] that saves the IDs of the accumulators registered for cleanup.
*/
private class SaveAccumContextCleaner(sc: SparkContext) extends ContextCleaner(sc) {
private class SaveAccumContextCleaner(sc: SparkContext) extends
ContextCleaner(sc, null) {
private val accumsRegistered = new ArrayBuffer[Long]

override def registerAccumulatorForCleanup(a: AccumulatorV2[_, _]): Unit = {
71 changes: 71 additions & 0 deletions core/src/test/scala/org/apache/spark/shuffle/ShuffleDriverComponentsSuite.scala
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.shuffle

import java.util

import com.google.common.collect.ImmutableMap

import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkEnv, SparkFunSuite}
import org.apache.spark.api.shuffle.{ShuffleDataIO, ShuffleDriverComponents, ShuffleExecutorComponents, ShuffleWriteSupport}
import org.apache.spark.internal.config.SHUFFLE_IO_PLUGIN_CLASS
import org.apache.spark.shuffle.sort.io.DefaultShuffleWriteSupport

class ShuffleDriverComponentsSuite extends SparkFunSuite with LocalSparkContext {
test("test serialization of shuffle initialization conf to executors") {
val testConf = new SparkConf()
.setAppName("testing")
.setMaster("local-cluster[2,1,1024]")
.set(SHUFFLE_IO_PLUGIN_CLASS, "org.apache.spark.shuffle.TestShuffleDataIO")

sc = new SparkContext(testConf)

sc.parallelize(Seq((1, "one"), (2, "two"), (3, "three")), 3)
.groupByKey()
.collect()
}
}

class TestShuffleDriverComponents extends ShuffleDriverComponents {
override def initializeApplication(): util.Map[String, String] =
ImmutableMap.of("test-key", "test-value")

override def cleanupApplication(): Unit = {}

override def removeShuffleData(shuffleId: Int, blocking: Boolean): Unit = {}
}

class TestShuffleDataIO(sparkConf: SparkConf) extends ShuffleDataIO {
override def driver(): ShuffleDriverComponents = new TestShuffleDriverComponents()

override def executor(): ShuffleExecutorComponents =
new TestShuffleExecutorComponents(sparkConf)
}

class TestShuffleExecutorComponents(sparkConf: SparkConf) extends ShuffleExecutorComponents {
override def initializeExecutor(appId: String, execId: String,
extraConfigs: util.Map[String, String]): Unit = {
assert(extraConfigs.get("test-key") == "test-value")
}

override def writes(): ShuffleWriteSupport = {
val blockManager = SparkEnv.get.blockManager
val blockResolver = new IndexShuffleBlockResolver(sparkConf, blockManager)
new DefaultShuffleWriteSupport(sparkConf, blockResolver)
}
}
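
The suite pins the master to local-cluster[2,1,1024] deliberately: in that mode executors run in separate JVMs, so the test only passes if the initialization conf genuinely round-trips through SparkConf rather than through shared driver memory. Wiring a plugin into an application follows the same pattern; the plugin class name below is hypothetical, and SHUFFLE_IO_PLUGIN_CLASS is a Spark-internal config entry, so code outside org.apache.spark would set the equivalent string key instead:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.internal.config.SHUFFLE_IO_PLUGIN_CLASS

// Hypothetical setup; com.example.MyShuffleDataIO stands in for any class
// implementing ShuffleDataIO on the driver and executor classpaths.
val conf = new SparkConf()
  .setAppName("shuffle-plugin-demo")
  .setMaster("local-cluster[2,1,1024]")
  .set(SHUFFLE_IO_PLUGIN_CLASS, "com.example.MyShuffleDataIO")
val sc = new SparkContext(conf)
sc.parallelize(1 to 10, 2).groupBy(_ % 2).collect()
sc.stop()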
core/src/main/java/org/apache/spark/api/shuffle/ShuffleDataIO.java
@@ -23,9 +23,17 @@
@Experimental
public interface ShuffleDataIO {
String SHUFFLE_SPARK_CONF_PREFIX = "spark.shuffle.plugin.";

ShuffleDriverComponents driver();
ShuffleExecutorComponents executor();
}
core/src/main/java/org/apache/spark/api/shuffle/ShuffleExecutorComponents.java
@@ -19,15 +19,26 @@

import org.apache.spark.annotation.Experimental;

import java.util.Map;

@Experimental
public interface ShuffleExecutorComponents {
void initializeExecutor(String appId, String execId, Map<String, String> extraConfigs);

ShuffleWriteSupport writes();
}
