Skip to content

Commit

Permalink
Added replaceGcsFilesWithLocalFiles (#33006)
Browse files Browse the repository at this point in the history
* Added replaceGcsFilesWithLocalFiles

* use static

* added unit tests

* make it public

* Added logs

* spotless

* Added a simple IT to cover downloading the GCS file and staging it later.
  • Loading branch information
liferoad authored Nov 19, 2024
1 parent 3b759f2 commit c57553c
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,19 @@

import static org.apache.beam.it.truthmatchers.PipelineAsserts.assertThatPipeline;
import static org.apache.beam.it.truthmatchers.PipelineAsserts.assertThatResult;
import static org.apache.beam.sdk.util.construction.resources.PipelineResources.detectClassPathResourcesToStage;

import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.it.common.PipelineLauncher.LaunchConfig;
import org.apache.beam.it.common.PipelineLauncher.LaunchInfo;
import org.apache.beam.it.common.PipelineLauncher.Sdk;
import org.apache.beam.it.common.PipelineOperator.Result;
import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Filter;
Expand Down Expand Up @@ -67,6 +71,29 @@ public void testWordCountDataflow() throws IOException {
assertThatResult(result).isLaunchFinished();
}

@Test
public void testWordCountDataflowWithGCSFilesToStage() throws IOException {

PipelineOptions pipelineOptions = wcPipeline.getOptions();
List<String> filesToStage =
detectClassPathResourcesToStage(DataflowRunner.class.getClassLoader(), pipelineOptions);
filesToStage.add("gs://apache-beam-samples/shakespeare/kinglear.txt");

LaunchConfig options =
LaunchConfig.builder("test-wordcount")
.setSdk(Sdk.JAVA)
.setPipeline(wcPipeline)
.addParameter("runner", "DataflowRunner")
.addParameter("filesToStage", String.join(",", filesToStage))
.build();

LaunchInfo launchInfo = pipelineLauncher.launch(project, region, options);
assertThatPipeline(launchInfo).isRunning();
Result result =
pipelineOperator.waitUntilDone(createConfig(launchInfo, Duration.ofMinutes(20)));
assertThatResult(result).isLaunchFinished();
}

/** Build WordCount pipeline. */
private void buildPipeline() {
wcPipeline
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,12 @@
import com.google.auto.value.AutoValue;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -179,6 +181,7 @@
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.hash.HashCode;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.hash.Hashing;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.ByteStreams;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.Files;
import org.joda.time.DateTimeUtils;
import org.joda.time.DateTimeZone;
Expand Down Expand Up @@ -257,6 +260,92 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
/** Dataflow service endpoints are expected to match this pattern. */
static final String ENDPOINT_REGEXP = "https://[\\S]*googleapis\\.com[/]?";

/**
* Replaces GCS file paths with local file paths by downloading the GCS files locally. This is
* useful when files need to be accessed locally before being staged to Dataflow.
*
* @param filesToStage List of file paths that may contain GCS paths (gs://) and local paths
* @return List of local file paths where any GCS paths have been downloaded locally
* @throws RuntimeException if there are errors copying GCS files locally
*/
public static List<String> replaceGcsFilesWithLocalFiles(List<String> filesToStage) {
List<String> processedFiles = new ArrayList<>();

for (String fileToStage : filesToStage) {
String localPath;
if (fileToStage.contains("=")) {
// Handle files with staging name specified
String[] components = fileToStage.split("=", 2);
String stagingName = components[0];
String filePath = components[1];

if (filePath.startsWith("gs://")) {
try {
// Create temp file with exact same name as GCS file
String gcsFileName = filePath.substring(filePath.lastIndexOf('/') + 1);
File tempDir = Files.createTempDir();
tempDir.deleteOnExit();
File tempFile = new File(tempDir, gcsFileName);
tempFile.deleteOnExit();

LOG.info(
"Downloading GCS file {} to local temp file {}",
filePath,
tempFile.getAbsolutePath());

// Copy GCS file to local temp file
ResourceId source = FileSystems.matchNewResource(filePath, false);
try (ReadableByteChannel reader = FileSystems.open(source);
FileOutputStream writer = new FileOutputStream(tempFile)) {
ByteStreams.copy(Channels.newInputStream(reader), writer);
}

localPath = stagingName + "=" + tempFile.getAbsolutePath();
LOG.info("Replaced GCS path {} with local path {}", fileToStage, localPath);
} catch (IOException e) {
throw new RuntimeException("Failed to copy GCS file locally: " + filePath, e);
}
} else {
localPath = fileToStage;
}
} else {
// Handle files without staging name
if (fileToStage.startsWith("gs://")) {
try {
// Create temp file with exact same name as GCS file
String gcsFileName = fileToStage.substring(fileToStage.lastIndexOf('/') + 1);
File tempDir = Files.createTempDir();
tempDir.deleteOnExit();
File tempFile = new File(tempDir, gcsFileName);
tempFile.deleteOnExit();

LOG.info(
"Downloading GCS file {} to local temp file {}",
fileToStage,
tempFile.getAbsolutePath());

// Copy GCS file to local temp file
ResourceId source = FileSystems.matchNewResource(fileToStage, false);
try (ReadableByteChannel reader = FileSystems.open(source);
FileOutputStream writer = new FileOutputStream(tempFile)) {
ByteStreams.copy(Channels.newInputStream(reader), writer);
}

localPath = tempFile.getAbsolutePath();
LOG.info("Replaced GCS path {} with local path {}", fileToStage, localPath);
} catch (IOException e) {
throw new RuntimeException("Failed to copy GCS file locally: " + fileToStage, e);
}
} else {
localPath = fileToStage;
}
}
processedFiles.add(localPath);
}

return processedFiles;
}

/**
* Construct a runner from the provided options.
*
Expand Down Expand Up @@ -312,6 +401,9 @@ && isServiceEndpoint(dataflowOptions.getDataflowEndpoint())) {
}

if (dataflowOptions.getFilesToStage() != null) {
// Replace GCS file paths with local file paths
dataflowOptions.setFilesToStage(
replaceGcsFilesWithLocalFiles(dataflowOptions.getFilesToStage()));
// The user specifically requested these files, so fail now if they do not exist.
// (automatically detected classpath elements are permitted to not exist, so later
// staging will not fail on nonexistent files)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.Serializable;
import java.io.Writer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
Expand Down Expand Up @@ -1155,6 +1156,19 @@ public void testNonExistentStagingLocation() throws IOException {
assertValidJob(jobCaptor.getValue());
}

@Test
public void testReplaceGcsFilesWithLocalFilesEmptyList() {
List<String> filesToStage = Collections.emptyList();
List<String> processedFiles = DataflowRunner.replaceGcsFilesWithLocalFiles(filesToStage);
assertTrue(processedFiles.isEmpty());
}

@Test(expected = RuntimeException.class)
public void testReplaceGcsFilesWithLocalFilesIOError() {
List<String> filesToStage = Collections.singletonList("gs://non-existent-bucket/file.jar");
DataflowRunner.replaceGcsFilesWithLocalFiles(filesToStage);
}

@Test
public void testNonExistentProfileLocation() throws IOException {
DataflowPipelineOptions options = buildPipelineOptions();
Expand Down

0 comments on commit c57553c

Please sign in to comment.