Skip to content

Commit

Permalink
Merge pull request apache#24023 Add a Dataflow smoke test.
Browse files Browse the repository at this point in the history
  • Loading branch information
robertwb authored Nov 21, 2022
2 parents e6e832b + 177f9b9 commit 79b8c15
Show file tree
Hide file tree
Showing 15 changed files with 329 additions and 27 deletions.
68 changes: 68 additions & 0 deletions .github/workflows/typescript_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -103,3 +103,71 @@ jobs:
working-directory: ./sdks/typescript
env:
BEAM_SERVICE_OVERRIDES: '{"python:*": "python"}'

check_gcp_variables:
timeout-minutes: 5
name: "Check GCP variables"
runs-on: ubuntu-latest
outputs:
gcp-variables-set: ${{ steps.check_gcp_variables.outputs.gcp-variables-set }}
steps:
- uses: actions/checkout@v3
- name: "Check are GCP variables set"
run: "./scripts/ci/ci_check_are_gcp_variables_set.sh"
id: check_gcp_variables
env:
GCP_PROJECT_ID: ${{ secrets.GCP_PROJECT_ID }}
GCP_REGION: ${{ secrets.GCP_REGION }}
GCP_SA_EMAIL: ${{ secrets.GCP_SA_EMAIL }}
GCP_SA_KEY: ${{ secrets.GCP_SA_KEY }}
GCP_TESTING_BUCKET: ${{ secrets.GCP_TESTING_BUCKET }}
GCP_PYTHON_WHEELS_BUCKET: "not-needed-here"

typescript_dataflow_tests:
name: 'TypeScript Dataflow Tests'
runs-on: ubuntu-latest
needs:
- check_gcp_variables
if: needs.check_gcp_variables.outputs.gcp-variables-set == 'true'
strategy:
fail-fast: false
steps:
- name: Checkout code
uses: actions/checkout@v3
with:
persist-credentials: false
submodules: recursive
- name: Install node
uses: actions/setup-node@v3
with:
node-version: '16'
- name: Install python
uses: actions/setup-python@v4
with:
python-version: 3.8
- name: Setup Beam Python
working-directory: ./sdks/python
run: |
pip install pip setuptools --upgrade
pip install -r build-requirements.txt
pip install 'pandas>=1.0,<1.5'
python setup.py develop
pip install -e ".[gcp]"
- name: Authenticate on GCP
uses: google-github-actions/setup-gcloud@v0
with:
service_account_email: ${{ secrets.GCP_SA_EMAIL }}
service_account_key: ${{ secrets.GCP_SA_KEY }}
project_id: ${{ secrets.GCP_PROJECT_ID }}
export_default_credentials: true
- run: npm ci
working-directory: ./sdks/typescript
- run: npm run build
working-directory: ./sdks/typescript
- run: npm test -- --grep "@dataflow"
working-directory: ./sdks/typescript
env:
BEAM_SERVICE_OVERRIDES: '{"python:*": "python"}'
GCP_PROJECT_ID: ${{ secrets.GCP_PROJECT_ID }}
GCP_REGION: ${{ secrets.GCP_REGION }}
GCP_TESTING_BUCKET: ${{ secrets.GCP_TESTING_BUCKET }}
4 changes: 4 additions & 0 deletions sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -1501,6 +1501,10 @@ def job_id(self):
def metrics(self):
return self.metric_results

def monitoring_infos(self):
logging.warning('Monitoring infos not yet supported for Dataflow runner.')
return []

@property
def has_job(self):
return self._job is not None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1585,3 +1585,7 @@ def monitoring_metrics(self):
self._monitoring_metrics = FnApiMetrics(
self._monitoring_infos_by_stage, user_metrics_only=False)
return self._monitoring_metrics

def monitoring_infos(self):
for ms in self._monitoring_infos_by_stage.values():
yield from ms
Original file line number Diff line number Diff line change
Expand Up @@ -159,10 +159,10 @@ def GetJobMetrics(self, request, context=None):
raise LookupError("Job {} does not exist".format(request.job_id))

result = self._jobs[request.job_id].result
monitoring_info_list = []
if result is not None:
for mi in result._monitoring_infos_by_stage.values():
monitoring_info_list.extend(mi)
if result is None:
monitoring_info_list = []
else:
monitoring_info_list = result.monitoring_infos()

