This closes apache#1272
Sela committed Nov 4, 2016
2 parents 46fbfe0 + 90a75d1 commit 14e093a
Showing 8 changed files with 19 additions and 40 deletions.
@@ -77,8 +77,7 @@ public interface SparkPipelineOptions extends PipelineOptions, StreamingOptions,
   class TmpCheckpointDirFactory implements DefaultValueFactory<String> {
     @Override
     public String create(PipelineOptions options) {
-      SparkPipelineOptions sparkPipelineOptions = options.as(SparkPipelineOptions.class);
-      return "file:///tmp/" + sparkPipelineOptions.getJobName();
+      return "/tmp/" + options.as(SparkPipelineOptions.class).getJobName();
     }
   }
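Aside (not part of the commit): TmpCheckpointDirFactory is a Beam DefaultValueFactory, i.e. a default computed lazily from the other options rather than hard-coded. A minimal sketch of how such a factory is wired to an option getter via @Default.InstanceFactory — the ExampleOptions interface below is illustrative only, not the full SparkPipelineOptions:

    import org.apache.beam.sdk.options.Default;
    import org.apache.beam.sdk.options.DefaultValueFactory;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public interface ExampleOptions extends PipelineOptions {

      // The factory only runs when the user has not set the option explicitly.
      @Default.InstanceFactory(TmpCheckpointDirFactory.class)
      String getCheckpointDir();
      void setCheckpointDir(String checkpointDir);

      class TmpCheckpointDirFactory implements DefaultValueFactory<String> {
        @Override
        public String create(PipelineOptions options) {
          // jobName is declared on PipelineOptions itself.
          return "/tmp/" + options.getJobName();
        }
      }
    }

    // Usage: PipelineOptionsFactory.as(ExampleOptions.class).getCheckpointDir()
    // yields "/tmp/<generated-job-name>" unless --checkpointDir was passed.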

@@ -20,11 +20,6 @@

 import static com.google.common.base.Preconditions.checkArgument;
 
-import com.google.common.base.Predicates;
-import com.google.common.collect.Iterables;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.util.Arrays;
 import org.apache.beam.runners.spark.SparkPipelineOptions;
 import org.apache.beam.runners.spark.SparkRunner;
 import org.apache.beam.runners.spark.translation.SparkContextFactory;
@@ -48,7 +43,7 @@
 public class SparkRunnerStreamingContextFactory implements JavaStreamingContextFactory {
   private static final Logger LOG =
       LoggerFactory.getLogger(SparkRunnerStreamingContextFactory.class);
-  private static final Iterable<String> KNOWN_RELIABLE_FS = Arrays.asList("hdfs", "s3", "gs");
+  private static final String KNOWN_RELIABLE_FS_PATTERN = "^(hdfs|s3|gs)";
 
   private final Pipeline pipeline;
   private final SparkPipelineOptions options;
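Aside (not part of the commit): a JavaStreamingContextFactory is consumed by Spark's checkpoint-aware JavaStreamingContext.getOrCreate (Spark 1.x API), which rebuilds the context from the checkpoint dir after a failure and invokes the factory only on a fresh start. A sketch of the typical call site, assuming the constructor mirrors the two fields above:

    // Recover from the checkpoint if one exists; otherwise create() runs.
    JavaStreamingContext jssc = JavaStreamingContext.getOrCreate(
        options.getCheckpointDir(),
        new SparkRunnerStreamingContextFactory(pipeline, options));
    jssc.start();
    jssc.awaitTermination();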
@@ -83,19 +78,11 @@ public JavaStreamingContext create() {

     // set checkpoint dir.
     String checkpointDir = options.getCheckpointDir();
-    LOG.info("Checkpoint dir set to: {}", checkpointDir);
-    try {
-      // validate checkpoint dir and warn if not of a known durable filesystem.
-      URL checkpointDirUrl = new URL(checkpointDir);
-      if (!Iterables.any(KNOWN_RELIABLE_FS, Predicates.equalTo(checkpointDirUrl.getProtocol()))) {
-        LOG.warn("Checkpoint dir URL {} does not match a reliable filesystem, in case of failures "
-            + "this job may not recover properly or even at all.", checkpointDirUrl);
-      }
-    } catch (MalformedURLException e) {
-      throw new RuntimeException("Failed to form checkpoint dir URL. CheckpointDir should be in "
-          + "the form of hdfs:///path/to/dir or other reliable fs protocol, "
-          + "or file:///path/to/dir for local mode.", e);
+    if (!checkpointDir.matches(KNOWN_RELIABLE_FS_PATTERN)) {
+      LOG.warn("The specified checkpoint dir {} does not match a reliable filesystem so in case "
+          + "of failures this job may not recover properly or even at all.", checkpointDir);
     }
+    LOG.info("Checkpoint dir set to: {}", checkpointDir);
     jssc.checkpoint(checkpointDir);

// register listeners.
Expand Down
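Aside on the new check (not part of the commit): String.matches() anchors the pattern against the entire input, so "hdfs://host/path".matches("^(hdfs|s3|gs)") is false, and the warning above would fire even for durable filesystems. A prefix test needs Matcher.find() or a trailing ".*". A minimal, self-contained sketch of the distinction — class and method names here are illustrative only:

    import java.util.regex.Pattern;

    public class ReliableFsCheck {
      private static final Pattern KNOWN_RELIABLE_FS = Pattern.compile("^(hdfs|s3|gs)");

      // find() succeeds on a prefix match; String.matches() would not.
      static boolean isReliable(String checkpointDir) {
        return KNOWN_RELIABLE_FS.matcher(checkpointDir).find();
      }

      public static void main(String[] args) {
        System.out.println("hdfs://nn/cp".matches("^(hdfs|s3|gs)")); // false: not a full match
        System.out.println(isReliable("hdfs://nn/cp"));              // true
        System.out.println(isReliable("file:///tmp/job"));           // false
      }
    }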
@@ -55,8 +55,7 @@ public class EmptyStreamAssertionTest implements Serializable {

   @Test
   public void testAssertion() throws Exception {
-    SparkPipelineOptions options = commonOptions.withTmpCheckpointDir(
-        checkpointParentDir.newFolder(getClass().getSimpleName()));
+    SparkPipelineOptions options = commonOptions.withTmpCheckpointDir(checkpointParentDir);
     Duration windowDuration = new Duration(options.getBatchIntervalMillis());
 
     Pipeline pipeline = Pipeline.create(options);
@@ -61,8 +61,7 @@ public class FlattenStreamingTest {

   @Test
   public void testFlattenUnbounded() throws Exception {
-    SparkPipelineOptions options = commonOptions.withTmpCheckpointDir(
-        checkpointParentDir.newFolder(getClass().getSimpleName()));
+    SparkPipelineOptions options = commonOptions.withTmpCheckpointDir(checkpointParentDir);
 
     Pipeline p = Pipeline.create(options);
     PCollection<String> w1 =
@@ -81,8 +80,7 @@ public void testFlattenUnbounded() throws Exception {

   @Test
   public void testFlattenBoundedUnbounded() throws Exception {
-    SparkPipelineOptions options = commonOptions.withTmpCheckpointDir(
-        checkpointParentDir.newFolder(getClass().getSimpleName()));
+    SparkPipelineOptions options = commonOptions.withTmpCheckpointDir(checkpointParentDir);
 
     Pipeline p = Pipeline.create(options);
     PCollection<String> w1 =
@@ -74,8 +74,7 @@ public static void init() throws IOException {

   @Test
   public void testEarliest2Topics() throws Exception {
-    SparkPipelineOptions options = commonOptions.withTmpCheckpointDir(
-        checkpointParentDir.newFolder(getClass().getSimpleName()));
+    SparkPipelineOptions options = commonOptions.withTmpCheckpointDir(checkpointParentDir);
     // It seems that the consumer's first "position" lookup (in unit test) takes +200 msec,
     // so to be on the safe side we'll set to 750 msec.
     options.setMinReadTimeMillis(750L);
@@ -122,8 +121,7 @@ public void testEarliest2Topics() throws Exception {

   @Test
   public void testLatest() throws Exception {
-    SparkPipelineOptions options = commonOptions.withTmpCheckpointDir(
-        checkpointParentDir.newFolder(getClass().getSimpleName()));
+    SparkPipelineOptions options = commonOptions.withTmpCheckpointDir(checkpointParentDir);
     //--- setup
     final String topic = "topic";
     // messages.
@@ -108,8 +108,7 @@ private static void produce() {

   @Test
   public void testRun() throws Exception {
-    SparkPipelineOptions options = commonOptions.withTmpCheckpointDir(
-        checkpointParentDir.newFolder(getClass().getSimpleName()));
+    SparkPipelineOptions options = commonOptions.withTmpCheckpointDir(checkpointParentDir);
     // It seems that the consumer's first "position" lookup (in unit test) takes +200 msec,
     // so to be on the safe side we'll set to 750 msec.
     options.setMinReadTimeMillis(750L);
@@ -62,8 +62,7 @@ public class SimpleStreamingWordCountTest implements Serializable {

   @Test
   public void testFixedWindows() throws Exception {
-    SparkPipelineOptions options = commonOptions.withTmpCheckpointDir(
-        checkpointParentDir.newFolder(getClass().getSimpleName()));
+    SparkPipelineOptions options = commonOptions.withTmpCheckpointDir(checkpointParentDir);
 
     // override defaults
     options.setBatchIntervalMillis(BATCH_INTERVAL.getMillis());
@@ -19,12 +19,13 @@
 package org.apache.beam.runners.spark.translation.streaming.utils;
 
 
-import java.io.File;
-import java.net.MalformedURLException;
+import java.io.IOException;
 import org.apache.beam.runners.spark.SparkPipelineOptions;
 import org.apache.beam.runners.spark.SparkRunner;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.junit.rules.ExternalResource;
+import org.junit.rules.TemporaryFolder;



/**
@@ -41,11 +42,10 @@ protected void before() throws Throwable {
     options.setTimeout(1000L);
   }
 
-  public SparkPipelineOptions withTmpCheckpointDir(File checkpointDir)
-      throws MalformedURLException {
+  public SparkPipelineOptions withTmpCheckpointDir(TemporaryFolder parent)
+      throws IOException {
     // tests use JUnit's TemporaryFolder path in the form of: /.../junit/...
     // so need to add the missing protocol.
-    options.setCheckpointDir(checkpointDir.toURI().toURL().toString());
+    options.setCheckpointDir(parent.newFolder(options.getJobName()).toURI().toURL().toString());
     return options;
   }
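Aside (not part of the commit): with this change the tests above hand the JUnit TemporaryFolder rule itself to the helper, which creates a per-job subfolder and registers it as the checkpoint dir. A sketch of the resulting test setup — the helper's class name TestOptionsForStreaming is assumed here, since the diff shows only its package:

    import org.apache.beam.runners.spark.SparkPipelineOptions;
    import org.junit.Rule;
    import org.junit.Test;
    import org.junit.rules.TemporaryFolder;

    public class ExampleStreamingTest {

      @Rule
      public TemporaryFolder checkpointParentDir = new TemporaryFolder();

      @Rule
      public TestOptionsForStreaming commonOptions = new TestOptionsForStreaming(); // assumed name

      @Test
      public void testSomething() throws Exception {
        // newFolder(jobName) now happens inside the helper, so each test run
        // gets a unique checkpoint dir under JUnit's temporary folder.
        SparkPipelineOptions options = commonOptions.withTmpCheckpointDir(checkpointParentDir);
        // ... build and run a Pipeline with these options ...
      }
    }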

