diff --git a/sdks/go.mod b/sdks/go.mod index 6d5a8963b0e6..ec8a5852788a 100644 --- a/sdks/go.mod +++ b/sdks/go.mod @@ -24,9 +24,11 @@ go 1.18 require ( cloud.google.com/go/bigquery v1.45.0 + cloud.google.com/go/bigtable v1.18.1 cloud.google.com/go/datastore v1.10.0 cloud.google.com/go/profiler v0.3.1 cloud.google.com/go/pubsub v1.28.0 + cloud.google.com/go/spanner v1.43.0 cloud.google.com/go/storage v1.29.0 github.com/aws/aws-sdk-go-v2 v1.17.3 github.com/aws/aws-sdk-go-v2/config v1.18.11 @@ -46,9 +48,11 @@ require ( github.com/proullon/ramsql v0.0.0-20211120092837-c8d0a408b939 github.com/spf13/cobra v1.6.1 github.com/testcontainers/testcontainers-go v0.15.0 + github.com/tetratelabs/wazero v1.0.0-pre.7 github.com/xitongsys/parquet-go v1.6.2 github.com/xitongsys/parquet-go-source v0.0.0-20220315005136-aec0fe3e777c go.mongodb.org/mongo-driver v1.11.1 + golang.org/x/exp v0.0.0-20230206171751-46f607a40771 golang.org/x/net v0.5.0 golang.org/x/oauth2 v0.0.0-20221014153046-6fdb5e3db783 golang.org/x/sync v0.1.0 @@ -60,13 +64,7 @@ require ( google.golang.org/protobuf v1.28.1 gopkg.in/retry.v1 v1.0.3 gopkg.in/yaml.v2 v2.4.0 -) - -require cloud.google.com/go/spanner v1.43.0 - -require ( - cloud.google.com/go/bigtable v1.18.1 - github.com/tetratelabs/wazero v1.0.0-pre.7 + gopkg.in/yaml.v3 v3.0.1 ) require ( @@ -136,9 +134,8 @@ require ( github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d // indirect go.opencensus.io v0.24.0 // indirect golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d // indirect - golang.org/x/tools v0.1.12 // indirect + golang.org/x/tools v0.2.0 // indirect golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect google.golang.org/appengine v1.6.7 // indirect gopkg.in/linkedin/goavro.v1 v1.0.5 // indirect - gopkg.in/yaml.v3 v3.0.1 // indirect -) +) \ No newline at end of file diff --git a/sdks/go.sum b/sdks/go.sum index c37b7eadb341..cb2dfed1ee93 100644 --- a/sdks/go.sum +++ b/sdks/go.sum @@ -911,6 +911,8 @@ golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u0 golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4= golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM= golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU= +golang.org/x/exp v0.0.0-20230206171751-46f607a40771 h1:xP7rWLUr1e1n2xkK5YB4LI0hPEy3LJC6Wk+D4pGlOJg= +golang.org/x/exp v0.0.0-20230206171751-46f607a40771/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc= golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js= golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= @@ -1137,8 +1139,8 @@ golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roY golang.org/x/tools v0.0.0-20200916195026-c9a70fc28ce3/go.mod h1:z6u4i615ZeAfBE4XtMziQW1fSVJXACjjbWkB/mvPzlU= golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= -golang.org/x/tools v0.1.12 h1:VveCTK38A2rkS8ZqFY25HIDFscX5X9OoEhJd3quQmXU= -golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= +golang.org/x/tools v0.2.0 h1:G6AHpWxTMGY1KyEYoAQ5WTtIekUUvDNjan3ugu60JvE= +golang.org/x/tools v0.2.0/go.mod h1:y4OqIKeOV/fWJetJ8bXPU1sEVniLMIyDAZWeHdV+NTA= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -1324,4 +1326,4 @@ sigs.k8s.io/structured-merge-diff/v4 v4.0.1/go.mod h1:bJZC9H9iH24zzfZ/41RGcq60oK sigs.k8s.io/structured-merge-diff/v4 v4.0.2/go.mod h1:bJZC9H9iH24zzfZ/41RGcq60oK1F7G282QMXDPYydCw= sigs.k8s.io/structured-merge-diff/v4 v4.0.3/go.mod h1:bJZC9H9iH24zzfZ/41RGcq60oK1F7G282QMXDPYydCw= sigs.k8s.io/yaml v1.1.0/go.mod h1:UJmg0vDUVViEyp3mgSv9WPwZCDxu4rQW1olrI1uml+o= -sigs.k8s.io/yaml v1.2.0/go.mod h1:yfXDCHCao9+ENCvLSE62v9VSji2MKu5jeNfTrofGhJc= +sigs.k8s.io/yaml v1.2.0/go.mod h1:yfXDCHCao9+ENCvLSE62v9VSji2MKu5jeNfTrofGhJc= \ No newline at end of file diff --git a/sdks/go/pkg/beam/runners/prism/README.md b/sdks/go/pkg/beam/runners/prism/README.md new file mode 100644 index 000000000000..1e91a3d64f8f --- /dev/null +++ b/sdks/go/pkg/beam/runners/prism/README.md @@ -0,0 +1,190 @@ + + +# Apache Beam Go Prism Runner + +Prism is a local portable Apache Beam runner authored in Go. + +* Local, for fast startup and ease of testing on a single machine. +* Portable, in that it uses the Beam FnAPI to communicate with Beam SDKs of any language. +* Go simple concurrency enables clear structures for testing batch through streaming jobs. + +It's intended to replace the current Go Direct runner, but also be for general +single machine use. + +For Go SDK users: + - Short term: set runner to "prism" to use it, or invoke directly. + - Medium term: switch the default from "direct" to "prism". + - Long term: alias "direct" to "prism", and delete legacy Go direct runner. + +Prisms allow breaking apart and separating a beam of light into +it's component wavelengths, as well as recombining them together. + +The Prism Runner leans on this metaphor with the goal of making it +easier for users and Beam SDK developers alike to test and validate +aspects of Beam that are presently under represented. + +## Configurability + +Prism is configurable using YAML, which is eagerly validated on startup. +The configuration contains a set of variants to specify execution behavior, +either to support specific testing goals, or to emulate different runners. + +Beam's implementation contains a number of details that are hidden from +users, and to date, no runner implements the same set of features. This +can make SDK or pipeline development difficult, since exactly what is +being tested will vary on the runner being used. + +At the top level the configuration contains "variants", and the variants +configure the behaviors of different "handlers" in Prism. + +Jobs will be able to provide a pipeline option to select which variant to +use. Multiple jobs on the same prism instance can use different variants. +Jobs which don't provide a variant will default to testing behavior. + +All variants should execute the Beam Model faithfully and correctly, +and with few exceptions it should not be possible for there to be an +invalid execution. The machine's the limit. + +It's not expected that all handler options are useful for pipeline authors, +These options should remain useful for SDK developers, +or more precise issue reproduction. + +For more detail on the motivation, see Robert Burke's (@lostluck) Beam Summit 2022 talk: +https://2022.beamsummit.org/sessions/portable-go-beam-runner/. + +Here's a non-exhaustive set of variants. + +### Variant Highlight: "default" + +The "default" variant is testing focused, intending to route out issues at development +time, rather than discovering them on production runners. Notably, this mode should +never use fusion, executing each Transform individually and independantly, one at a time. + +This variant should be able to execute arbitrary pipelines, correctly, with clarity and +precision when an error occurs. Other features supported by the SDK should be enabled by default to +ensure good coverage, such as caches, or RPC reductions like sending elements in +ProcessBundleRequest and Response, as they should not affect correctness. Composite +transforms like Splitable DoFns and Combines should be expanded to ensure coverage. + +Additional validations may be added as time goes on. + +Does not retry or provide other resilience features, which may mask errors. + +To ensure coverage, there may be sibling variants that use mutually exclusive alternative +executions. + +### Variant Highlight: "fast" + +Not Yet Implemented - Illustrative goal. + +The "fast" variant is performance focused, intended for local scale execution. +A psuedo production execution. Fusion optimizations should be performed. +Large PCollection should be offloaded to persistent disk. Bundles should be +dynamically split. Multiple Bundles should be executed simultaneously. And so on. + +Pipelines should execute as swiftly as possible within the bounds of correct +execution. + +### Variant Hightlight: "flink" "dataflow" "spark" AKA Emulations + +Not Yet Implemented - Illustrative goal. + +Emulation variants have the goal of replicating on the local scale, +the behaviors of other runners. Flink execution never "lifts" Combines, and +doesn't dynamically split. Dataflow has different characteristics for batch +and streaming execution with certain execution charateristics enabled or +disabled. + +As Prism is intended to implement all facets of Beam Model execution, the handlers +can have features selectively disabled to ensure + +## Current Limitations + +* Experimental and testing use only. +* Executing docker containers isn't yet implemented. + * This precludes running the Java and Python SDKs, or their transforms for Cross Language. + * Loopback execution only. + * No stand alone execution. +* In Memory Only + * Not yet suitable for larger jobs, which may have intermediate data that exceeds memory bounds. + * Doesn't yet support sufficient intermediate data garbage collection for indefinite stream processing. +* Doesn't yet execute all beam pipeline features. +* No UI for job status inspection. + +## Implemented so far. + +* DoFns + * Side Inputs + * Multiple Outputs +* Flattens +* GBKs + * Includes handling session windows. + * Global Window + * Interval Windowing + * Session Windows. +* Combines lifted and unlifted. +* Expands Splittable DoFns +* Limited support for Process Continuations + * Residuals are rescheduled for execution immeadiately. + * The transform must be finite (and eventually return a stop process continuation) +* Basic Metrics support + +## Next feature short list (unordered) + +See https://github.com/apache/beam/issues/24789 for current status. + +* Resolve watermark advancement for Process Continuations +* Test Stream +* Triggers & Complex Windowing Strategy execution. +* State +* Timers +* "PubSub" Transform +* Support SDK Containers via Testcontainers + * Cross Language Transforms +* FnAPI Optimizations + * Fusion + * Data with ProcessBundleRequest & Response +* Progess tracking + * Channel Splitting + * Dynamic Splitting +* Stand alone execution support +* UI reporting of in progress jobs + +This is not a comprehensive feature set, but a set of goals to best +support users of the Go SDK in testing their pipelines. + +## How to contribute + +Until additional structure is necessary, check the main issue +https://github.com/apache/beam/issues/24789 for the current +status, file an issue for the feature or bug to fix with `[prism]` +in the title, and refer to the main issue, before begining work +to avoid duplication of effort. + +If a feature will take a long time, please send a PR to +link to your issue from this README to help others discover it. + +Otherwise, ordinary [Beam contribution guidelines apply](https://beam.apache.org/contribute/). + +# Long Term Goals + +Once support for containers is implemented, Prism should become a target +for the Java Runner Validation tests, which are the current specification +for correct runner behavior. This will inform further feature developement. diff --git a/sdks/go/pkg/beam/runners/prism/internal/README.md b/sdks/go/pkg/beam/runners/prism/internal/README.md new file mode 100644 index 000000000000..0f00e1f6d065 --- /dev/null +++ b/sdks/go/pkg/beam/runners/prism/internal/README.md @@ -0,0 +1,62 @@ + + +# Prism internal packages + +Go has a mechanism for ["internal" packages](https://go.dev/doc/go1.4#internalpackages) +to prevent use of implementation details outside of their intended use. + +This mechanism is used thoroughly for Prism to ensure we can make changes to the +runner's internals without worrying about the exposed surface changes breaking +non-compliant users. + +# Structure + +Here's a loose description of the current structure of the runner. Leaf packages should +not depend on other parts of the runner. Runner packages can and do depend on other +parts of the SDK, such as for Coder handling. + +`config` contains configuration parsing and handling. Leaf package. +Handler configurations are registered by dependant packages. + +`urns` contains beam URN strings pulled from the protos. Leaf package. + +`engine` contains the core manager for handling elements, watermarks, and windowing strategies. +Determines bundle readiness, and stages to execute. Leaf package. + +`jobservices` contains GRPC service handlers for job management and submission. +Should only depend on the `config` and `urns` packages. + +`worker` contains interactions with FnAPI services to communicate with worker SDKs. Leaf package +except for dependency on `engine.TentativeData` which will likely be removed at some point. + +`internal` AKA the package in this directory root. Contains fhe job execution +flow. Jobs are sent to it from `jobservices`, and those jobs are then executed by coordinating +with the `engine` and `worker` packages, and handlers urn. +Most configurable behavior is determined here. + +# Testing + +The sub packages should have reasonable Unit Test coverage in their own directories, but +most features will be exercised via executing pipelines in this package. + +For the time being test DoFns should be added to standard build in order to validate execution +coverage, in particular for Combine and Splittable DoFns. + +Eventually these behaviors should be covered by using Prism in the main SDK tests.