# Filter out system metrics
user_monitoring_info_list = [
Expand Down
12 changes: 6 additions & 6 deletions sdks/typescript/src/apache_beam/internal/pipeline.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ export class PipelineContext {

private coders: { [key: string]: Coder<any> } = {};

constructor(public components: Components) {}
constructor(public components: Components, private componentPrefix: string) {}

getCoder<T>(coderId: string): Coder<T> {
const this_ = this;
Expand Down Expand Up @@ -102,7 +102,7 @@ export class PipelineContext {
}

createUniqueName(prefix: string): string {
return prefix + "_" + this.counter++;
return this.componentPrefix + prefix + "_" + this.counter++;
}
}

Expand All @@ -119,15 +119,15 @@ export class Pipeline {
private proto: runnerApi.Pipeline;
private globalWindowing: string;

constructor() {
this.defaultEnvironment = "jsEnvironment";
this.globalWindowing = "globalWindowing";
constructor(componentPrefix: string = "") {
this.defaultEnvironment = componentPrefix + "jsEnvironment";
this.globalWindowing = componentPrefix + "globalWindowing";
this.proto = runnerApi.Pipeline.create({
components: runnerApi.Components.create({}),
});
this.proto.components!.environments[this.defaultEnvironment] =
environments.defaultJsEnvironment();
this.context = new PipelineContext(this.proto.components!);
this.context = new PipelineContext(this.proto.components!, componentPrefix);
this.proto.components!.windowingStrategies[this.globalWindowing] =
createWindowingStrategyProto(this, globalWindows());
}
Expand Down
2 changes: 1 addition & 1 deletion sdks/typescript/src/apache_beam/runners/dataflow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* limitations under the License.
*/

import { Pipeline } from "../internal/pipeline";
import { Pipeline } from "../proto/beam_runner_api";
import { PipelineResult, Runner } from "./runner";
import { PortableRunner } from "./portable_runner/runner";
import { PythonService } from "../utils/service";
Expand Down
7 changes: 3 additions & 4 deletions sdks/typescript/src/apache_beam/runners/direct_runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,7 @@ export class DirectRunner extends Runner {
return [...this.unsupportedFeaturesIter(pipeline, options)];
}

*unsupportedFeaturesIter(pipeline, options: Object = {}) {
const proto: runnerApi.Pipeline = pipeline.proto;
*unsupportedFeaturesIter(proto: runnerApi.Pipeline, options: Object = {}) {
for (const requirement of proto.requirements) {
if (!SUPPORTED_REQUIREMENTS.includes(requirement)) {
yield requirement;
Expand Down Expand Up @@ -103,13 +102,13 @@ export class DirectRunner extends Runner {
}
}

async runPipeline(p): Promise<PipelineResult> {
async runPipeline(p: runnerApi.Pipeline): Promise<PipelineResult> {
const stateProvider = new InMemoryStateProvider();
const stateCacheRef = uuid.v4();
DirectRunner.inMemoryStatesRefs.set(stateCacheRef, stateProvider);

try {
const proto = rewriteSideInputs(p.proto, stateCacheRef);
const proto = rewriteSideInputs(p, stateCacheRef);
const descriptor: ProcessBundleDescriptor = {
id: "",
transforms: proto.components!.transforms,
Expand Down
2 changes: 1 addition & 1 deletion sdks/typescript/src/apache_beam/runners/flink.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ const fs = require("fs");
const os = require("os");
const path = require("path");

import { Pipeline } from "../internal/pipeline";
import { Pipeline } from "../proto/beam_runner_api";
import { PipelineResult, Runner } from "./runner";
import { PortableRunner } from "./portable_runner/runner";
import { JavaJarService } from "../utils/service";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,10 +179,10 @@ export class PortableRunner extends Runner {
}

async runPipeline(
pipeline: Pipeline,
pipeline: runnerApiProto.Pipeline,
options?: PipelineOptions
): Promise<PipelineResult> {
return this.runPipelineWithProto(pipeline.getProto(), options);
return this.runPipelineWithProto(pipeline, options);
}

async runPipelineWithProto(
Expand Down
11 changes: 5 additions & 6 deletions sdks/typescript/src/apache_beam/runners/runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/

import { JobState_Enum } from "../proto/beam_job_api";
import * as runnerApi from "../proto/beam_runner_api";
import { MonitoringInfo } from "../proto/metrics";
import { Pipeline } from "../internal/pipeline";
import { Root, PValue } from "../pvalue";
Expand Down Expand Up @@ -88,9 +89,7 @@ export abstract class Runner {
pipeline: (root: Root) => PValue<any> | Promise<PValue<any>>,
options?: PipelineOptions
): Promise<PipelineResult> {
const p = new Pipeline();
await pipeline(new Root(p));
const pipelineResult = await this.runPipeline(p, options);
const pipelineResult = await this.runAsync(pipeline, options);
const finalState = await pipelineResult.waitUntilFinish();
if (finalState != JobState_Enum.DONE) {
// TODO: Grab the last/most severe error message?
Expand All @@ -110,19 +109,19 @@ export abstract class Runner {
): Promise<PipelineResult> {
const p = new Pipeline();
await pipeline(new Root(p));
return this.runPipeline(p);
return this.runPipeline(p.getProto());
}

abstract runPipeline(
pipeline: Pipeline,
pipeline: runnerApi.Pipeline,
options?: PipelineOptions
): Promise<PipelineResult>;
}

export function defaultRunner(defaultOptions: Object): Runner {
return new (class extends Runner {
async runPipeline(
pipeline: Pipeline,
pipeline: runnerApi.Pipeline,
options: Object = {}
): Promise<PipelineResult> {
const directRunner =
Expand Down
2 changes: 1 addition & 1 deletion sdks/typescript/src/apache_beam/runners/universal.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* limitations under the License.
*/

import { Pipeline } from "../internal/pipeline";
import { Pipeline } from "../proto/beam_runner_api";
import { PipelineResult, Runner } from "./runner";
import { PortableRunner } from "./portable_runner/runner";
import { PythonService } from "../utils/service";
Expand Down
Loading

0 comments on commit 79b8c15

Please sign in to comment.