Skip to content

Commit

Permalink
Updates the Transform Service to accept Python extra packages through…
Browse files Browse the repository at this point in the history
… the Java API (#28783)

* Updates the Transform Service to accept Python extra packages through the Java API

* Addressing reviewer comments

* Addressing reviewer comments
  • Loading branch information
chamikaramj authored Oct 11, 2023
1 parent 65eaf45 commit 99c87a2
Show file tree
Hide file tree
Showing 12 changed files with 566 additions and 48 deletions.
2 changes: 2 additions & 0 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,8 @@ tasks.register("javaPreCommit") {
dependsOn(":sdks:java:testing:test-utils:build")
dependsOn(":sdks:java:testing:tpcds:build")
dependsOn(":sdks:java:testing:watermarks:build")
dependsOn(":sdks:java:transform-service:build")
dependsOn(":sdks:java:transform-service:launcher:build")

dependsOn(":examples:java:preCommit")
dependsOn(":examples:java:twitter:preCommit")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public RunnerApi.Pipeline upgradeTransformsViaTransformService(
} else if (options.getTransformServiceBeamVersion() != null) {
String projectName = UUID.randomUUID().toString();
int port = findAvailablePort();
service = TransformServiceLauncher.forProject(projectName, port);
service = TransformServiceLauncher.forProject(projectName, port, null);
service.setBeamVersion(options.getTransformServiceBeamVersion());

// Starting the transform service.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,20 @@ public OutputT expand(InputT input) {
boolean pythonAvailable = isPythonAvailable();
boolean dockerAvailable = isDockerAvailable();

File requirementsFile = null;
if (!extraPackages.isEmpty()) {
requirementsFile = File.createTempFile("requirements", ".txt");
requirementsFile.deleteOnExit();
try (Writer fout =
new OutputStreamWriter(
new FileOutputStream(requirementsFile.getAbsolutePath()), Charsets.UTF_8)) {
for (String pkg : extraPackages) {
fout.write(pkg);
fout.write('\n');
}
}
}

// We use the transform service if either of the following is true.
// * It was explicitly requested.
// * Python executable is not available in the system but Docker is available.
Expand All @@ -514,19 +528,16 @@ public OutputT expand(InputT input) {
projectName,
port);

TransformServiceLauncher service = TransformServiceLauncher.forProject(projectName, port);
String pythonRequirementsFile =
requirementsFile != null ? requirementsFile.getAbsolutePath() : null;
TransformServiceLauncher service =
TransformServiceLauncher.forProject(projectName, port, pythonRequirementsFile);
service.setBeamVersion(ReleaseInfo.getReleaseInfo().getSdkVersion());
// TODO(https://github.com/apache/beam/issues/26833): add support for installing extra
// packages.
if (!extraPackages.isEmpty()) {
throw new RuntimeException(
"Transform Service does not support installing extra packages yet");
}
try {
// Starting the transform service.
service.start();
// Waiting the service to be ready.
service.waitTillUp(15000);
service.waitTillUp(-1);
// Expanding the transform.
output = apply(input, String.format("localhost:%s", port), payload);
} finally {
Expand All @@ -539,17 +550,7 @@ public OutputT expand(InputT input) {
ImmutableList.Builder<String> args = ImmutableList.builder();
args.add(
"--port=" + port, "--fully_qualified_name_glob=*", "--pickle_library=cloudpickle");
if (!extraPackages.isEmpty()) {
File requirementsFile = File.createTempFile("requirements", ".txt");
requirementsFile.deleteOnExit();
try (Writer fout =
new OutputStreamWriter(
new FileOutputStream(requirementsFile.getAbsolutePath()), Charsets.UTF_8)) {
for (String pkg : extraPackages) {
fout.write(pkg);
fout.write('\n');
}
}
if (requirementsFile != null) {
args.add("--requirements_file=" + requirementsFile.getAbsolutePath());
}
PythonService service =
Expand Down
8 changes: 8 additions & 0 deletions sdks/java/transform-service/docker-compose/.env
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,14 @@

BEAM_VERSION=$BEAM_VERSION
CREDENTIALS_VOLUME=$CREDENTIALS_VOLUME
DEPENDENCIES_VOLUME=$DEPENDENCIES_VOLUME

# A requirements file with either of the following
# * PyPi packages
# * Locally available packages relative to the directory provided to
# DEPENDENCIES_VOLUME.
PYTHON_REQUIREMENTS_FILE_NAME=$PYTHON_REQUIREMENTS_FILE_NAME

GOOGLE_APPLICATION_CREDENTIALS_FILE_NAME=application_default_credentials.json
COMPOSE_PROJECT_NAME=apache.beam.transform.service
TRANSFORM_SERVICE_PORT=$TRANSFORM_SERVICE_PORT
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@ services:
expansion-service-2:
image: "apache/beam_python_expansion_service:${BEAM_VERSION}"
restart: on-failure
command: -id expansion-service-2 -port 5001
command: -id expansion-service-2 -port 5001 -requirements_file ${PYTHON_REQUIREMENTS_FILE_NAME} -dependencies_dir '/dependencies_volume'
volumes:
- ${CREDENTIALS_VOLUME}:/credentials_volume
- ${DEPENDENCIES_VOLUME}:/dependencies_volume
environment:
- GOOGLE_APPLICATION_CREDENTIALS=/credentials_volume/${GOOGLE_APPLICATION_CREDENTIALS_FILE_NAME}
3 changes: 3 additions & 0 deletions sdks/java/transform-service/launcher/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ dependencies {
shadow library.java.args4j
shadow library.java.error_prone_annotations
permitUnusedDeclared(library.java.error_prone_annotations)
testImplementation library.java.junit
testImplementation library.java.mockito_core
testImplementation project(path: ":sdks:java:core")
}

sourceSets {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@
*/
package org.apache.beam.sdk.transformservice.launcher;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
Expand All @@ -28,6 +30,7 @@
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import java.util.stream.Stream;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.ByteStreams;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.Files;
Expand Down Expand Up @@ -62,9 +65,9 @@ public class TransformServiceLauncher {
private static final int STATUS_LOGGER_WAIT_TIME = 3000;

@SuppressWarnings("argument")
private TransformServiceLauncher(@Nullable String projectName, int port) throws IOException {
LOG.info("Initializing the Beam Transform Service {}.", projectName);

private TransformServiceLauncher(
@Nullable String projectName, int port, @Nullable String pythonRequirementsFile)
throws IOException {
String tmpDirLocation = System.getProperty("java.io.tmpdir");
// We use Docker Compose project name as the name of the temporary directory to isolate
// different transform service instances that may be running in the same machine.
Expand All @@ -83,14 +86,14 @@ private TransformServiceLauncher(@Nullable String projectName, int port) throws
ByteStreams.copy(getClass().getResourceAsStream("/.env"), fout);
}

// Setting up the credentials directory.
File credentialsDir = Paths.get(tmpDir, "credentials_dir").toFile();
LOG.info(
"Creating a temporary directory for storing credentials: "
+ credentialsDir.getAbsolutePath());

if (credentialsDir.exists()) {
LOG.info("Reusing the existing credentials directory " + credentialsDir.getAbsolutePath());
} else {
LOG.info(
"Creating a temporary directory for storing credentials: "
+ credentialsDir.getAbsolutePath());
if (!credentialsDir.mkdir()) {
throw new IOException(
"Could not create a temporary directory for storing credentials: "
Expand Down Expand Up @@ -124,10 +127,84 @@ private TransformServiceLauncher(@Nullable String projectName, int port) throws
}
}

// Setting up the dependencies directory.
File dependenciesDir = Paths.get(tmpDir, "dependencies_dir").toFile();
Path updatedRequirementsFilePath = Paths.get(dependenciesDir.toString(), "requirements.txt");
if (dependenciesDir.exists()) {
LOG.info("Reusing the existing dependencies directory " + dependenciesDir.getAbsolutePath());
} else {
LOG.info(
"Creating a temporary directory for storing dependencies: "
+ dependenciesDir.getAbsolutePath());
if (!dependenciesDir.mkdir()) {
throw new IOException(
"Could not create a temporary directory for storing dependencies: "
+ dependenciesDir.getAbsolutePath());
}

// We create a requirements file with extra dependencies.
// If there are no extra dependencies, we just provide an empty requirements file.
File file = updatedRequirementsFilePath.toFile();
if (!file.createNewFile()) {
throw new IOException(
"Could not create the new requirements file " + updatedRequirementsFilePath);
}

// Updating dependencies.
if (pythonRequirementsFile != null) {
Path requirementsFilePath = Paths.get(pythonRequirementsFile);
List<String> updatedLines = new ArrayList<>();

try (Stream<String> lines = java.nio.file.Files.lines(requirementsFilePath)) {
lines.forEachOrdered(
line -> {
Path dependencyFilePath = Paths.get(line);
if (java.nio.file.Files.exists(dependencyFilePath)) {
Path fileName = dependencyFilePath.getFileName();
if (fileName == null) {
throw new IllegalArgumentException(
"Could not determine the filename of the local artifact "
+ dependencyFilePath);
}
try {
java.nio.file.Files.copy(
dependencyFilePath,
Paths.get(dependenciesDir.toString(), fileName.toString()));
} catch (IOException e) {
throw new RuntimeException(e);
}
updatedLines.add(fileName.toString());
} else {
updatedLines.add(line);
}
});
}

try (BufferedWriter writer =
java.nio.file.Files.newBufferedWriter(file.toPath(), StandardCharsets.UTF_8)) {
for (String line : updatedLines) {
writer.write(line);
writer.newLine();
}
writer.flush();
}
}
}

// Setting environment variables used by the docker-compose.yml file.
environmentVariables.put("CREDENTIALS_VOLUME", credentialsDir.getAbsolutePath());
environmentVariables.put("DEPENDENCIES_VOLUME", dependenciesDir.getAbsolutePath());
environmentVariables.put("TRANSFORM_SERVICE_PORT", String.valueOf(port));

Path updatedRequirementsFileName = updatedRequirementsFilePath.getFileName();
if (updatedRequirementsFileName == null) {
throw new IllegalArgumentException(
"Could not determine the file name of the updated requirements file "
+ updatedRequirementsFilePath);
}
environmentVariables.put(
"PYTHON_REQUIREMENTS_FILE_NAME", updatedRequirementsFileName.toString());

// Building the Docker Compose command.
dockerComposeStartCommandPrefix.add("docker-compose");
dockerComposeStartCommandPrefix.add("-p");
Expand All @@ -136,21 +213,37 @@ private TransformServiceLauncher(@Nullable String projectName, int port) throws
dockerComposeStartCommandPrefix.add(dockerComposeFile.getAbsolutePath());
}

/**
* Specifies the Beam version to get containers for the transform service.
*
* <p>Could be a release Beam version with containers in Docker Hub or an unreleased Beam version
* for which containers are available locally.
*
* @param beamVersion a Beam version to get containers from.
*/
public void setBeamVersion(String beamVersion) {
environmentVariables.put("BEAM_VERSION", beamVersion);
}

public void setPythonExtraPackages(String pythonExtraPackages) {
environmentVariables.put("$PYTHON_EXTRA_PACKAGES", pythonExtraPackages);
}

/**
* Initializes a client for managing transform service instances.
*
* @param projectName project name for the transform service.
* @param port port exposed by the transform service.
* @param pythonRequirementsFile a requirements file with extra dependencies for the Python
* expansion services.
* @return an initialized client for managing the transform service.
* @throws IOException
*/
public static synchronized TransformServiceLauncher forProject(
@Nullable String projectName, int port) throws IOException {
@Nullable String projectName, int port, @Nullable String pythonRequirementsFile)
throws IOException {
if (projectName == null || projectName.isEmpty()) {
projectName = DEFAULT_PROJECT_NAME;
}
if (!launchers.containsKey(projectName)) {
launchers.put(projectName, new TransformServiceLauncher(projectName, port));
launchers.put(
projectName, new TransformServiceLauncher(projectName, port, pythonRequirementsFile));
}
return launchers.get(projectName);
}
Expand Down Expand Up @@ -200,10 +293,10 @@ public synchronized void status() throws IOException {

public synchronized void waitTillUp(int timeout) throws IOException, TimeoutException {
timeout = timeout <= 0 ? DEFAULT_START_WAIT_TIME : timeout;
String statusFileName = getStatus();

long startTime = System.currentTimeMillis();
while (System.currentTimeMillis() - startTime < timeout) {
String statusFileName = getStatus();
try {
// We are just waiting for a local process. No need for exponential backoff.
this.wait(1000);
Expand All @@ -226,6 +319,7 @@ public synchronized void waitTillUp(int timeout) throws IOException, TimeoutExce

private synchronized String getStatus() throws IOException {
File outputOverride = File.createTempFile("output_override", null);
outputOverride.deleteOnExit();
runDockerComposeCommand(ImmutableList.of("ps"), outputOverride);

return outputOverride.getAbsolutePath();
Expand All @@ -238,6 +332,8 @@ private static class ArgConfig {
static final String PORT_ARG_NAME = "port";
static final String BEAM_VERSION_ARG_NAME = "beam_version";

static final String PYTHON_REQUIREMENTS_FILE_ARG_NAME = "python_requirements_file";

@Option(name = "--" + PROJECT_NAME_ARG_NAME, usage = "Docker compose project name")
private String projectName = "";

Expand All @@ -249,6 +345,11 @@ private static class ArgConfig {

@Option(name = "--" + BEAM_VERSION_ARG_NAME, usage = "Beam version to use.")
private String beamVersion = "";

@Option(
name = "--" + PYTHON_REQUIREMENTS_FILE_ARG_NAME,
usage = "Extra Python packages in the form of an requirements file.")
private String pythonRequirementsFile = "";
}

public static void main(String[] args) throws IOException, TimeoutException {
Expand Down Expand Up @@ -288,8 +389,12 @@ public static void main(String[] args) throws IOException, TimeoutException {
: ("port " + Integer.toString(config.port) + ".")));
System.out.println("===================================================");

String pythonRequirementsFile =
!config.pythonRequirementsFile.isEmpty() ? config.pythonRequirementsFile : null;

TransformServiceLauncher service =
TransformServiceLauncher.forProject(config.projectName, config.port);
TransformServiceLauncher.forProject(
config.projectName, config.port, pythonRequirementsFile);
if (!config.beamVersion.isEmpty()) {
service.setBeamVersion(config.beamVersion);
}
Expand Down
Loading

0 comments on commit 99c87a2

Please sign in to comment.