From 864c0429a0f7390cb6dc0d86710e4bb217399fa4 Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Wed, 2 Aug 2023 14:32:08 -0700 Subject: [PATCH] quick first pass --- dev-support/docker/Dockerfile | 2 +- .../prism/internal/jobservices/management.go | 2 +- sdks/go/pkg/beam/runners/prism/internal/stage.go | 7 +------ sdks/go/pkg/beam/transforms/stats/quantiles.go | 2 +- sdks/go/test/integration/integration.go | 15 ++++++--------- 5 files changed, 10 insertions(+), 18 deletions(-) diff --git a/dev-support/docker/Dockerfile b/dev-support/docker/Dockerfile index 5b7262ef681f..9422ae5a8863 100644 --- a/dev-support/docker/Dockerfile +++ b/dev-support/docker/Dockerfile @@ -78,7 +78,7 @@ RUN pip3 install distlib==0.3.1 yapf==0.29.0 pytest ### # Install Go ### -ENV DOWNLOAD_GO_VERSION=1.20.5 +ENV DOWNLOAD_GO_VERSION=1.20.6 RUN wget https://golang.org/dl/go${DOWNLOAD_GO_VERSION}.linux-amd64.tar.gz && \ tar -C /usr/local -xzf go${DOWNLOAD_GO_VERSION}.linux-amd64.tar.gz ENV GOROOT /usr/local/go diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go index 3dee7a1b787d..b2495a499d90 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go +++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go @@ -145,7 +145,7 @@ func (s *Server) Prepare(ctx context.Context, req *jobpb.PrepareJobRequest) (*jo } // Inspect Windowing strategies for unsupported features. - for wsID, ws := range job.Pipeline.GetComponents().GetWindowingStrategies() { + for _, ws := range job.Pipeline.GetComponents().GetWindowingStrategies() { check("WindowingStrategy.AllowedLateness", ws.GetAllowedLateness(), int64(0)) check("WindowingStrategy.ClosingBehaviour", ws.GetClosingBehavior(), pipepb.ClosingBehavior_EMIT_IF_NONEMPTY) check("WindowingStrategy.AccumulationMode", ws.GetAccumulationMode(), pipepb.AccumulationMode_DISCARDING) diff --git a/sdks/go/pkg/beam/runners/prism/internal/stage.go b/sdks/go/pkg/beam/runners/prism/internal/stage.go index 6e244905ceda..ec2675ff36f9 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/stage.go +++ b/sdks/go/pkg/beam/runners/prism/internal/stage.go @@ -134,11 +134,6 @@ progress: progTick.Stop() break progress // exit progress loop on close. case <-progTick.C: - resp, err := b.Progress(wk) - if err != nil { - slog.Debug("SDK Error from progress, aborting progress", "bundle", rb, "error", err.Error()) - break progress - } resp, err := b.Progress(wk) if err != nil { slog.Debug("SDK Error from progress, aborting progress", "bundle", rb, "error", err.Error()) @@ -410,7 +405,7 @@ func handleSideInputs(tid string, t *pipepb.PTransform, comps *pipepb.Components for _, prep := range prepSides { prep(b, watermark) } - }, nil + }, nil } // handleSideInput returns a closure that will look up the data for a side input appropriate for the given watermark. diff --git a/sdks/go/pkg/beam/transforms/stats/quantiles.go b/sdks/go/pkg/beam/transforms/stats/quantiles.go index 7685852efba6..2734bba3dc60 100644 --- a/sdks/go/pkg/beam/transforms/stats/quantiles.go +++ b/sdks/go/pkg/beam/transforms/stats/quantiles.go @@ -47,7 +47,7 @@ func init() { beam.RegisterCoder(weightedElementType, encodeWeightedElement, decodeWeightedElement) register.Function1x2(fixedKey) - register.Function2x1(makeWeightedElement) // TODO make prism fail faster when this is commented out. + //register.Function2x1(makeWeightedElement) // TODO make prism fail faster when this is commented out. } // Opts contains settings used to configure how approximate quantiles are computed. diff --git a/sdks/go/test/integration/integration.go b/sdks/go/test/integration/integration.go index 7914a0525fef..1224ce2bb071 100644 --- a/sdks/go/test/integration/integration.go +++ b/sdks/go/test/integration/integration.go @@ -137,25 +137,24 @@ var portableFilters = []string{ } var prismFilters = []string{ - // The portable runner does not support the TestStream primitive + // The prism runner does not support the TestStream primitive "TestTestStream.*", // The trigger and pane tests uses TestStream "TestTrigger.*", "TestPanes", - // TODO(https://github.com/apache/beam/issues/21058): Python portable runner times out on Kafka reads. + + // TODO(https://github.com/apache/beam/issues/21058): Xlang ios don't yet work on prism. "TestKafkaIO.*", - // TODO(BEAM-13215): GCP IOs currently do not work in non-Dataflow portable runners. + // TODO(BEAM-13215): GCP IOs currently do not work in non-Dataflow prism runners. "TestBigQueryIO.*", "TestSpannerIO.*", - // The portable runner does not support self-checkpointing - "TestCheckpointing", - // The portable runner does not support pipeline drain for SDF. + // The prism runner does not support pipeline drain for SDF. "TestDrain", // FhirIO currently only supports Dataflow runner "TestFhirIO.*", // OOMs currently only lead to heap dumps on Dataflow runner "TestOomParDo", - // The portable runner does not support user state. + // The prism runner does not support user state. "TestValueState", "TestValueStateWindowed", "TestValueStateClear", @@ -328,8 +327,6 @@ func CheckFilters(t *testing.T) { filters = prismFilters case "portable", "PortableRunner": filters = portableFilters - case "prism", "PrismRunner": - filters = prismFilters case "flink", "FlinkRunner": filters = flinkFilters case "samza", "SamzaRunner":