From aae628ba619f4e7ed7eb4f8140181e69bc6b542e Mon Sep 17 00:00:00 2001 From: Robert Burke Date: Sun, 19 Feb 2023 05:45:25 -0800 Subject: [PATCH] [prism] worker PR comments --- .../pkg/beam/runners/prism/internal/worker/worker.go | 9 +++------ .../runners/prism/internal/worker/worker_test.go | 12 ++++++------ 2 files changed, 9 insertions(+), 12 deletions(-) diff --git a/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go b/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go index a994ca732fa9..8458ce39e116 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go +++ b/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go @@ -21,12 +21,11 @@ import ( "bytes" "context" "fmt" + "io" "net" "sync" "sync/atomic" - "io" - "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec" @@ -58,8 +57,6 @@ type W struct { // These are the ID sources inst, bund uint64 - // descs map[string]*fnpb.ProcessBundleDescriptor - InstReqs chan *fnpb.InstructionRequest DataReqs chan *fnpb.Elements @@ -168,7 +165,7 @@ func (wk *W) Logging(stream fnpb.BeamFnLogging_LoggingServer) error { func toSlogSev(sev fnpb.LogEntry_Severity_Enum) slog.Level { switch sev { case fnpb.LogEntry_Severity_TRACE: - return slog.Level(-8) // + return slog.Level(-8) case fnpb.LogEntry_Severity_DEBUG: return slog.LevelDebug // -4 case fnpb.LogEntry_Severity_INFO: @@ -418,7 +415,7 @@ func (d *DataService) Commit(tent engine.TentativeData) { } } -// Hack for Side Inputs until watermarks are sorted out. +// GetAllData is a hack for Side Inputs until watermarks are sorted out. func (d *DataService) GetAllData(colID string) [][]byte { return d.raw[colID] } diff --git a/sdks/go/pkg/beam/runners/prism/internal/worker/worker_test.go b/sdks/go/pkg/beam/runners/prism/internal/worker/worker_test.go index faac58a93e75..29b3fab92d64 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/worker/worker_test.go +++ b/sdks/go/pkg/beam/runners/prism/internal/worker/worker_test.go @@ -27,6 +27,7 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex" fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1" "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/test/bufconn" ) @@ -49,7 +50,7 @@ func TestWorker_NextInst(t *testing.T) { } } -func TestWorker_NextBund(t *testing.T) { +func TestWorker_NextStage(t *testing.T) { w := New("test") stageIDs := map[string]struct{}{} @@ -57,7 +58,7 @@ func TestWorker_NextBund(t *testing.T) { stageIDs[w.NextStage()] = struct{}{} } if got, want := len(stageIDs), 100; got != want { - t.Errorf("calling w.NextInst() got %v unique ids, want %v", got, want) + t.Errorf("calling w.NextStage() got %v unique ids, want %v", got, want) } } @@ -83,7 +84,7 @@ func TestWorker_GetProcessBundleDescriptor(t *testing.T) { ProcessBundleDescriptorId: "unknown", }) if err == nil { - t.Errorf(" GetProcessBundleDescriptor(%q) = %v, want error", "unknown", pbd) + t.Errorf("got GetProcessBundleDescriptor(%q) = %v, want error", "unknown", pbd) } } @@ -100,7 +101,7 @@ func serveTestWorker(t *testing.T) (context.Context, *W, *grpc.ClientConn) { clientConn, err := grpc.DialContext(ctx, "", grpc.WithContextDialer(func(ctx context.Context, _ string) (net.Conn, error) { return lis.DialContext(ctx) - }), grpc.WithInsecure(), grpc.WithBlock()) + }), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock()) if err != nil { t.Fatal("couldn't create bufconn grpc connection:", err) } @@ -124,7 +125,7 @@ func TestWorker_Logging(t *testing.T) { }) // TODO: Connect to the job management service. - // At this point job messages are just logged to whereever the prism runner executes + // At this point job messages are just logged to wherever the prism runner executes // But this should pivot to anyone connecting to the Job Management service for the // job. // In the meantime, sleep to validate execution via coverage. @@ -273,7 +274,6 @@ func TestWorker_State_Iterable(t *testing.T) { if got, want := resp.GetGet().GetData(), []byte{42}; !bytes.Equal(got, want) { t.Fatalf("didn't receive expected state response data: got %v, want %v", got, want) } - resp.GetId() if err := stateStream.CloseSend(); err != nil { t.Errorf("stateStream.CloseSend() = %v", err)