From 77847489c97b883a07aac5539cc8a32f0834e90e Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Thu, 9 Feb 2023 11:13:19 -0800 Subject: [PATCH 1/2] [prism] Initial commit for READMEs and go.mod --- sdks/go.mod | 17 +- sdks/go.sum | 8 +- sdks/go/pkg/beam/runners/prism/README.md | 171 ++++++++++++++++++ .../pkg/beam/runners/prism/internal/README.md | 43 +++++ 4 files changed, 226 insertions(+), 13 deletions(-) create mode 100644 sdks/go/pkg/beam/runners/prism/README.md create mode 100644 sdks/go/pkg/beam/runners/prism/internal/README.md diff --git a/sdks/go.mod b/sdks/go.mod index 96446993b352..5185da0f7240 100644 --- a/sdks/go.mod +++ b/sdks/go.mod @@ -24,9 +24,11 @@ go 1.18 require ( cloud.google.com/go/bigquery v1.45.0 + cloud.google.com/go/bigtable v1.18.1 cloud.google.com/go/datastore v1.10.0 cloud.google.com/go/profiler v0.3.1 cloud.google.com/go/pubsub v1.28.0 + cloud.google.com/go/spanner v1.43.0 cloud.google.com/go/storage v1.29.0 github.com/aws/aws-sdk-go-v2 v1.17.3 github.com/aws/aws-sdk-go-v2/config v1.18.11 @@ -46,9 +48,11 @@ require ( github.com/proullon/ramsql v0.0.0-20211120092837-c8d0a408b939 github.com/spf13/cobra v1.6.1 github.com/testcontainers/testcontainers-go v0.15.0 + github.com/tetratelabs/wazero v1.0.0-pre.7 github.com/xitongsys/parquet-go v1.6.2 github.com/xitongsys/parquet-go-source v0.0.0-20220315005136-aec0fe3e777c go.mongodb.org/mongo-driver v1.11.1 + golang.org/x/exp v0.0.0-20230206171751-46f607a40771 golang.org/x/net v0.5.0 golang.org/x/oauth2 v0.0.0-20221014153046-6fdb5e3db783 golang.org/x/sync v0.1.0 @@ -60,13 +64,7 @@ require ( google.golang.org/protobuf v1.28.1 gopkg.in/retry.v1 v1.0.3 gopkg.in/yaml.v2 v2.4.0 -) - -require cloud.google.com/go/spanner v1.43.0 - -require ( - cloud.google.com/go/bigtable v1.18.1 - github.com/tetratelabs/wazero v1.0.0-pre.7 + gopkg.in/yaml.v3 v3.0.1 ) require ( @@ -136,9 +134,8 @@ require ( github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d // indirect go.opencensus.io v0.24.0 // indirect golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d // indirect - golang.org/x/tools v0.1.12 // indirect + golang.org/x/tools v0.2.0 // indirect golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect google.golang.org/appengine v1.6.7 // indirect gopkg.in/linkedin/goavro.v1 v1.0.5 // indirect - gopkg.in/yaml.v3 v3.0.1 // indirect -) +) \ No newline at end of file diff --git a/sdks/go.sum b/sdks/go.sum index 55bf439e72b5..d77c5c633e00 100644 --- a/sdks/go.sum +++ b/sdks/go.sum @@ -911,6 +911,8 @@ golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u0 golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4= golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM= golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU= +golang.org/x/exp v0.0.0-20230206171751-46f607a40771 h1:xP7rWLUr1e1n2xkK5YB4LI0hPEy3LJC6Wk+D4pGlOJg= +golang.org/x/exp v0.0.0-20230206171751-46f607a40771/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc= golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js= golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= @@ -1137,8 +1139,8 @@ golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roY golang.org/x/tools v0.0.0-20200916195026-c9a70fc28ce3/go.mod h1:z6u4i615ZeAfBE4XtMziQW1fSVJXACjjbWkB/mvPzlU= golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= -golang.org/x/tools v0.1.12 h1:VveCTK38A2rkS8ZqFY25HIDFscX5X9OoEhJd3quQmXU= -golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= +golang.org/x/tools v0.2.0 h1:G6AHpWxTMGY1KyEYoAQ5WTtIekUUvDNjan3ugu60JvE= +golang.org/x/tools v0.2.0/go.mod h1:y4OqIKeOV/fWJetJ8bXPU1sEVniLMIyDAZWeHdV+NTA= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -1324,4 +1326,4 @@ sigs.k8s.io/structured-merge-diff/v4 v4.0.1/go.mod h1:bJZC9H9iH24zzfZ/41RGcq60oK sigs.k8s.io/structured-merge-diff/v4 v4.0.2/go.mod h1:bJZC9H9iH24zzfZ/41RGcq60oK1F7G282QMXDPYydCw= sigs.k8s.io/structured-merge-diff/v4 v4.0.3/go.mod h1:bJZC9H9iH24zzfZ/41RGcq60oK1F7G282QMXDPYydCw= sigs.k8s.io/yaml v1.1.0/go.mod h1:UJmg0vDUVViEyp3mgSv9WPwZCDxu4rQW1olrI1uml+o= -sigs.k8s.io/yaml v1.2.0/go.mod h1:yfXDCHCao9+ENCvLSE62v9VSji2MKu5jeNfTrofGhJc= +sigs.k8s.io/yaml v1.2.0/go.mod h1:yfXDCHCao9+ENCvLSE62v9VSji2MKu5jeNfTrofGhJc= \ No newline at end of file diff --git a/sdks/go/pkg/beam/runners/prism/README.md b/sdks/go/pkg/beam/runners/prism/README.md new file mode 100644 index 000000000000..539d4678bc0b --- /dev/null +++ b/sdks/go/pkg/beam/runners/prism/README.md @@ -0,0 +1,171 @@ +# Apache Beam Go Prism Runner + +Prism is a local portable Apache Beam runner authored in Go. + +* Local, for fast startup and ease of testing on a single machine. +* Portable, in that it uses the Beam FnAPI to communicate with Beam SDKs of any language. +* Go simple concurrency enables clear structures for testing batch through streaming jobs. + +It's intended to replace the current Go Direct runner, but also be for general +single machine use. + +For Go SDK users: + - Short term: set runner to "prism" to use it, or invoke directly. + - Medium term: switch the default from "direct" to "prism". + - Long term: alias "direct" to "prism", and delete legacy Go direct runner. + +Prisms allow breaking apart and separating a beam of light into +it's component wavelengths, as well as recombining them together. + +The Prism Runner leans on this metaphor with the goal of making it +easier for users and Beam SDK developers alike to test and validate +aspects of Beam that are presently under represented. + +## Configurability + +Prism is configurable using YAML, which is eagerly validated on startup. +The configuration contains a set of variants to specify execution behavior, +either to support specific testing goals, or to emulate different runners. + +Beam's implementation contains a number of details that are hidden from +users, and to date, no runner implements the same set of features. This +can make SDK or pipeline development difficult, since exactly what is +being tested will vary on the runner being used. + +At the top level the configuration contains "variants", and the variants +configure the behaviors of different "handlers" in Prism. + +Jobs will be able to provide a pipeline option to select which variant to +use. Multiple jobs on the same prism instance can use different variants. +Jobs which don't provide a variant will default to testing behavior. + +All variants should execute the Beam Model faithfully and correctly, +and with few exceptions it should not be possible for there to be an +invalid execution. The machine's the limit. + +It's not expected that all handler options are useful for pipeline authors, +These options should remain useful for SDK developers, +or more precise issue reproduction. + +For more detail on the motivation, see Robert Burke's (@lostluck) Beam Summit 2022 talk: +https://2022.beamsummit.org/sessions/portable-go-beam-runner/. + +Here's a non-exhaustive set of variants. + +### Variant Highlight: "default" + +The "default" variant is testing focused, intending to route out issues at development +time, rather than discovering them on production runners. Notably, this mode should +never use fusion, executing each Transform individually and independantly, one at a time. + +This variant should be able to execute arbitrary pipelines, correctly, with clarity and +precision when an error occurs. Other features supported by the SDK should be enabled by default to +ensure good coverage, such as caches, or RPC reductions like sending elements in +ProcessBundleRequest and Response, as they should not affect correctness. Composite +transforms like Splitable DoFns and Combines should be expanded to ensure coverage. + +Additional validations may be added as time goes on. + +Does not retry or provide other resilience features, which may mask errors. + +To ensure coverage, there may be sibling variants that use mutually exclusive alternative +executions. + +### Variant Highlight: "fast" + +Not Yet Implemented - Illustrative goal. + +The "fast" variant is performance focused, intended for local scale execution. +A psuedo production execution. Fusion optimizations should be performed. +Large PCollection should be offloaded to persistent disk. Bundles should be +dynamically split. Multiple Bundles should be executed simultaneously. And so on. + +Pipelines should execute as swiftly as possible within the bounds of correct +execution. + +### Variant Hightlight: "flink" "dataflow" "spark" AKA Emulations + +Not Yet Implemented - Illustrative goal. + +Emulation variants have the goal of replicating on the local scale, +the behaviors of other runners. Flink execution never "lifts" Combines, and +doesn't dynamically split. Dataflow has different characteristics for batch +and streaming execution with certain execution charateristics enabled or +disabled. + +As Prism is intended to implement all facets of Beam Model execution, the handlers +can have features selectively disabled to ensure + +## Current Limitations + +* Experimental and testing use only. +* Executing docker containers isn't yet implemented. + * This precludes running the Java and Python SDKs, or their transforms for Cross Language. + * Loopback execution only. + * No stand alone execution. +* In Memory Only + * Not yet suitable for larger jobs, which may have intermediate data that exceeds memory bounds. + * Doesn't yet support sufficient intermediate data garbage collection for indefinite stream processing. +* Doesn't yet execute all beam pipeline features. +* No UI for job status inspection. + +## Implemented so far. + +* DoFns + * Side Inputs + * Multiple Outputs +* Flattens +* GBKs + * Includes handling session windows. + * Global Window + * Interval Windowing + * Session Windows. +* Combines lifted and unlifted. +* Expands Splittable DoFns +* Limited support for Process Continuations + * Residuals are rescheduled for execution immeadiately. + * The transform must be finite (and eventually return a stop process continuation) +* Basic Metrics support + +## Next feature short list (unordered) + +See https://github.com/apache/beam/issues/24789 for current status. + +* Resolve watermark advancement for Process Continuations +* Test Stream +* Triggers & Complex Windowing Strategy execution. +* State +* Timers +* "PubSub" Transform +* Support SDK Containers via Testcontainers + * Cross Language Transforms +* FnAPI Optimizations + * Fusion + * Data with ProcessBundleRequest & Response +* Progess tracking + * Channel Splitting + * Dynamic Splitting +* Stand alone execution support +* UI reporting of in progress jobs + +This is not a comprehensive feature set, but a set of goals to best +support users of the Go SDK in testing their pipelines. + +## How to contribute + +Until additional structure is necessary, check the main issue +https://github.com/apache/beam/issues/24789 for the current +status, file an issue for the feature or bug to fix with `[prism]` +in the title, and refer to the main issue, before begining work +to avoid duplication of effort. + +If a feature will take a long time, please send a PR to +link to your issue from this README to help others discover it. + +Otherwise, ordinary [Beam contribution guidelines apply](https://beam.apache.org/contribute/). + +# Long Term Goals + +Once support for containers is implemented, Prism should become a target +for the Java Runner Validation tests, which are the current specification +for correct runner behavior. This will inform further feature developement. diff --git a/sdks/go/pkg/beam/runners/prism/internal/README.md b/sdks/go/pkg/beam/runners/prism/internal/README.md new file mode 100644 index 000000000000..b394ce04c87d --- /dev/null +++ b/sdks/go/pkg/beam/runners/prism/internal/README.md @@ -0,0 +1,43 @@ +# Prism internal packages + +Go has a mechanism for ["internal" packages](https://go.dev/doc/go1.4#internalpackages) +to prevent use of implementation details outside of their intended use. + +This mechanism is used thoroughly for Prism to ensure we can make changes to the +runner's internals without worrying about the exposed surface changes breaking +non-compliant users. + +# Structure + +Here's a loose description of the current structure of the runner. Leaf packages should +not depend on other parts of the runner. Runner packages can and do depend on other +parts of the SDK, such as for Coder handling. + +`config` contains configuration parsing and handling. Leaf package. +Handler configurations are registered by dependant packages. + +`urns` contains beam URN strings pulled from the protos. Leaf package. + +`engine` contains the core manager for handling elements, watermarks, and windowing strategies. +Determines bundle readiness, and stages to execute. Leaf package. + +`jobservices` contains GRPC service handlers for job management and submission. +Should only depend on the `config` and `urns` packages. + +`worker` contains interactions with FnAPI services to communicate with worker SDKs. Leaf package +except for dependency on `engine.TentativeData` which will likely be removed at some point. + +`internal` AKA the package in this directory root. Contains fhe job execution +flow. Jobs are sent to it from `jobservices`, and those jobs are then executed by coordinating +with the `engine` and `worker` packages, and handlers urn. +Most configurable behavior is determined here. + +# Testing + +The sub packages should have reasonable Unit Test coverage in their own directories, but +most features will be exercised via executing pipelines in this package. + +For the time being test DoFns should be added to standard build in order to validate execution +coverage, in particular for Combine and Splittable DoFns. + +Eventually these behaviors should be covered by using Prism in the main SDK tests. From 15e00c7527a160e63beccdab0709be68a0781c4c Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Thu, 9 Feb 2023 11:50:29 -0800 Subject: [PATCH 2/2] [prism] ws lint & license --- sdks/go/pkg/beam/runners/prism/README.md | 47 +++++++++++++------ .../pkg/beam/runners/prism/internal/README.md | 25 ++++++++-- 2 files changed, 55 insertions(+), 17 deletions(-) diff --git a/sdks/go/pkg/beam/runners/prism/README.md b/sdks/go/pkg/beam/runners/prism/README.md index 539d4678bc0b..1e91a3d64f8f 100644 --- a/sdks/go/pkg/beam/runners/prism/README.md +++ b/sdks/go/pkg/beam/runners/prism/README.md @@ -1,3 +1,22 @@ + + # Apache Beam Go Prism Runner Prism is a local portable Apache Beam runner authored in Go. @@ -11,7 +30,7 @@ single machine use. For Go SDK users: - Short term: set runner to "prism" to use it, or invoke directly. - - Medium term: switch the default from "direct" to "prism". + - Medium term: switch the default from "direct" to "prism". - Long term: alias "direct" to "prism", and delete legacy Go direct runner. Prisms allow breaking apart and separating a beam of light into @@ -24,26 +43,26 @@ aspects of Beam that are presently under represented. ## Configurability Prism is configurable using YAML, which is eagerly validated on startup. -The configuration contains a set of variants to specify execution behavior, +The configuration contains a set of variants to specify execution behavior, either to support specific testing goals, or to emulate different runners. Beam's implementation contains a number of details that are hidden from users, and to date, no runner implements the same set of features. This can make SDK or pipeline development difficult, since exactly what is -being tested will vary on the runner being used. +being tested will vary on the runner being used. At the top level the configuration contains "variants", and the variants -configure the behaviors of different "handlers" in Prism. +configure the behaviors of different "handlers" in Prism. Jobs will be able to provide a pipeline option to select which variant to use. Multiple jobs on the same prism instance can use different variants. Jobs which don't provide a variant will default to testing behavior. -All variants should execute the Beam Model faithfully and correctly, +All variants should execute the Beam Model faithfully and correctly, and with few exceptions it should not be possible for there to be an invalid execution. The machine's the limit. -It's not expected that all handler options are useful for pipeline authors, +It's not expected that all handler options are useful for pipeline authors, These options should remain useful for SDK developers, or more precise issue reproduction. @@ -55,18 +74,18 @@ Here's a non-exhaustive set of variants. ### Variant Highlight: "default" The "default" variant is testing focused, intending to route out issues at development -time, rather than discovering them on production runners. Notably, this mode should +time, rather than discovering them on production runners. Notably, this mode should never use fusion, executing each Transform individually and independantly, one at a time. This variant should be able to execute arbitrary pipelines, correctly, with clarity and precision when an error occurs. Other features supported by the SDK should be enabled by default to -ensure good coverage, such as caches, or RPC reductions like sending elements in +ensure good coverage, such as caches, or RPC reductions like sending elements in ProcessBundleRequest and Response, as they should not affect correctness. Composite transforms like Splitable DoFns and Combines should be expanded to ensure coverage. Additional validations may be added as time goes on. -Does not retry or provide other resilience features, which may mask errors. +Does not retry or provide other resilience features, which may mask errors. To ensure coverage, there may be sibling variants that use mutually exclusive alternative executions. @@ -76,8 +95,8 @@ executions. Not Yet Implemented - Illustrative goal. The "fast" variant is performance focused, intended for local scale execution. -A psuedo production execution. Fusion optimizations should be performed. -Large PCollection should be offloaded to persistent disk. Bundles should be +A psuedo production execution. Fusion optimizations should be performed. +Large PCollection should be offloaded to persistent disk. Bundles should be dynamically split. Multiple Bundles should be executed simultaneously. And so on. Pipelines should execute as swiftly as possible within the bounds of correct @@ -94,7 +113,7 @@ and streaming execution with certain execution charateristics enabled or disabled. As Prism is intended to implement all facets of Beam Model execution, the handlers -can have features selectively disabled to ensure +can have features selectively disabled to ensure ## Current Limitations @@ -117,7 +136,7 @@ can have features selectively disabled to ensure * Flattens * GBKs * Includes handling session windows. - * Global Window + * Global Window * Interval Windowing * Session Windows. * Combines lifted and unlifted. @@ -145,7 +164,7 @@ See https://github.com/apache/beam/issues/24789 for current status. * Progess tracking * Channel Splitting * Dynamic Splitting -* Stand alone execution support +* Stand alone execution support * UI reporting of in progress jobs This is not a comprehensive feature set, but a set of goals to best diff --git a/sdks/go/pkg/beam/runners/prism/internal/README.md b/sdks/go/pkg/beam/runners/prism/internal/README.md index b394ce04c87d..0f00e1f6d065 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/README.md +++ b/sdks/go/pkg/beam/runners/prism/internal/README.md @@ -1,8 +1,27 @@ + + # Prism internal packages Go has a mechanism for ["internal" packages](https://go.dev/doc/go1.4#internalpackages) to prevent use of implementation details outside of their intended use. - + This mechanism is used thoroughly for Prism to ensure we can make changes to the runner's internals without worrying about the exposed surface changes breaking non-compliant users. @@ -13,7 +32,7 @@ Here's a loose description of the current structure of the runner. Leaf packages not depend on other parts of the runner. Runner packages can and do depend on other parts of the SDK, such as for Coder handling. -`config` contains configuration parsing and handling. Leaf package. +`config` contains configuration parsing and handling. Leaf package. Handler configurations are registered by dependant packages. `urns` contains beam URN strings pulled from the protos. Leaf package. @@ -27,7 +46,7 @@ Should only depend on the `config` and `urns` packages. `worker` contains interactions with FnAPI services to communicate with worker SDKs. Leaf package except for dependency on `engine.TentativeData` which will likely be removed at some point. -`internal` AKA the package in this directory root. Contains fhe job execution +`internal` AKA the package in this directory root. Contains fhe job execution flow. Jobs are sent to it from `jobservices`, and those jobs are then executed by coordinating with the `engine` and `worker` packages, and handlers urn. Most configurable behavior is determined here.