From c9255fa7c8569f86896090a28e77bf8ae5c861d4 Mon Sep 17 00:00:00 2001
From: Robert Bradshaw
Date: Thu, 19 Oct 2023 17:51:01 -0700
Subject: [PATCH] [YAML] Improved pipeline schema definition.

Slightly stricter definitions for catching more errors, as well as avoiding
the use of anyOf, which often makes it difficult to deduce what the true
error is. This does mean a pipeline must have a transforms (or source/sink)
block rather than simply being itself a list of transforms.
---
 sdks/python/apache_beam/yaml/README.md       | 177 +++++++++---------
 .../apache_beam/yaml/pipeline.schema.yaml    |  48 ++++-
 2 files changed, 128 insertions(+), 97 deletions(-)

diff --git a/sdks/python/apache_beam/yaml/README.md b/sdks/python/apache_beam/yaml/README.md
index 3ba78784c997..62c0d0eea162 100644
--- a/sdks/python/apache_beam/yaml/README.md
+++ b/sdks/python/apache_beam/yaml/README.md
@@ -166,41 +166,42 @@ Here we read two sources, join them, and write two outputs.

 ```
 pipeline:
-  - type: ReadFromCsv
-    name: ReadLeft
-    config:
-      path: /path/to/left*.csv
+  transforms:
+    - type: ReadFromCsv
+      name: ReadLeft
+      config:
+        path: /path/to/left*.csv

-  - type: ReadFromCsv
-    name: ReadRight
-    config:
-      path: /path/to/right*.csv
+    - type: ReadFromCsv
+      name: ReadRight
+      config:
+        path: /path/to/right*.csv

-  - type: Sql
-    config:
-      query: select left.col1, right.col2 from left join right using (col3)
-    input:
-      left: ReadLeft
-      right: ReadRight
-
-  - type: WriteToJson
-    name: WriteAll
-    input: Sql
-    config:
-      path: /path/to/all.json
+    - type: Sql
+      config:
+        query: select left.col1, right.col2 from left join right using (col3)
+      input:
+        left: ReadLeft
+        right: ReadRight

-  - type: Filter
-    name: FilterToBig
-    input: Sql
-    config:
-      language: python
-      keep: "col2 > 100"
+    - type: WriteToJson
+      name: WriteAll
+      input: Sql
+      config:
+        path: /path/to/all.json

-  - type: WriteToCsv
-    name: WriteBig
-    input: FilterToBig
-    config:
-      path: /path/to/big.csv
+    - type: Filter
+      name: FilterToBig
+      input: Sql
+      config:
+        language: python
+        keep: "col2 > 100"
+
+    - type: WriteToCsv
+      name: WriteBig
+      input: FilterToBig
+      config:
+        path: /path/to/big.csv
 ```

 One can, however, nest `chains` within a non-linear pipeline.
@@ -209,49 +210,50 @@ that has a single input and contains its own sink.
 ```
 pipeline:
-  - type: ReadFromCsv
-    name: ReadLeft
-    config:
-      path: /path/to/left*.csv
+  transforms:
+    - type: ReadFromCsv
+      name: ReadLeft
+      config:
+        path: /path/to/left*.csv

-  - type: ReadFromCsv
-    name: ReadRight
-    config:
-      path: /path/to/right*.csv
+    - type: ReadFromCsv
+      name: ReadRight
+      config:
+        path: /path/to/right*.csv

-  - type: Sql
-    config:
-      query: select left.col1, right.col2 from left join right using (col3)
-    input:
-      left: ReadLeft
-      right: ReadRight
-
-  - type: WriteToJson
-    name: WriteAll
-    input: Sql
-    config:
-      path: /path/to/all.json
+    - type: Sql
+      config:
+        query: select left.col1, right.col2 from left join right using (col3)
+      input:
+        left: ReadLeft
+        right: ReadRight

-  - type: chain
-    name: ExtraProcessingForBigRows
-    input: Sql
-    transforms:
-      - type: Filter
-        config:
-          language: python
-          keep: "col2 > 100"
-      - type: Filter
-        config:
-          language: python
-          keep: "len(col1) > 10"
-      - type: Filter
-        config:
-          language: python
-          keep: "col1 > 'z'"
-    sink:
-      type: WriteToCsv
+    - type: WriteToJson
+      name: WriteAll
+      input: Sql
       config:
-        path: /path/to/big.csv
+        path: /path/to/all.json
+
+    - type: chain
+      name: ExtraProcessingForBigRows
+      input: Sql
+      transforms:
+        - type: Filter
+          config:
+            language: python
+            keep: "col2 > 100"
+        - type: Filter
+          config:
+            language: python
+            keep: "len(col1) > 10"
+        - type: Filter
+          config:
+            language: python
+            keep: "col1 > 'z'"
+      sink:
+        type: WriteToCsv
+        config:
+          path: /path/to/big.csv
 ```

 ## Windowing
@@ -329,25 +331,26 @@ a join per window.

 ```
 pipeline:
-  - type: ReadFromPubSub
-    name: ReadLeft
-    config:
-      topic: leftTopic
+  transforms:
+    - type: ReadFromPubSub
+      name: ReadLeft
+      config:
+        topic: leftTopic

-  - type: ReadFromPubSub
-    name: ReadRight
-    config:
-      topic: rightTopic
+    - type: ReadFromPubSub
+      name: ReadRight
+      config:
+        topic: rightTopic

-  - type: Sql
-    config:
-      query: select left.col1, right.col2 from left join right using (col3)
-    input:
-      left: ReadLeft
-      right: ReadRight
-    windowing:
-      type: fixed
-      size: 60
+    - type: Sql
+      config:
+        query: select left.col1, right.col2 from left join right using (col3)
+      input:
+        left: ReadLeft
+        right: ReadRight
+      windowing:
+        type: fixed
+        size: 60
 ```

 For a transform with no inputs, the specified windowing is instead applied to
diff --git a/sdks/python/apache_beam/yaml/pipeline.schema.yaml b/sdks/python/apache_beam/yaml/pipeline.schema.yaml
index ef0d9fe0f262..e784531d9be1 100644
--- a/sdks/python/apache_beam/yaml/pipeline.schema.yaml
+++ b/sdks/python/apache_beam/yaml/pipeline.schema.yaml
@@ -15,7 +15,7 @@
 # limitations under the License.
 #

-$schema: 'http://json-schema.org/schema#'
+$schema: 'http://json-schema.org/draft-07/schema#'
 $id: https://github.com/apache/beam/tree/master/sdks/python/apache_beam/yaml/pipeline.schema.yaml

 $defs:
@@ -115,6 +115,23 @@
             - $ref: '#/$defs/nestedTransform'
             - $ref: '#/$defs/implicitInputOutputs'

+      - if:
+          not:
+            anyOf:
+              - properties: { type: { const: composite }}
+              - properties: { type: { const: chain }}
+        then:
+          properties:
+            type: {}
+            name: {}
+            input: {}
+            output: {}
+            windowing: {}
+            config: { type: object }
+            __line__: {}
+            __uuid__: {}
+          additionalProperties: false
+
   windowing: {} # TODO

   provider:
@@ -128,27 +145,38 @@
         properties: { __line__: {}}
         additionalProperties:
           type: string
+      config: { type: object }
+    additionalProperties: false
     required:
       - type
       - transforms
+      - config

 type: object
 properties:
   pipeline:
-    anyOf:
-      - type: array
-        items:
-          $ref: '#/$defs/transform'
-      - $ref: '#/$defs/transform'
+    allOf:
+      # These are the only top-level properties defined in pipeline.
       - type: object
         properties:
-          transforms:
-            type: array
-            items:
-              $ref: '#/$defs/transform'
+          type: { const: chain }
+          windowing:
+            $ref: '#/$defs/windowing'
+          transforms: {}
+          extra_transforms: {}
+          sink: {}
+          source: {}
           __line__: {}
           __uuid__: {}
         additionalProperties: false
+      # This defines the allowable contents of the attributes above.
+      - $ref: '#/$defs/nestedTransform'
+      # A chain-type transform, like a chain composite, must have implicit io.
+      - if:
+          properties: { type: { const: chain }}
+          required: [type]
+        then:
+          $ref: '#/$defs/implicitInputOutputs'
   providers:
     type: array
     items:
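Not part of the patch itself: for readers who want to see the effect of the stricter schema, the sketch below validates a pipeline definition against it using the generic PyYAML and `jsonschema` packages. The schema path is taken from the diff above, but the sample pipeline and the direct use of `jsonschema` are illustrative assumptions; this is not how the Beam SDK itself loads or validates YAML pipelines.

```
# Illustrative sketch only -- not part of the patch, and not Beam's own
# validation entry point. Assumes a local checkout containing the patched
# schema file at the path shown in the diff.
import yaml          # PyYAML
import jsonschema

with open("sdks/python/apache_beam/yaml/pipeline.schema.yaml") as f:
    schema = yaml.safe_load(f)

# Under the stricter schema, `pipeline` carries an explicit `transforms:`
# (or `source:`/`sink:`) block instead of being a bare list of transforms.
pipeline_spec = yaml.safe_load("""
pipeline:
  transforms:
    - type: ReadFromCsv
      config:
        path: /path/to/input*.csv
    - type: WriteToJson
      input: ReadFromCsv
      config:
        path: /path/to/output.json
""")

# Raises jsonschema.exceptions.ValidationError if the spec does not conform,
# e.g. when `pipeline` is written as a plain list of transforms.
jsonschema.validate(pipeline_spec, schema)
```

Because the top-level definition now uses `allOf` with targeted `if`/`then` branches instead of `anyOf`, a validation failure points at the specific constraint that was violated rather than reporting that none of several alternatives matched.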