From d18176da893f1755789ce9d5cd8ac374bd22d1a3 Mon Sep 17 00:00:00 2001 From: Damon Douglas Date: Mon, 23 Dec 2024 11:41:36 -0800 Subject: [PATCH] Handle worker_id is empty string error --- sdks/go/pkg/beam/runners/prism/internal/worker/pool.go | 3 +++ sdks/go/pkg/beam/runners/prism/internal/worker/pool_test.go | 6 ++++++ sdks/python/apache_beam/runners/worker/sdk_worker.py | 2 ++ 3 files changed, 11 insertions(+) diff --git a/sdks/go/pkg/beam/runners/prism/internal/worker/pool.go b/sdks/go/pkg/beam/runners/prism/internal/worker/pool.go index c9918e80bfec..1491ebc147d0 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/worker/pool.go +++ b/sdks/go/pkg/beam/runners/prism/internal/worker/pool.go @@ -43,6 +43,9 @@ func (m MapW) workerFromMetadataCtx(ctx context.Context) (*W, error) { if err != nil { return nil, err } + if id == "" { + return nil, fmt.Errorf("worker id in ctx metadata is an empty string") + } mu.Lock() defer mu.Unlock() if w, ok := m[id]; ok { diff --git a/sdks/go/pkg/beam/runners/prism/internal/worker/pool_test.go b/sdks/go/pkg/beam/runners/prism/internal/worker/pool_test.go index 3b1a6aead548..0b8531058169 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/worker/pool_test.go +++ b/sdks/go/pkg/beam/runners/prism/internal/worker/pool_test.go @@ -467,6 +467,12 @@ func TestMapW_workerFromMetadataCtx(t *testing.T) { m: make(MapW), wantErr: "failed to read metadata from context", }, + { + name: "ctx metadata worker_id=''", + ctx: metadata.NewIncomingContext(context.Background(), metadata.Pairs("worker_id", "")), + m: make(MapW), + wantErr: "worker id in ctx metadata is an empty string", + }, { name: "mismatched ctx metadata", ctx: metadata.NewIncomingContext(context.Background(), metadata.Pairs("worker_id", "wk1")), diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py b/sdks/python/apache_beam/runners/worker/sdk_worker.py index b091220a06b5..cfa72f4790cd 100644 --- a/sdks/python/apache_beam/runners/worker/sdk_worker.py +++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py @@ -180,6 +180,8 @@ def __init__( # type: (...) -> None self._alive = True self._worker_index = 0 + if worker_id == '': + raise RuntimeError("worker_id is an empty string") self._worker_id = worker_id self._state_cache = StateCache(state_cache_size) self._deferred_exception = deferred_exception