diff --git a/playground/infrastructure/cloudbuild/playground_ci_examples.sh b/playground/infrastructure/cloudbuild/playground_ci_examples.sh old mode 100644 new mode 100755 diff --git a/sdks/go/examples/large_wordcount/large_wordcount.go b/sdks/go/examples/large_wordcount/large_wordcount.go index df04b19a3838..eb9cf3010e75 100644 --- a/sdks/go/examples/large_wordcount/large_wordcount.go +++ b/sdks/go/examples/large_wordcount/large_wordcount.go @@ -73,6 +73,7 @@ import ( _ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/direct" _ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/dot" _ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/flink" + _ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism" _ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/samza" _ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/spark" _ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/universal" diff --git a/sdks/go/examples/minimal_wordcount/minimal_wordcount.go b/sdks/go/examples/minimal_wordcount/minimal_wordcount.go index f25f07a96d7b..f5f22cae1d65 100644 --- a/sdks/go/examples/minimal_wordcount/minimal_wordcount.go +++ b/sdks/go/examples/minimal_wordcount/minimal_wordcount.go @@ -62,7 +62,7 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam" "github.com/apache/beam/sdks/v2/go/pkg/beam/io/textio" - "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/direct" + "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism" "github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/stats" _ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem/gcs" @@ -119,6 +119,6 @@ func main() { // formatted strings) to a text file. textio.Write(s, "wordcounts.txt", formatted) - // Run the pipeline on the direct runner. - direct.Execute(context.Background(), p) + // Run the pipeline on the prism runner. + prism.Execute(context.Background(), p) } diff --git a/sdks/go/examples/snippets/10metrics.go b/sdks/go/examples/snippets/10metrics.go index 34d8b113d7d8..c69a03c444d5 100644 --- a/sdks/go/examples/snippets/10metrics.go +++ b/sdks/go/examples/snippets/10metrics.go @@ -34,7 +34,7 @@ func queryMetrics(pr beam.PipelineResult, ns, n string) metrics.QueryResults { // [END metrics_query] -var runner = "direct" +var runner = "prism" // [START metrics_pipeline] diff --git a/sdks/go/pkg/beam/runner.go b/sdks/go/pkg/beam/runner.go index 43f6ccce5cd0..c9747da602e1 100644 --- a/sdks/go/pkg/beam/runner.go +++ b/sdks/go/pkg/beam/runner.go @@ -22,10 +22,6 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam/log" ) -// TODO(herohde) 7/6/2017: do we want to make the selected runner visible to -// transformations? That would allow runner-dependent operations or -// verification, but require that it is stored in Init and used for Run. - var ( runners = make(map[string]func(ctx context.Context, p *Pipeline) (PipelineResult, error)) ) diff --git a/sdks/go/pkg/beam/runners/direct/direct.go b/sdks/go/pkg/beam/runners/direct/direct.go index 21cbb1155ea2..13288306066f 100644 --- a/sdks/go/pkg/beam/runners/direct/direct.go +++ b/sdks/go/pkg/beam/runners/direct/direct.go @@ -15,6 +15,9 @@ // Package direct contains the direct runner for running single-bundle // pipelines in the current process. Useful for testing. +// +// Deprecated: Use prism as a local runner instead. +// Reliance on the direct runner leads to non-portable pipelines. package direct import ( diff --git a/sdks/go/pkg/beam/runners/prism/README.md b/sdks/go/pkg/beam/runners/prism/README.md index 0fc6e6e68416..7ad9dc1d4579 100644 --- a/sdks/go/pkg/beam/runners/prism/README.md +++ b/sdks/go/pkg/beam/runners/prism/README.md @@ -30,8 +30,8 @@ single machine use. For Go SDK users: - `import "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism"` - - Short term: set runner to "prism" to use it, or invoke directly. - - Medium term: switch the default from "direct" to "prism". + - Short term: set runner to "prism" to use it, or invoke directly. ☑ + - Medium term: switch the default from "direct" to "prism". ☑ - Long term: alias "direct" to "prism", and delete legacy Go direct runner. Prisms allow breaking apart and separating a beam of light into @@ -118,7 +118,7 @@ can have features selectively disabled to ensure ## Current Limitations -* Experimental and testing use only. +* Testing use only. * Executing docker containers isn't yet implemented. * This precludes running the Java and Python SDKs, or their transforms for Cross Language. * Loopback execution only. @@ -127,7 +127,6 @@ can have features selectively disabled to ensure * Not yet suitable for larger jobs, which may have intermediate data that exceeds memory bounds. * Doesn't yet support sufficient intermediate data garbage collection for indefinite stream processing. * Doesn't yet execute all beam pipeline features. -* No UI for job status inspection. ## Implemented so far. @@ -140,18 +139,24 @@ can have features selectively disabled to ensure * Global Window * Interval Windowing * Session Windows. +* CoGBKs * Combines lifted and unlifted. * Expands Splittable DoFns +* Process Continuations (AKA Streaming transform support) * Limited support for Process Continuations * Residuals are rescheduled for execution immeadiately. * The transform must be finite (and eventually return a stop process continuation) * Basic Metrics support +* Stand alone execution support + * Web UI available when run as a standalone command. +* Progess tracking + * Channel Splitting + * Dynamic Splitting ## Next feature short list (unordered) See https://github.com/apache/beam/issues/24789 for current status. -* Resolve watermark advancement for Process Continuations * Test Stream * Triggers & Complex Windowing Strategy execution. * State @@ -162,11 +167,6 @@ See https://github.com/apache/beam/issues/24789 for current status. * FnAPI Optimizations * Fusion * Data with ProcessBundleRequest & Response -* Progess tracking - * Channel Splitting - * Dynamic Splitting -* Stand alone execution support -* UI reporting of in progress jobs This is not a comprehensive feature set, but a set of goals to best support users of the Go SDK in testing their pipelines. diff --git a/sdks/go/pkg/beam/testing/ptest/ptest.go b/sdks/go/pkg/beam/testing/ptest/ptest.go index d2b8f01f72dd..8e8412fa3d88 100644 --- a/sdks/go/pkg/beam/testing/ptest/ptest.go +++ b/sdks/go/pkg/beam/testing/ptest/ptest.go @@ -25,12 +25,13 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam" "github.com/apache/beam/sdks/v2/go/pkg/beam/runners" // common runner flag. - // ptest uses the direct runner to execute pipelines by default. + // ptest uses the prism runner to execute pipelines by default. + // but includes the direct runner for legacy fallback reasons to + // support users overriding the default back to the direct runner. _ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/direct" + _ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism" ) -// TODO(herohde) 7/10/2017: add hooks to verify counters, logs, etc. - // Create creates a pipeline and a PCollection with the given values. func Create(values []any) (*beam.Pipeline, beam.Scope, beam.PCollection) { p := beam.NewPipeline() @@ -65,7 +66,7 @@ func CreateList2(a, b any) (*beam.Pipeline, beam.Scope, beam.PCollection, beam.P // to function. var ( Runner = runners.Runner - defaultRunner = "direct" + defaultRunner = "prism" mainCalled = false ) diff --git a/sdks/go/pkg/beam/testing/ptest/ptest_test.go b/sdks/go/pkg/beam/testing/ptest/ptest_test.go index cbedd6b406fc..844737352a5a 100644 --- a/sdks/go/pkg/beam/testing/ptest/ptest_test.go +++ b/sdks/go/pkg/beam/testing/ptest/ptest_test.go @@ -21,6 +21,10 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert" ) +func TestMain(m *testing.M) { + Main(m) +} + func TestCreate(t *testing.T) { inputs := []any{"a", "b", "c"} p, s, col := Create(inputs) diff --git a/sdks/go/pkg/beam/x/beamx/run.go b/sdks/go/pkg/beam/x/beamx/run.go index 0355e453995e..0be42561b658 100644 --- a/sdks/go/pkg/beam/x/beamx/run.go +++ b/sdks/go/pkg/beam/x/beamx/run.go @@ -32,6 +32,7 @@ import ( _ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/direct" _ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/dot" _ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/flink" + _ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism" _ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/samza" _ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/spark" _ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/universal" @@ -39,7 +40,7 @@ import ( var ( runner = runners.Runner - defaultRunner = "direct" + defaultRunner = "prism" ) func getRunner() string { @@ -51,7 +52,7 @@ func getRunner() string { } // Run invokes beam.Run with the runner supplied by the flag "runner". It -// defaults to the direct runner, but all beam-distributed runners and textio +// defaults to the prism runner, but all beam-distributed runners and textio // filesystems are implicitly registered. func Run(ctx context.Context, p *beam.Pipeline) error { _, err := beam.Run(ctx, getRunner(), p) diff --git a/sdks/go/test/integration/integration.go b/sdks/go/test/integration/integration.go index 1007d9e64dc6..0f9e5984eadd 100644 --- a/sdks/go/test/integration/integration.go +++ b/sdks/go/test/integration/integration.go @@ -136,6 +136,38 @@ var portableFilters = []string{ "TestSetStateClear", } +// TODO(lostluck): set up a specific run for these. +var prismFilters = []string{ + // The prism runner does not support the TestStream primitive + "TestTestStream.*", + // The trigger and pane tests uses TestStream + "TestTrigger.*", + "TestPanes", + + // TODO(https://github.com/apache/beam/issues/21058): Xlang ios don't yet work on prism. + "TestKafkaIO.*", + // TODO(BEAM-13215): GCP IOs currently do not work in non-Dataflow portable runners. + "TestBigQueryIO.*", + "TestSpannerIO.*", + // The prsim runner does not support pipeline drain for SDF. + "TestDrain", + // FhirIO currently only supports Dataflow runner + "TestFhirIO.*", + // OOMs currently only lead to heap dumps on Dataflow runner + "TestOomParDo", + // The prism runner does not support user state. + "TestValueState", + "TestValueStateWindowed", + "TestValueStateClear", + "TestBagState", + "TestBagStateClear", + "TestCombiningState", + "TestMapState", + "TestMapStateClear", + "TestSetState", + "TestSetStateClear", +} + var flinkFilters = []string{ // TODO(https://github.com/apache/beam/issues/20723): Flink tests timing out on reads. "TestXLang_Combine.*", @@ -249,7 +281,7 @@ var dataflowFilters = []string{ "TestCheckpointing", // TODO(21761): This test needs to provide GCP project to expansion service. "TestBigQueryIO_BasicWriteQueryRead", - // Can't handle the test spanner container or access a local spanner. + // Can't handle the test spanner container or access a local spanner. "TestSpannerIO.*", // Dataflow does not drain jobs by itself. "TestDrain", @@ -292,6 +324,8 @@ func CheckFilters(t *testing.T) { switch runner { case "direct", "DirectRunner": filters = directFilters + case "prism", "PrismRunner": + filters = prismFilters case "portable", "PortableRunner": filters = portableFilters case "flink", "FlinkRunner":