diff --git a/learning/tour-of-beam/learning-content/common-transforms/aggregation/count/go-example/main.go b/learning/tour-of-beam/learning-content/common-transforms/aggregation/count/go-example/main.go index 3c131f48121a..16ed1be3493c 100644 --- a/learning/tour-of-beam/learning-content/common-transforms/aggregation/count/go-example/main.go +++ b/learning/tour-of-beam/learning-content/common-transforms/aggregation/count/go-example/main.go @@ -28,6 +28,7 @@ package main import ( "context" + "github.com/apache/beam/sdks/v2/go/pkg/beam" "github.com/apache/beam/sdks/v2/go/pkg/beam/log" "github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/stats" @@ -37,6 +38,7 @@ import ( func main() { ctx := context.Background() + beam.Init() p, s := beam.NewPipelineWithRoot() @@ -51,7 +53,7 @@ func main() { err := beamx.Run(ctx, p) if err != nil { - log.Exitf(context.Background(), "Failed to execute job: %v", err) + log.Exitf(ctx, "Failed to execute job: %v", err) } } diff --git a/learning/tour-of-beam/learning-content/common-transforms/aggregation/max/go-example/main.go b/learning/tour-of-beam/learning-content/common-transforms/aggregation/max/go-example/main.go index 09f9a0564c2b..b4fb762288e5 100644 --- a/learning/tour-of-beam/learning-content/common-transforms/aggregation/max/go-example/main.go +++ b/learning/tour-of-beam/learning-content/common-transforms/aggregation/max/go-example/main.go @@ -27,34 +27,37 @@ package main import ( - "context" - "github.com/apache/beam/sdks/v2/go/pkg/beam" - "github.com/apache/beam/sdks/v2/go/pkg/beam/log" - "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx" - "github.com/apache/beam/sdks/v2/go/pkg/beam/x/debug" - "github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/stats" + "context" + + "github.com/apache/beam/sdks/v2/go/pkg/beam" + "github.com/apache/beam/sdks/v2/go/pkg/beam/log" + "github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/stats" + "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx" + "github.com/apache/beam/sdks/v2/go/pkg/beam/x/debug" ) func main() { - ctx := context.Background() + ctx := context.Background() + beam.Init() - p, s := beam.NewPipelineWithRoot() + p, s := beam.NewPipelineWithRoot() - // List of elements - input := beam.Create(s, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10) + // List of elements + input := beam.Create(s, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10) - // The applyTransform() converts [input] to [output] - output := applyTransform(s,input) + // The applyTransform() converts [input] to [output] + output := applyTransform(s, input) - debug.Printf(s, "PCollection maximum value: %v", output) + debug.Printf(s, "PCollection maximum value: %v", output) - err := beamx.Run(ctx, p) + err := beamx.Run(ctx, p) - if err != nil { - log.Exitf(context.Background(), "Failed to execute job: %v", err) - } + if err != nil { + log.Exitf(ctx, "Failed to execute job: %v", err) + } } + // Return the maximum number from `PCollection`. func applyTransform(s beam.Scope, input beam.PCollection) beam.PCollection { - return stats.Max(s, input) -} \ No newline at end of file + return stats.Max(s, input) +} diff --git a/learning/tour-of-beam/learning-content/common-transforms/aggregation/mean/go-example/main.go b/learning/tour-of-beam/learning-content/common-transforms/aggregation/mean/go-example/main.go index 2336431669bf..f281ef53c7eb 100644 --- a/learning/tour-of-beam/learning-content/common-transforms/aggregation/mean/go-example/main.go +++ b/learning/tour-of-beam/learning-content/common-transforms/aggregation/mean/go-example/main.go @@ -27,35 +27,37 @@ package main import ( - "context" - "github.com/apache/beam/sdks/v2/go/pkg/beam" - "github.com/apache/beam/sdks/v2/go/pkg/beam/log" - "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx" - "github.com/apache/beam/sdks/v2/go/pkg/beam/x/debug" - "github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/stats" + "context" + + "github.com/apache/beam/sdks/v2/go/pkg/beam" + "github.com/apache/beam/sdks/v2/go/pkg/beam/log" + "github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/stats" + "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx" + "github.com/apache/beam/sdks/v2/go/pkg/beam/x/debug" ) func main() { - ctx := context.Background() + ctx := context.Background() + beam.Init() - p, s := beam.NewPipelineWithRoot() + p, s := beam.NewPipelineWithRoot() - // List of elements - input := beam.Create(s, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10) + // List of elements + input := beam.Create(s, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10) - // The applyTransform() converts [input] to [output] - output := applyTransform(s, input) + // The applyTransform() converts [input] to [output] + output := applyTransform(s, input) - debug.Printf(s, "PCollection mean value: %v", output) + debug.Printf(s, "PCollection mean value: %v", output) - err := beamx.Run(ctx, p) + err := beamx.Run(ctx, p) - if err != nil { - log.Exitf(context.Background(), "Failed to execute job: %v", err) - } + if err != nil { + log.Exitf(ctx, "Failed to execute job: %v", err) + } } // Return the mean of numbers from `PCollection`. func applyTransform(s beam.Scope, input beam.PCollection) beam.PCollection { - return stats.Mean(s, input) -} \ No newline at end of file + return stats.Mean(s, input) +} diff --git a/learning/tour-of-beam/learning-content/common-transforms/aggregation/min/go-example/main.go b/learning/tour-of-beam/learning-content/common-transforms/aggregation/min/go-example/main.go index 0760c269caa7..b883549549de 100644 --- a/learning/tour-of-beam/learning-content/common-transforms/aggregation/min/go-example/main.go +++ b/learning/tour-of-beam/learning-content/common-transforms/aggregation/min/go-example/main.go @@ -27,35 +27,37 @@ package main import ( - "context" - "github.com/apache/beam/sdks/v2/go/pkg/beam" - "github.com/apache/beam/sdks/v2/go/pkg/beam/log" - "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx" - "github.com/apache/beam/sdks/v2/go/pkg/beam/x/debug" - "github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/stats" + "context" + + "github.com/apache/beam/sdks/v2/go/pkg/beam" + "github.com/apache/beam/sdks/v2/go/pkg/beam/log" + "github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/stats" + "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx" + "github.com/apache/beam/sdks/v2/go/pkg/beam/x/debug" ) func main() { - ctx := context.Background() + ctx := context.Background() + beam.Init() - p, s := beam.NewPipelineWithRoot() + p, s := beam.NewPipelineWithRoot() - // List of elements - input := beam.Create(s, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10) + // List of elements + input := beam.Create(s, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10) - // The applyTransform() converts [input] to [output] - output := applyTransform(s, input) + // The applyTransform() converts [input] to [output] + output := applyTransform(s, input) - debug.Printf(s, "PCollection minimum value: %v", output) + debug.Printf(s, "PCollection minimum value: %v", output) - err := beamx.Run(ctx, p) + err := beamx.Run(ctx, p) - if err != nil { - log.Exitf(context.Background(), "Failed to execute job: %v", err) - } + if err != nil { + log.Exitf(ctx, "Failed to execute job: %v", err) + } } // Return the minimum of numbers from `PCollection`. func applyTransform(s beam.Scope, input beam.PCollection) beam.PCollection { - return stats.Min(s, input) -} \ No newline at end of file + return stats.Min(s, input) +} diff --git a/learning/tour-of-beam/learning-content/common-transforms/aggregation/sum/go-example/main.go b/learning/tour-of-beam/learning-content/common-transforms/aggregation/sum/go-example/main.go index dbaf832b90c3..863377620ed0 100644 --- a/learning/tour-of-beam/learning-content/common-transforms/aggregation/sum/go-example/main.go +++ b/learning/tour-of-beam/learning-content/common-transforms/aggregation/sum/go-example/main.go @@ -28,6 +28,7 @@ package main import ( "context" + "github.com/apache/beam/sdks/v2/go/pkg/beam" "github.com/apache/beam/sdks/v2/go/pkg/beam/log" "github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/stats" @@ -37,6 +38,7 @@ import ( func main() { ctx := context.Background() + beam.Init() p, s := beam.NewPipelineWithRoot() @@ -51,7 +53,7 @@ func main() { err := beamx.Run(ctx, p) if err != nil { - log.Exitf(context.Background(), "Failed to execute job: %v", err) + log.Exitf(ctx, "Failed to execute job: %v", err) } } diff --git a/learning/tour-of-beam/learning-content/common-transforms/filter/go-example/main.go b/learning/tour-of-beam/learning-content/common-transforms/filter/go-example/main.go index ec462b5dba3c..a7a6b88391b5 100644 --- a/learning/tour-of-beam/learning-content/common-transforms/filter/go-example/main.go +++ b/learning/tour-of-beam/learning-content/common-transforms/filter/go-example/main.go @@ -27,39 +27,39 @@ package main import ( - "context" - "github.com/apache/beam/sdks/v2/go/pkg/beam" - "github.com/apache/beam/sdks/v2/go/pkg/beam/log" - "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx" - "github.com/apache/beam/sdks/v2/go/pkg/beam/x/debug" - "github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/filter" + "context" + + "github.com/apache/beam/sdks/v2/go/pkg/beam" + "github.com/apache/beam/sdks/v2/go/pkg/beam/log" + "github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/filter" + "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx" + "github.com/apache/beam/sdks/v2/go/pkg/beam/x/debug" ) func main() { - ctx := context.Background() + ctx := context.Background() + beam.Init() - p, s := beam.NewPipelineWithRoot() + p, s := beam.NewPipelineWithRoot() - // List of elements - input := beam.Create(s, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10) + // List of elements + input := beam.Create(s, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10) - // The [input] filtered with the applyTransform() - output := applyTransform(s, input) + // The [input] filtered with the applyTransform() + output := applyTransform(s, input) - debug.Printf(s, "PCollection filtered value: %v", output) + debug.Printf(s, "PCollection filtered value: %v", output) - err := beamx.Run(ctx, p) + err := beamx.Run(ctx, p) - if err != nil { - log.Exitf(context.Background(), "Failed to execute job: %v", err) - } + if err != nil { + log.Exitf(ctx, "Failed to execute job: %v", err) + } } // The method filters the collection so that the numbers are even func applyTransform(s beam.Scope, input beam.PCollection) beam.PCollection { - return filter.Exclude(s, input, func(element int) bool { - return element % 2 == 1 - }) + return filter.Exclude(s, input, func(element int) bool { + return element%2 == 1 + }) } - - diff --git a/learning/tour-of-beam/learning-content/common-transforms/motivating-challenge/go-challenge/main.go b/learning/tour-of-beam/learning-content/common-transforms/motivating-challenge/go-challenge/main.go index d90b50e35d3c..5c3afba4e0c3 100644 --- a/learning/tour-of-beam/learning-content/common-transforms/motivating-challenge/go-challenge/main.go +++ b/learning/tour-of-beam/learning-content/common-transforms/motivating-challenge/go-challenge/main.go @@ -28,6 +28,9 @@ package main import ( "context" + "strconv" + "strings" + "github.com/apache/beam/sdks/v2/go/pkg/beam" "github.com/apache/beam/sdks/v2/go/pkg/beam/io/textio" "github.com/apache/beam/sdks/v2/go/pkg/beam/log" @@ -35,12 +38,11 @@ import ( _ "github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/stats" "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx" "github.com/apache/beam/sdks/v2/go/pkg/beam/x/debug" - "strconv" - "strings" ) func main() { ctx := context.Background() + beam.Init() p, s := beam.NewPipelineWithRoot() @@ -71,7 +73,7 @@ func main() { err := beamx.Run(ctx, p) if err != nil { - log.Exitf(context.Background(), "Failed to execute job: %v", err) + log.Exitf(ctx, "Failed to execute job: %v", err) } } diff --git a/learning/tour-of-beam/learning-content/common-transforms/motivating-challenge/go-solution/main.go b/learning/tour-of-beam/learning-content/common-transforms/motivating-challenge/go-solution/main.go index 84a64eb030d7..a1bb26689a8d 100644 --- a/learning/tour-of-beam/learning-content/common-transforms/motivating-challenge/go-solution/main.go +++ b/learning/tour-of-beam/learning-content/common-transforms/motivating-challenge/go-solution/main.go @@ -28,6 +28,9 @@ package main import ( "context" + "strconv" + "strings" + "github.com/apache/beam/sdks/v2/go/pkg/beam" "github.com/apache/beam/sdks/v2/go/pkg/beam/io/textio" "github.com/apache/beam/sdks/v2/go/pkg/beam/log" @@ -35,12 +38,11 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/stats" "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx" "github.com/apache/beam/sdks/v2/go/pkg/beam/x/debug" - "strconv" - "strings" ) func main() { ctx := context.Background() + beam.Init() p, s := beam.NewPipelineWithRoot() @@ -73,7 +75,7 @@ func main() { err := beamx.Run(ctx, p) if err != nil { - log.Exitf(context.Background(), "Failed to execute job: %v", err) + log.Exitf(ctx, "Failed to execute job: %v", err) } } diff --git a/learning/tour-of-beam/learning-content/introduction/introduction-concepts/creating-collections/from-memory/go-example/from_memory.go b/learning/tour-of-beam/learning-content/introduction/introduction-concepts/creating-collections/from-memory/go-example/from_memory.go index 5ed38c2c8331..fef21ea98398 100644 --- a/learning/tour-of-beam/learning-content/introduction/introduction-concepts/creating-collections/from-memory/go-example/from_memory.go +++ b/learning/tour-of-beam/learning-content/introduction/introduction-concepts/creating-collections/from-memory/go-example/from_memory.go @@ -27,28 +27,32 @@ package main import ( - "context" - "github.com/apache/beam/sdks/v2/go/pkg/beam" - "github.com/apache/beam/sdks/v2/go/pkg/beam/log" - "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx" - "fmt" + "context" + "fmt" + + "github.com/apache/beam/sdks/v2/go/pkg/beam" + "github.com/apache/beam/sdks/v2/go/pkg/beam/log" + "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx" ) func main() { - p, s := beam.NewPipelineWithRoot() + ctx := context.Background() + beam.Init() + + p, s := beam.NewPipelineWithRoot() - words := beam.Create(s, "Hello", "world", "it`s", "Beam") + words := beam.Create(s, "Hello", "world", "it`s", "Beam") - output(s, words) + output(s, words) - err := beamx.Run(context.Background(), p) - if err != nil { - log.Exitf(context.Background(), "Failed to execute job: %v", err) - } + err := beamx.Run(ctx, p) + if err != nil { + log.Exitf(ctx, "Failed to execute job: %v", err) + } } func output(s beam.Scope, input beam.PCollection) { - beam.ParDo0(s, func(element interface{}) { - fmt.Println(element) - }, input) -} \ No newline at end of file + beam.ParDo0(s, func(element string) { + fmt.Println(element) + }, input) +} diff --git a/learning/tour-of-beam/learning-content/introduction/introduction-concepts/creating-collections/reading-from-csv/go-example/csvExample.go b/learning/tour-of-beam/learning-content/introduction/introduction-concepts/creating-collections/reading-from-csv/go-example/csvExample.go index 1967afc14381..42c527f5c81d 100644 --- a/learning/tour-of-beam/learning-content/introduction/introduction-concepts/creating-collections/reading-from-csv/go-example/csvExample.go +++ b/learning/tour-of-beam/learning-content/introduction/introduction-concepts/creating-collections/reading-from-csv/go-example/csvExample.go @@ -30,60 +30,63 @@ package main import ( - "context" - "fmt" - "github.com/apache/beam/sdks/v2/go/pkg/beam" - "github.com/apache/beam/sdks/v2/go/pkg/beam/io/textio" - "github.com/apache/beam/sdks/v2/go/pkg/beam/log" - "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx" - "strconv" - "strings" - "github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/top" + "context" + "fmt" + "strconv" + "strings" + "github.com/apache/beam/sdks/v2/go/pkg/beam" + "github.com/apache/beam/sdks/v2/go/pkg/beam/io/textio" + "github.com/apache/beam/sdks/v2/go/pkg/beam/log" + "github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/top" + "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx" ) -func less(a, b float64) bool{ - return a>b +func less(a, b float64) bool { + return a < b } func main() { - p, s := beam.NewPipelineWithRoot() + ctx := context.Background() + beam.Init() - input := Read(s, "gs://apache-beam-samples/nyc_taxi/misc/sample1000.csv") + p, s := beam.NewPipelineWithRoot() - cost := applyTransform(s, input) + input := Read(s, "gs://apache-beam-samples/nyc_taxi/misc/sample1000.csv") - fixedSizeElements := top.Largest(s,cost,10,less) + cost := applyTransform(s, input) - output(s, "Total cost: ", fixedSizeElements) + fixedSizeElements := top.Largest(s, cost, 10, less) - err := beamx.Run(context.Background(), p) - if err != nil { - log.Exitf(context.Background(), "Failed to execute job: %v", err) + output(s, "Total cost: ", fixedSizeElements) + + err := beamx.Run(ctx, p) + if err != nil { + log.Exitf(ctx, "Failed to execute job: %v", err) } } -// Read reads from fiename(s) specified by a glob string and a returns a PCollection. +// Read reads from filename(s) specified by a glob string and a returns a PCollection. func Read(s beam.Scope, glob string) beam.PCollection { - return textio.Read(s, glob) + return textio.Read(s, glob) } -// ApplyTransform converts to uppercase all elements in a PCollection. +// ApplyTransform converts to float total_amount from all the elements in a PCollection. func applyTransform(s beam.Scope, input beam.PCollection) beam.PCollection { - return beam.ParDo(s, func(line string) float64 { - taxi := strings.Split(strings.TrimSpace(line), ",") - if len(taxi) > 16 { - cost, _ := strconv.ParseFloat(taxi[16],64) - return cost - } - return 0.0 - }, input) + return beam.ParDo(s, func(line string) float64 { + taxi := strings.Split(strings.TrimSpace(line), ",") + if len(taxi) > 16 { + cost, _ := strconv.ParseFloat(taxi[16], 64) + return cost + } + return 0.0 + }, input) } func output(s beam.Scope, prefix string, input beam.PCollection) { - beam.ParDo0(s, func(elements []float64) { - for _, element := range elements { - fmt.Println(prefix,element) - } - }, input) + beam.ParDo0(s, func(elements []float64) { + for _, element := range elements { + fmt.Println(prefix, element) + } + }, input) } diff --git a/learning/tour-of-beam/learning-content/introduction/introduction-concepts/creating-collections/reading-from-text/go-example/textIo.go b/learning/tour-of-beam/learning-content/introduction/introduction-concepts/creating-collections/reading-from-text/go-example/textIo.go index 3a59f2a95807..f130e2a53700 100644 --- a/learning/tour-of-beam/learning-content/introduction/introduction-concepts/creating-collections/reading-from-text/go-example/textIo.go +++ b/learning/tour-of-beam/learning-content/introduction/introduction-concepts/creating-collections/reading-from-text/go-example/textIo.go @@ -29,69 +29,74 @@ package main import ( - "context" - "fmt" - "github.com/apache/beam/sdks/v2/go/pkg/beam" - "github.com/apache/beam/sdks/v2/go/pkg/beam/io/textio" - "github.com/apache/beam/sdks/v2/go/pkg/beam/log" - "github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/filter" - "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx" - "regexp" - "github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/top" + "context" + "fmt" + "regexp" + + "github.com/apache/beam/sdks/v2/go/pkg/beam" + _ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem/gcs" + "github.com/apache/beam/sdks/v2/go/pkg/beam/io/textio" + "github.com/apache/beam/sdks/v2/go/pkg/beam/log" + "github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/filter" + "github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/top" + "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx" ) var ( - wordRE = regexp.MustCompile(`[a-zA-Z]+('[a-z])?`) + wordRE = regexp.MustCompile(`[a-zA-Z]+('[a-z])?`) ) -func less(a, b string) bool{ - return len(a)>len(b) +func less(a, b string) bool { + return len(a) < len(b) } func main() { - p, s := beam.NewPipelineWithRoot() + ctx := context.Background() + beam.Init() + + p, s := beam.NewPipelineWithRoot() - input := Read(s, "gs://apache-beam-samples/shakespeare/kinglear.txt") + input := Read(s, "gs://apache-beam-samples/shakespeare/kinglear.txt") - lines := getLines(s, input) - fixedSizeLines := top.Largest(s,lines,10,less) - output(s, "Lines: ", fixedSizeLines) + lines := getLines(s, input) + fixedSizeLines := top.Largest(s, lines, 10, less) + output(s, "Lines: ", fixedSizeLines) - words := getWords(s,lines) - fixedSizeWords := top.Largest(s,words,10,less) - output(s, "Words: ", fixedSizeWords) + words := getWords(s, lines) + fixedSizeWords := top.Largest(s, words, 10, less) + output(s, "Words: ", fixedSizeWords) - err := beamx.Run(context.Background(), p) - if err != nil { - log.Exitf(context.Background(), "Failed to execute job: %v", err) - } + err := beamx.Run(ctx, p) + if err != nil { + log.Exitf(ctx, "Failed to execute job: %v", err) + } } -// Read reads from fiename(s) specified by a glob string and a returns a PCollection. +// Read reads from filename(s) specified by a glob string and a returns a PCollection. func Read(s beam.Scope, glob string) beam.PCollection { - return textio.Read(s, glob) + return textio.Read(s, glob) } // Read text file content line by line. resulting PCollection contains elements, where each element contains a single line of text from the input file. func getLines(s beam.Scope, input beam.PCollection) beam.PCollection { - return filter.Include(s, input, func(element string) bool { - return element != "" - }) + return filter.Include(s, input, func(element string) bool { + return element != "" + }) } // getWords read text lines and split into PCollection of words. func getWords(s beam.Scope, input beam.PCollection) beam.PCollection { - return beam.ParDo(s, func(line string, emit func(string)) { - for _, word := range wordRE.FindAllString(line, -1) { - emit(word) - } - }, input) + return beam.ParDo(s, func(line string, emit func(string)) { + for _, word := range wordRE.FindAllString(line, -1) { + emit(word) + } + }, input) } func output(s beam.Scope, prefix string, input beam.PCollection) { - beam.ParDo0(s, func(elements []string) { - for _, element := range elements { - fmt.Println(prefix,element) - } - }, input) + beam.ParDo0(s, func(elements []string) { + for _, element := range elements { + fmt.Println(prefix, element) + } + }, input) } diff --git a/learning/tour-of-beam/learning-content/introduction/introduction-concepts/pipeline-concepts/creating-pipeline/go-example/main.go b/learning/tour-of-beam/learning-content/introduction/introduction-concepts/pipeline-concepts/creating-pipeline/go-example/main.go index d86e9881fc1b..c894bf6833a8 100644 --- a/learning/tour-of-beam/learning-content/introduction/introduction-concepts/pipeline-concepts/creating-pipeline/go-example/main.go +++ b/learning/tour-of-beam/learning-content/introduction/introduction-concepts/pipeline-concepts/creating-pipeline/go-example/main.go @@ -27,27 +27,29 @@ package main import ( - "context" - "github.com/apache/beam/sdks/v2/go/pkg/beam" - "github.com/apache/beam/sdks/v2/go/pkg/beam/log" - "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx" - "github.com/apache/beam/sdks/v2/go/pkg/beam/x/debug" -) - + "context" + "github.com/apache/beam/sdks/v2/go/pkg/beam" + "github.com/apache/beam/sdks/v2/go/pkg/beam/log" + "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx" + "github.com/apache/beam/sdks/v2/go/pkg/beam/x/debug" +) func main() { - p, s := beam.NewPipelineWithRoot() + ctx := context.Background() + beam.Init() - hello := helloBeam(s) - debug.Print(s, hello) + p, s := beam.NewPipelineWithRoot() - err := beamx.Run(context.Background(), p) - if err != nil { - log.Exitf(context.Background(), "Failed to execute job: %v", err) - } + hello := helloBeam(s) + debug.Print(s, hello) + + err := beamx.Run(ctx, p) + if err != nil { + log.Exitf(ctx, "Failed to execute job: %v", err) + } } func helloBeam(s beam.Scope) beam.PCollection { - return beam.Create(s, "Hello Beam") -} \ No newline at end of file + return beam.Create(s, "Hello Beam") +} diff --git a/learning/tour-of-beam/learning-content/introduction/introduction-concepts/pipeline-concepts/setting-pipeline/go-example/main.go b/learning/tour-of-beam/learning-content/introduction/introduction-concepts/pipeline-concepts/setting-pipeline/go-example/main.go index bb3dba69e273..cb47344efdeb 100644 --- a/learning/tour-of-beam/learning-content/introduction/introduction-concepts/pipeline-concepts/setting-pipeline/go-example/main.go +++ b/learning/tour-of-beam/learning-content/introduction/introduction-concepts/pipeline-concepts/setting-pipeline/go-example/main.go @@ -28,46 +28,50 @@ package main import ( - "context" - "flag" - "github.com/apache/beam/sdks/v2/go/pkg/beam/io/textio" - "github.com/apache/beam/sdks/v2/go/pkg/beam" - "github.com/apache/beam/sdks/v2/go/pkg/beam/log" - "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx" + "context" + "flag" + + "github.com/apache/beam/sdks/v2/go/pkg/beam" + "github.com/apache/beam/sdks/v2/go/pkg/beam/io/textio" + "github.com/apache/beam/sdks/v2/go/pkg/beam/log" + "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx" ) var ( - // By default, this example reads from a public dataset containing the text of - // King Lear. Set this option to choose a different input file or glob. - input = flag.String("input", "gs://apache-beam-samples/shakespeare/kinglear.txt", "File(s) to read.") + // By default, this example reads from a public dataset containing the text of + // King Lear. Set this option to choose a different input file or glob. + input = flag.String("input", "gs://apache-beam-samples/shakespeare/kinglear.txt", "File(s) to read.") - // Set this required option to specify where to write the output. - output = flag.String("output", "", "Output file (required).") + // Set this required option to specify where to write the output. + output = flag.String("output", "", "Output file (required).") ) func main() { - // If beamx or Go flags are used, flags must be parsed first. - flag.Parse() + // If beamx or Go flags are used, flags must be parsed first. + flag.Parse() + + // We can then init Beam + beam.Init() - ctx := context.Background() + ctx := context.Background() - // Input validation is done as usual. Note that it must be after Init(). - if *output == "" { - log.Exitf(ctx,"No output provided") - } + // Input validation is done as usual. Note that it must be after Init(). + if *output == "" { + log.Exitf(ctx, "No output provided") + } - p := beam.NewPipeline() - s := p.Root() + p := beam.NewPipeline() + s := p.Root() - // Read from option input file - lines := textio.Read(s, *input) + // Read from option input file + lines := textio.Read(s, *input) - // Write to option output file - textio.Write(s, *output, lines) + // Write to option output file + textio.Write(s, *output, lines) - err := beamx.Run(ctx, p) + err := beamx.Run(ctx, p) - if err != nil { - log.Exitf(ctx, "Failed to execute job: %v", err) - } + if err != nil { + log.Exitf(ctx, "Failed to execute job: %v", err) + } }