diff --git a/.github/workflows/typescript_tests.yml b/.github/workflows/typescript_tests.yml index 7a20016ff3f6..01efc06c14df 100644 --- a/.github/workflows/typescript_tests.yml +++ b/.github/workflows/typescript_tests.yml @@ -69,6 +69,7 @@ jobs: typescript_xlang_tests: name: 'TypeScript xlang Tests' runs-on: [self-hosted, ubuntu-20.04] + timeout-minutes: 10 strategy: fail-fast: false steps: diff --git a/sdks/typescript/src/apache_beam/runners/flink.ts b/sdks/typescript/src/apache_beam/runners/flink.ts index 878112162fdc..db6c7e8b0a1f 100644 --- a/sdks/typescript/src/apache_beam/runners/flink.ts +++ b/sdks/typescript/src/apache_beam/runners/flink.ts @@ -61,7 +61,7 @@ export function flinkRunner(runnerOptions: Object = {}): Runner { const jobServerJar = allOptions.flinkJobServerJar || (await JavaJarService.cachedJar( - JavaJarService.gradleToJar( + await JavaJarService.gradleToJar( `runners:flink:${allOptions.flinkVersion}:job-server:shadowJar` ) )); diff --git a/sdks/typescript/src/apache_beam/runners/portable_runner/runner.ts b/sdks/typescript/src/apache_beam/runners/portable_runner/runner.ts index 3220852ac740..9cfb27957b0c 100644 --- a/sdks/typescript/src/apache_beam/runners/portable_runner/runner.ts +++ b/sdks/typescript/src/apache_beam/runners/portable_runner/runner.ts @@ -19,6 +19,7 @@ const childProcess = require("child_process"); const crypto = require("crypto"); const fs = require("fs"); +const os = require("os"); const path = require("path"); import { ChannelCredentials } from "@grpc/grpc-js"; @@ -42,6 +43,7 @@ import * as artifacts from "../artifacts"; import { Service as JobService } from "../../utils/service"; import * as serialization from "../../serialization"; +import { version } from "../../version"; const TERMINAL_STATES = [ JobState_Enum.DONE, @@ -51,6 +53,10 @@ const TERMINAL_STATES = [ JobState_Enum.DRAINED, ]; +// TODO(robertwb): Change this to docker.io/apache/beam_typescript_sdk +// once we push images there. +const DOCKER_BASE = "gcr.io/apache-beam-testing/beam_typescript_sdk"; + type completionCallback = (terminalState: JobStateEvent) => Promise; class PortableRunnerPipelineResult extends PipelineResult { @@ -229,20 +235,27 @@ export class PortableRunner extends Runner { environments.asDockerEnvironment( env, (options as any)?.sdkContainerImage || - "gcr.io/apache-beam-testing/beam_typescript_sdk:dev" + DOCKER_BASE + ":" + version.replace("-SNAPSHOT", ".dev") ); const deps = pipeline.components!.environments[envId].dependencies; // Package up this code as a dependency. - const result = childProcess.spawnSync("npm", ["pack"], { - encoding: "latin1", - }); + const tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), "beam-pack-")); + const result = childProcess.spawnSync( + "npm", + ["pack", "--pack-destination", tmpDir], + { + encoding: "latin1", + } + ); if (result.status === 0) { console.debug(result.stdout); } else { throw new Error(result.output); } - const packFile = path.resolve(result.stdout.trim()); + const packFile = path.resolve( + path.join(tmpDir, result.stdout.trim()) + ); deps.push(fileArtifact(packFile, "beam:artifact:type:npm:v1")); // If any dependencies are files, package them up as well. diff --git a/sdks/typescript/src/apache_beam/utils/service.ts b/sdks/typescript/src/apache_beam/utils/service.ts index 8be90616a065..b53dbd626fb8 100644 --- a/sdks/typescript/src/apache_beam/utils/service.ts +++ b/sdks/typescript/src/apache_beam/utils/service.ts @@ -216,7 +216,7 @@ export function serviceProviderFromJavaGradleTarget( } } else { jar = await JavaJarService.cachedJar( - JavaJarService.gradleToJar(gradleTarget) + await JavaJarService.gradleToJar(gradleTarget) ); } @@ -281,18 +281,18 @@ export class JavaJarService extends SubprocessService { } } - static gradleToJar( + static async gradleToJar( gradleTarget: string, appendix: string | undefined = undefined, version: string = beamVersion - ): string { + ): Promise { if (version.startsWith("0.")) { // node-ts 0.x corresponds to Beam 2.x. version = "2" + version.substring(1); } const gradlePackage = gradleTarget.match(/^:?(.*):[^:]+:?$/)![1]; const artifactId = "beam-" + gradlePackage.replaceAll(":", "-"); - const projectRoot = getProjectRoot(); + const projectRoot = getBeamProjectRoot(); const localPath = !projectRoot ? undefined : path.join( @@ -302,16 +302,20 @@ export class JavaJarService extends SubprocessService { "libs", JavaJarService.jarName( artifactId, - version.replace(".dev", ""), + version.replace("-SNAPSHOT", ""), "SNAPSHOT", appendix ) ); + if (version.includes("SNAPSHOT") && !projectRoot) { + version = "latest"; + } + if (localPath && fs.existsSync(localPath)) { console.info("Using pre-built snapshot at", localPath); return localPath; - } else if (version.includes(".dev")) { + } else if (version.includes("SNAPSHOT")) { throw new Error( `${localPath} not found. Please build the server with cd ${projectRoot}; ./gradlew ${gradleTarget})` @@ -326,14 +330,37 @@ export class JavaJarService extends SubprocessService { } } - static mavenJarUrl( + static async mavenJarUrl( artifactId: string, version: string, classifier: string | undefined = undefined, appendix: string | undefined = undefined, repo: string = JavaJarService.APACHE_REPOSITORY, groupId: string = JavaJarService.BEAM_GROUP_ID - ): string { + ): Promise { + if (version == "latest") { + const medatadataUrl = [ + repo, + groupId.replaceAll(".", "/"), + artifactId, + "maven-metadata.xml", + ].join("/"); + const metadata = await new Promise((resolve, reject) => { + let data = ""; + https.get(medatadataUrl, (res) => { + res.on("data", (chunk) => { + data += chunk; + }); + res.on("end", () => { + resolve(data); + }); + res.on("error", (e) => { + reject(e); + }); + }); + }); + version = metadata.match(/(.*)<\/latest>/)![1]; + } return [ repo, groupId.replaceAll(".", "/"), @@ -450,9 +477,18 @@ function serviceOverrideFor(name: string): string | undefined { } } -function getProjectRoot(): string | undefined { +function getBeamProjectRoot(): string | undefined { try { - return path.dirname(findGitRoot(__dirname)); + const projectRoot = path.dirname(findGitRoot(__dirname)); + if ( + fs.existsSync( + path.join(projectRoot, "sdks", "typescript", "src", "apache_beam") + ) + ) { + return projectRoot; + } else { + return undefined; + } } catch (Error) { return undefined; } diff --git a/sdks/typescript/src/apache_beam/worker/data.ts b/sdks/typescript/src/apache_beam/worker/data.ts index a4b2c3f73236..53a98ac4cc16 100644 --- a/sdks/typescript/src/apache_beam/worker/data.ts +++ b/sdks/typescript/src/apache_beam/worker/data.ts @@ -45,7 +45,6 @@ export class MultiplexingDataChannel { ); this.dataChannel = this.dataClient.data(metadata); this.dataChannel.on("data", async (elements) => { - console.debug("data", elements); for (const data of elements.data) { const consumer = this.getConsumer(data.instructionId, data.transformId); try { diff --git a/sdks/typescript/src/apache_beam/worker/operators.ts b/sdks/typescript/src/apache_beam/worker/operators.ts index a4e7724cb2da..6338fc0dab58 100644 --- a/sdks/typescript/src/apache_beam/worker/operators.ts +++ b/sdks/typescript/src/apache_beam/worker/operators.ts @@ -270,7 +270,6 @@ export class DataSourceOperator implements IOperator { this.lastToProcessElement < Infinity ? this.lastToProcessElement : Number(desiredSplit.estimatedInputElements) - 1; - console.log(this.lastToProcessElement, this.lastProcessedElement, end); if (this.lastProcessedElement >= end) { return undefined; } diff --git a/sdks/typescript/src/apache_beam/worker/worker_main.ts b/sdks/typescript/src/apache_beam/worker/worker_main.ts index c457e36fd337..e70e056abb63 100644 --- a/sdks/typescript/src/apache_beam/worker/worker_main.ts +++ b/sdks/typescript/src/apache_beam/worker/worker_main.ts @@ -41,14 +41,26 @@ async function main() { options["beam:option:registered_node_modules:v1"] || options["registered_node_modules"] || [] - ).forEach(require); + ).forEach((m) => { + try { + require(m); + } catch (error) { + console.error( + `**ERROR** + Unable to require module '${m}' used in requireForSerialization: + please ensure that it is available in the package exports.` + ); + // Explicitly exit the process to avoid the error getting swallowed + // by a long traceback. + process.exit(1); + } + }); console.info("Starting worker", argv.id); const worker = new Worker( argv.id, { controlUrl: argv.control_endpoint, - //loggingUrl: argv.logging_endpoint, }, options );