Tour of Beam: formatting/typos (#33249)
* common transforms

* core transforms
thread-sleep authored Dec 3, 2024
1 parent 95a0845 commit 697a843
Showing 17 changed files with 40 additions and 48 deletions.
@@ -238,11 +238,11 @@ PCollection<KV<Integer, Integer>> input = pipeline.apply(
And if you replace `Count.globally` with `Count.perKey`, it will output counts by key. It is also necessary to replace the generic type:

```
-PCollection<KV<Integer, Integer>> output = applyTransform(input);
+PCollection<KV<Integer, Long>> output = applyTransform(input);
```

```
-static PCollection<KV<Integer, Integer>> applyTransform(PCollection<KV<Integer, Integer>> input) {
+static PCollection<KV<Integer, Long>> applyTransform(PCollection<KV<Integer, Integer>> input) {
return input.apply(Count.perKey());
}
```
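For orientation, here is a minimal sketch (illustrative, not part of the commit; the sample data is assumed) showing why the corrected types line up: `Count.perKey()` always emits `Long` counts.

```
// Hypothetical input: values keyed by integer id.
PCollection<KV<Integer, Integer>> input = pipeline.apply(
    Create.of(KV.of(1, 11), KV.of(1, 36), KV.of(2, 91)));

// Count.perKey() counts the values per key and emits Long counts,
// hence the KV<Integer, Long> output type.
PCollection<KV<Integer, Long>> output = input.apply(Count.perKey());
// output: KV(1, 2L), KV(2, 1L)
```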
@@ -42,11 +42,11 @@ func ApplyTransform(s beam.Scope, input beam.PCollection) beam.PCollection {
```
{{end}}
{{if (eq .Sdk "java")}}
-You can find the global maximum value from the `PCollection` by using `Max.doublesGlobally()`
+You can find the global maximum value from the `PCollection` by using `Max.integersGlobally()`

```
PCollection<Integer> input = pipeline.apply(Create.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
-PCollection<Double> max = input.apply(Max.doublesGlobally());
+PCollection<Integer> max = input.apply(Max.integersGlobally());
```

Output
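As a hedged aside (not from the lesson), the two variants differ only in the element type they accept, which is why `integersGlobally` is the right fit for this `Integer` input:

```
// Max.integersGlobally() matches a PCollection<Integer>:
PCollection<Integer> input = pipeline.apply(Create.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
PCollection<Integer> max = input.apply(Max.integersGlobally()); // -> 10

// Max.doublesGlobally() would only type-check against a PCollection<Double>:
PCollection<Double> doubles = pipeline.apply(Create.of(1.5, 2.5));
PCollection<Double> maxOfDoubles = doubles.apply(Max.doublesGlobally()); // -> 2.5
```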
@@ -41,11 +41,11 @@ func ApplyTransform(s beam.Scope, input beam.PCollection) beam.PCollection {
```
{{end}}
{{if (eq .Sdk "java")}}
-You can find the global minimum value from the `PCollection` by using `Min.doublesGlobally()`
+You can find the global minimum value from the `PCollection` by using `Min.integersGlobally()`

```
PCollection<Integer> input = pipeline.apply(Create.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
-PCollection<Double> min = input.apply(Min.doublesGlobally());
+PCollection<Integer> min = input.apply(Min.integersGlobally());
```

Output
@@ -165,7 +165,7 @@ PCollection<KV<Integer, Integer>> output = applyTransform(input);

```
static PCollection<KV<Integer, Integer>> applyTransform(PCollection<KV<Integer, Integer>> input) {
-return input.apply(Sum.integersPerKey());
+return input.apply(Min.integersPerKey());
}
```
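To see why the combiner had to change from `Sum` to `Min`, compare the two on the same assumed input:

```
PCollection<KV<Integer, Integer>> input = pipeline.apply(
    Create.of(KV.of(1, 7), KV.of(1, 3), KV.of(2, 5)));

// Min.integersPerKey() keeps the smallest value seen for each key.
PCollection<KV<Integer, Integer>> minPerKey = input.apply(Min.integersPerKey());
// minPerKey: KV(1, 3), KV(2, 5)

// Sum.integersPerKey() on the same input would instead emit KV(1, 10), KV(2, 5).
```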
{{end}}
@@ -51,7 +51,7 @@ world

### Built-in filters

-The Java SDK has several filter methods built-in, like `Filter.greaterThan` and `Filter.lessThen` With `Filter.greaterThan`, the input `PCollection` can be filtered so that only the elements whose values are greater than the specified amount remain. Similarly, you can use `Filter.lessThen` to filter out elements of the input `PCollection` whose values are greater than the specified amount.
+The Java SDK has several filter methods built-in, like `Filter.greaterThan` and `Filter.lessThan`. With `Filter.greaterThan`, the input `PCollection` can be filtered so that only the elements whose values are greater than the specified amount remain. Similarly, you can use `Filter.lessThan` to keep only the elements of the input `PCollection` whose values are less than the specified amount.

Other built-in filters are:

@@ -62,7 +62,7 @@ Other built-in filters are:
* Filter.equal
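For illustration, a short sketch (numbers and thresholds assumed, not from the lesson) of how the built-in comparison filters compose with `apply`:

```
PCollection<Integer> numbers = pipeline.apply(Create.of(1, 5, 10, 15, 20));

// Keep only elements strictly greater than 10.
PCollection<Integer> big = numbers.apply(Filter.greaterThan(10));   // 15, 20

// Keep only elements strictly less than 10.
PCollection<Integer> small = numbers.apply(Filter.lessThan(10));    // 1, 5
```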


-## Example 2: Filtering with a built-in methods
+## Example 2: Filtering with built-in methods

```
// List of integers
@@ -404,11 +404,3 @@ func applyTransform(s beam.Scope, input beam.PCollection) beam.PCollection {
})
}
```

-### Playground exercise
-
-You can find the complete code of the above example using 'Filter' in the playground window, which you can run and experiment with.
-
-Filter transform can be used with both text and numerical collection. For example, let's try filtering the input collection that contains words so that only words that start with the letter 'a' are returned.
-
-You can also chain several filter transforms to form more complex filtering based on several simple filters or implement more complex filtering logic within a single filter transform. For example, try both approaches to filter the same list of words such that only ones that start with a letter 'a' (regardless of the case) and containing more than three symbols are returned.
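Although the exercise text above was removed by this commit, a sketch of the chained approach it describes could look like this (word list assumed):

```
PCollection<String> words = pipeline.apply(
    Create.of("apple", "Avocado", "ant", "banana"));

// Chain two simple filters: starts with 'a' (any case), then more than three symbols.
PCollection<String> result = words
    .apply(Filter.by((String w) -> w.toLowerCase().startsWith("a")))
    .apply(Filter.by((String w) -> w.length() > 3));
// result: "apple", "Avocado"
```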
@@ -104,7 +104,7 @@ func extractWordsFn(pn beam.PaneInfo, line string, emitWords func(string)) {
```
{{end}}
{{if (eq .Sdk "java")}}
-While `ParDo` always outputs the main output of `PCollection` (as a return value from apply), you can also force your `ParDo` to output any number of additional `PCollection` outputs. If you decide to have multiple outputs, your `ParDo` will return all the `PCollection` output (including the main output) combined. This will be useful when you are working with big data or a database that needs to be divided into different collections. You get a combined `PCollectionTuple`, you can use `TupleTag` to get a `PCollection`.
+While `ParDo` always outputs the main output of `PCollection` (as a return value from apply), you can also force your `ParDo` to output any number of additional `PCollection` outputs. If you decide to have multiple outputs, your `ParDo` will return all the `PCollection` outputs (including the main output) combined. This will be useful when you are working with big data or a database that needs to be divided into different collections. Since you get a combined `PCollectionTuple`, you can use a `TupleTag` to extract each `PCollection`.

A `PCollectionTuple` is an immutable tuple of heterogeneously typed `PCollections`, keyed by `TupleTags`. A `PCollectionTuple` can be used as input or output for a `PTransform` that receives or creates multiple `PCollection` inputs or outputs, which can be of different types, for example `ParDo` with multiple outputs.
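A condensed sketch of the mechanism (the tags and the above/below-100 split are illustrative, not the lesson's exact code):

```
// One tag per output; the anonymous subclasses let Beam capture the types.
TupleTag<Integer> above = new TupleTag<Integer>() {};
TupleTag<Integer> below = new TupleTag<Integer>() {};

PCollectionTuple results = input.apply(ParDo
    .of(new DoFn<Integer, Integer>() {
      @ProcessElement
      public void processElement(@Element Integer n, MultiOutputReceiver out) {
        if (n > 100) {
          out.get(above).output(n); // main output
        } else {
          out.get(below).output(n); // additional output
        }
      }
    })
    .withOutputTags(above, TupleTagList.of(below)));

// The tags act as keys into the combined PCollectionTuple.
PCollection<Integer> aboveHundred = results.get(above);
PCollection<Integer> belowHundred = results.get(below);
```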

@@ -202,7 +202,7 @@ tens = results[None] # the undeclared main output

You can find the full code of this example in the playground window, which you can run and experiment with.

-The `applyTransform()` accepts a list of integers at the output two `PCollection` one `PCollection` above 100 and second below 100.
+The `applyTransform()` accepts a list of integers and outputs two `PCollections`: one `PCollection` with the values above 100 and a second with the values below 100.

You can also work with strings:
{{if (eq .Sdk "go")}}
@@ -86,7 +86,7 @@ starts_with_b = input | beam.Filter(lambda x: x.startswith('B'))

You can find the full code of this example in the playground window, which you can run and experiment with.

-Accepts a `PCollection` consisting of strings. Without modification, it returns a new "PCollection". In this case, one `PCollection` includes elements in uppercase. The other `PCollection' stores inverted elements.
+It accepts a `PCollection` consisting of strings and, without modifying the input, returns new `PCollections`. In this case, one `PCollection` includes the elements in uppercase, and the other `PCollection` stores the reversed elements.

You can use a different method of branching. Since `applyTransform()` performs two conversions in sequence, it takes more time; it is also possible to apply each conversion to the `PCollection` separately.
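A sketch of that separate-branch idea (transform names and string operations assumed):

```
// Each branch reads the same input independently, so the two applies run as siblings.
PCollection<String> uppercase = input.apply("ToUpper",
    MapElements.into(TypeDescriptors.strings()).via((String s) -> s.toUpperCase()));

PCollection<String> reversed = input.apply("Reverse",
    MapElements.into(TypeDescriptors.strings())
        .via((String s) -> new StringBuilder(s).reverse().toString()));
```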
{{if (eq .Sdk "go")}}
@@ -14,9 +14,9 @@ limitations under the License.

# CombinePerKey

-CombinePerKey is a transform in Apache Beam that applies a `CombineFn` function to each key of a PCollection of key-value pairs. The `CombineFn` function can be used to aggregate, sum, or combine the values associated with each key in the input `PCollection`.
+`CombinePerKey` is a transform in Apache Beam that applies a `CombineFn` function to each key of a `PCollection` of key-value pairs. The `CombineFn` function can be used to aggregate, sum, or combine the values associated with each key in the input `PCollection`.

-The `CombinePerKey` transform takes in an instance of a `CombineFn` class and applies it to the input `PCollection`. The output of the transform is a new PCollection where each element is a key-value pair, where the key is the same as the input key, and the value is the result of applying the `CombineFn` function to all the values associated with that key in the input `PCollection`.
+The `CombinePerKey` transform takes in an instance of a `CombineFn` class and applies it to the input `PCollection`. The output of the transform is a new `PCollection` where each element is a key-value pair, where the key is the same as the input key, and the value is the result of applying the `CombineFn` function to all the values associated with that key in the input `PCollection`.
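For instance (data assumed), summing the values per key with a pre-built combiner:

```
PCollection<KV<String, Integer>> input = pipeline.apply(
    Create.of(KV.of("a", 1), KV.of("a", 2), KV.of("b", 5)));

// Combine.perKey applies the CombineFn to the value collection of each key.
PCollection<KV<String, Integer>> sums = input.apply(Combine.perKey(Sum.ofIntegers()));
// sums: KV("a", 3), KV("b", 5)
```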

{{if (eq .Sdk "go")}}
```
@@ -81,7 +81,7 @@ func applyTransform(s beam.Scope, input beam.PCollection) beam.PCollection {
{{if (eq .Sdk "java")}}
```
PCollection<KV<String, Integer>> input = pipeline
.apply("ParseCitiesToTimeKV", Create.of(
.apply(Create.of(
KV.of("a", "apple"),
KV.of("o", "orange"),
KV.of("a", "avocado),
@@ -93,7 +93,7 @@ static PCollection<KV<String, String>> applyTransform(PCollection<KV<String, Str
return input.apply(Combine.perKey(new SumStringBinaryCombineFn()));
}
-static class SumIntBinaryCombineFn extends BinaryCombineFn<String> {
+static class SumStringBinaryCombineFn extends BinaryCombineFn<String> {
@Override
public String apply(String left, String right) {
```
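The truncated combiner above, reconstructed as a hedged, self-contained sketch; the concatenation in `apply` is an assumption about the hidden body:

```
static class SumStringBinaryCombineFn extends BinaryCombineFn<String> {
  @Override
  public String apply(String left, String right) {
    // Assumed behavior: combine the two values for a key by concatenation.
    return left + right;
  }
}

static PCollection<KV<String, String>> applyTransform(PCollection<KV<String, String>> input) {
  return input.apply(Combine.perKey(new SumStringBinaryCombineFn()));
}
```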
@@ -14,7 +14,7 @@ limitations under the License.

# Combine

-`Combine` is a Beam transform for combining collections of elements or values in your data. Combine has variants that work on entire PCollections, and some that combine the values for each key in `PCollections` of **key/value** pairs.
+`Combine` is a Beam transform for combining collections of elements or values in your data. Combine has variants that work on entire `PCollections`, and some that combine the values for each key in `PCollections` of **key/value** pairs.

When you apply a `Combine` transform, you must provide the function that contains the logic for combining the elements or values. The combining function should be commutative and associative, as the function is not necessarily invoked exactly once on all values with a given key. Because the input data (including the value collection) may be distributed across multiple workers, the combining function might be called multiple times to perform partial combining on subsets of the value collection. The Beam SDK also provides some pre-built combine functions for common numeric combination operations such as sum, min, and max.
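To make the commutative/associative requirement concrete, here is a hedged sketch of a custom mean combiner in the style of the Beam programming guide; because accumulators merge in any order, Beam is free to pre-combine partial results on different workers:

```
public static class AverageFn extends Combine.CombineFn<Integer, AverageFn.Accum, Double> {
  public static class Accum implements Serializable {
    int sum = 0;
    int count = 0;
  }

  @Override
  public Accum createAccumulator() { return new Accum(); }

  @Override
  public Accum addInput(Accum accum, Integer input) {
    accum.sum += input;
    accum.count++;
    return accum;
  }

  @Override
  public Accum mergeAccumulators(Iterable<Accum> accums) {
    // Order-independent merge: this is where associativity matters.
    Accum merged = createAccumulator();
    for (Accum accum : accums) {
      merged.sum += accum.sum;
      merged.count += accum.count;
    }
    return merged;
  }

  @Override
  public Double extractOutput(Accum accum) {
    return accum.count == 0 ? 0.0 : ((double) accum.sum) / accum.count;
  }
}

// Usage: PCollection<Double> mean = ints.apply(Combine.globally(new AverageFn()));
```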

@@ -195,7 +195,7 @@ func extractWords(s beam.Scope, input beam.PCollection) beam.PCollection {
}
```

-You can use other transformations you can replace `Count` with `Filter` to output words starting with **p**:
+You can use other transformations; for example, you can replace `Count` with `Filter` to output words starting with **p**:

```
func applyTransform(s beam.Scope, input beam.PCollection) beam.PCollection {
@@ -224,7 +224,7 @@ PCollection<String> words = input
}));
```

-You can use other transformations you can replace `Count` with `Filter` to output words starting with **p**:
+You can use other transformations; for example, you can replace `Count` with `Filter` to output words starting with **p**:

```
PCollection<String> filtered = input
@@ -252,7 +252,7 @@ PCollection<String> filtered = input
words = input | 'ExtractWords' >> beam.FlatMap(lambda line: [word for word in line.split() if word])
```

-You can use other transformations you can replace `Count` with `Filter` to output words starting with **p**:
+You can use other transformations; for example, you can replace `Count` with `Filter` to output words starting with **p**:

```
filtered = (input | 'ExtractNonSpaceCharacters' >> beam.FlatMap(lambda line: [word for word in line.split() if word])
```
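The collapsed Java variant of that `Count`-to-`Filter` swap might look like the following sketch (the chaining is assumed from the surrounding text):

```
PCollection<String> filtered = input
    .apply(FlatMapElements.into(TypeDescriptors.strings())
        .via((String line) -> Arrays.asList(line.split(" "))))
    .apply(Filter.by((String word) -> word.startsWith("p")));
```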
@@ -56,7 +56,7 @@ By default, the coder for the output `PCollection` is the same as the coder for

When using `Flatten` to merge `PCollection` objects that have a windowing strategy applied, all of the `PCollection` objects you want to merge must use a compatible windowing strategy and window sizing. For example, all the collections you’re merging must all use (hypothetically) identical 5-minute fixed windows or 4-minute sliding windows starting every 30 seconds.

-If your pipeline attempts to use `Flatten` to merge `PCollection` objects with incompatible windows, Beam generates an IllegalStateException error when your pipeline is constructed.
+If your pipeline attempts to use `Flatten` to merge `PCollection` objects with incompatible windows, Beam generates an `IllegalStateException` error when your pipeline is constructed.
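For context, basic `Flatten` usage is a one-liner over a `PCollectionList`; this sketch assumes both inputs share the default global windowing:

```
// Merging two collections with compatible windowing strategies.
PCollection<String> merged = PCollectionList.of(first).and(second)
    .apply(Flatten.pCollections());
```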

### Playground exercise

@@ -75,7 +75,7 @@ func formatCoGBKResults(key string, emailIter, phoneIter func(*string) bool) str
{{if (eq .Sdk "java")}}
You can use the `CoGroupByKey` transformation for a tuple of tables. `CoGroupByKey` groups results from all tables by similar keys in `CoGbkResults`, from which results for any particular table can be accessed using the `TupleTag` tag supplied with the source table.

-For type safety, the Jav SDK requires you to pass each `PCollection` as part of a `KeyedPCollectionTuple`. You must declare a `TupleTag` for each input `PCollection` in the `KeyedPCollectionTuple` that you want to pass to `CoGroupByKey`. As output, `CoGroupByKey` returns a `PCollection<KV<K, CoGbkResult>>`, which groups values from all the input `PCollections` by their common keys. Each key (all of type K) will have a different `CoGbkResult`, which is a map from `TupleTag<T> to Iterable<T>`. You can access a specific collection in an `CoGbkResult` object by using the `TupleTag` that you supplied with the initial collection.
+For type safety, the Java SDK requires you to pass each `PCollection` as part of a `KeyedPCollectionTuple`. You must declare a `TupleTag` for each input `PCollection` in the `KeyedPCollectionTuple` that you want to pass to `CoGroupByKey`. As output, `CoGroupByKey` returns a `PCollection<KV<K, CoGbkResult>>`, which groups values from all the input `PCollections` by their common keys. Each key (all of type K) will have a different `CoGbkResult`, which is a map from `TupleTag<T>` to `Iterable<T>`. You can access a specific collection in a `CoGbkResult` object by using the `TupleTag` that you supplied with the initial collection.

```
// Mock data
```
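A condensed, hedged sketch of the pattern this truncated example sets up (keys, data, and formatting are assumptions):

```
TupleTag<String> emailsTag = new TupleTag<String>() {};
TupleTag<String> phonesTag = new TupleTag<String>() {};

PCollection<KV<String, String>> emails = ...; // name -> email
PCollection<KV<String, String>> phones = ...; // name -> phone

PCollection<KV<String, CoGbkResult>> joined =
    KeyedPCollectionTuple.of(emailsTag, emails)
        .and(phonesTag, phones)
        .apply(CoGroupByKey.create());

// Per key, each input's values are pulled back out with its TupleTag.
joined.apply(ParDo.of(new DoFn<KV<String, CoGbkResult>, String>() {
  @ProcessElement
  public void processElement(@Element KV<String, CoGbkResult> e, OutputReceiver<String> out) {
    Iterable<String> emailList = e.getValue().getAll(emailsTag);
    Iterable<String> phoneList = e.getValue().getAll(phonesTag);
    out.output(e.getKey() + ": " + emailList + " / " + phoneList);
  }
}));
```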
@@ -11,7 +11,7 @@ limitations under the License.
-->
# GroupByKey

-`GroupByKey` is a transform that is used to group elements in a `PCollection` by key. The input to `GroupByKey` is a `PCollection` of key-value pairs, where the keys are used to group the elements. The output of `GroupByKey` is a PCollection of key-value pairs, where the keys are the same as the input, and the values are lists of all the elements with that key.
+`GroupByKey` is a transform that is used to group elements in a `PCollection` by key. The input to `GroupByKey` is a `PCollection` of key-value pairs, where the keys are used to group the elements. The output of `GroupByKey` is a `PCollection` of key-value pairs, where the keys are the same as the input, and the values are lists of all the elements with that key.

Let’s examine the mechanics of `GroupByKey` with a simple example case, where our data set consists of words from a text file and the line number on which they appear. We want to group together all the line numbers (values) that share the same word (key), letting us see all the places in the text where a particular word appears.

@@ -30,7 +30,7 @@ cat, 9
and, 6
```

-`GroupByKey` gathers up all the values with the same key and outputs a new pair consisting of the unique key and a collection of all of the values that were associated with that key in the input collection. If we apply GroupByKey to our input collection above, the output collection would look like this:
+`GroupByKey` gathers up all the values with the same key and outputs a new pair consisting of the unique key and a collection of all of the values that were associated with that key in the input collection. If we apply `GroupByKey` to our input collection above, the output collection would look like this:

```
cat, [1,5,9]
@@ -61,11 +61,11 @@ PCollection<KV<String, String>> input = ...;
// Apply GroupByKey to the PCollection input.
// Save the result as the PCollection reduced.
-PCollection<KV<String, Iterable<String>>> reduced = mapped.apply(GroupByKey.<String, String>create());
+PCollection<KV<String, Iterable<String>>> reduced = input.apply(GroupByKey.<String, String>create());
```
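Tying this back to the word/line-number example, a hedged end-to-end sketch with abbreviated data:

```
PCollection<KV<String, Integer>> input = pipeline.apply(
    Create.of(KV.of("cat", 1), KV.of("dog", 5), KV.of("cat", 5),
        KV.of("and", 1), KV.of("cat", 9)));

// One output pair per distinct key; the values are gathered into an Iterable.
PCollection<KV<String, Iterable<Integer>>> grouped =
    input.apply(GroupByKey.create());
// grouped: KV("cat", [1, 5, 9]), KV("dog", [5]), KV("and", [1])
```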
{{end}}
{{if (eq .Sdk "python")}}
-While all SDKs have a GroupByKey transform, using GroupBy is generally more natural. The `GroupBy` transform can be parameterized by the name(s) of properties on which to group the elements of the PCollection, or a function taking the each element as input that maps to a key on which to do grouping.
+While all SDKs have a `GroupByKey` transform, using `GroupBy` is generally more natural. The `GroupBy` transform can be parameterized by the name(s) of properties on which to group the elements of the `PCollection`, or a function taking each element as input that maps to a key on which to do grouping.

```
input = ...
@@ -77,11 +77,11 @@ grouped_words = input | beam.GroupByKey()

If you are using unbounded `PCollections`, you must use either non-global windowing or an aggregation trigger in order to perform a `GroupByKey` or `CoGroupByKey`. This is because a bounded `GroupByKey` or `CoGroupByKey` must wait for all the data with a certain key to be collected, but with unbounded collections, the data is unlimited. Windowing and/or triggers allow grouping to operate on logical, finite bundles of data within the unbounded data streams.

-If you do apply `GroupByKey` or `CoGroupByKey` to a group of unbounded `PCollections` without setting either a non-global windowing strategy, a trigger strategy, or both for each collection, Beam generates an IllegalStateException error at pipeline construction time.
+If you do apply `GroupByKey` or `CoGroupByKey` to a group of unbounded `PCollections` without setting either a non-global windowing strategy, a trigger strategy, or both for each collection, Beam generates an `IllegalStateException` error at pipeline construction time.

-When using `GroupByKey` or `CoGroupByKey` to group PCollections that have a windowing strategy applied, all of the `PCollections` you want to group must use the same windowing strategy and window sizing. For example, all the collections you are merging must use (hypothetically) identical 5-minute fixed windows, or 4-minute sliding windows starting every 30 seconds.
+When using `GroupByKey` or `CoGroupByKey` to group `PCollections` that have a windowing strategy applied, all of the `PCollections` you want to group must use the same windowing strategy and window sizing. For example, all the collections you are merging must use (hypothetically) identical 5-minute fixed windows, or 4-minute sliding windows starting every 30 seconds.

-If your pipeline attempts to use `GroupByKey` or `CoGroupByKey` to merge `PCollections` with incompatible windows, Beam generates an IllegalStateException error at pipeline construction time.
+If your pipeline attempts to use `GroupByKey` or `CoGroupByKey` to merge `PCollections` with incompatible windows, Beam generates an `IllegalStateException` error at pipeline construction time.
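As an illustration (window size assumed), windowing an unbounded input before grouping gives `GroupByKey` finite, per-window bundles to work on:

```
// Fixed 5-minute windows make each grouping operate on a finite slice of the stream.
PCollection<KV<String, Integer>> windowed = unbounded.apply(
    Window.into(FixedWindows.of(Duration.standardMinutes(5))));

PCollection<KV<String, Iterable<Integer>>> grouped =
    windowed.apply(GroupByKey.create());
```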


### Playground exercise
@@ -118,7 +118,7 @@ func applyTransform(s beam.Scope, input beam.PCollection) beam.PCollection {
{{if (eq .Sdk "java")}}
```
PCollection<KV<String, Integer>> input = pipeline
.apply("ParseCitiesToTimeKV", Create.of(
.apply(Create.of(
KV.of("banana", 2),
KV.of("apple", 4),
KV.of("lemon", 3),
@@ -28,7 +28,7 @@ PCollection<Integer> wordLengths = input.apply(
}));
```

-If your `ParDo` performs a one-to-one mapping of input elements to output elements–that is, for each input element, it applies a function that produces exactly one output element, you can use the higher-level `MapElements` transform.MapElements can accept an anonymous Java 8 lambda function for additional brevity.
+If your `ParDo` performs a one-to-one mapping of input elements to output elements (that is, for each input element, it applies a function that produces exactly one output element), you can use the higher-level `MapElements` transform. `MapElements` can accept an anonymous Java 8 lambda function for additional brevity.

Here’s the previous example using `MapElements`:

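The collapsed example presumably mirrors the Beam programming guide; a hedged reconstruction:

```
// The same word-length computation, written with MapElements and a lambda.
PCollection<Integer> wordLengths = input.apply(
    MapElements.into(TypeDescriptors.integers())
        .via((String word) -> word.length()));
```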
