Skip to content

Commit

Permalink
Handle worker_id is empty string error
Browse files Browse the repository at this point in the history
  • Loading branch information
damondouglas committed Dec 23, 2024
1 parent 740fa3d commit d18176d
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 0 deletions.
3 changes: 3 additions & 0 deletions sdks/go/pkg/beam/runners/prism/internal/worker/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ func (m MapW) workerFromMetadataCtx(ctx context.Context) (*W, error) {
if err != nil {
return nil, err
}
if id == "" {
return nil, fmt.Errorf("worker id in ctx metadata is an empty string")
}
mu.Lock()
defer mu.Unlock()
if w, ok := m[id]; ok {
Expand Down
6 changes: 6 additions & 0 deletions sdks/go/pkg/beam/runners/prism/internal/worker/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,12 @@ func TestMapW_workerFromMetadataCtx(t *testing.T) {
m: make(MapW),
wantErr: "failed to read metadata from context",
},
{
name: "ctx metadata worker_id=''",
ctx: metadata.NewIncomingContext(context.Background(), metadata.Pairs("worker_id", "")),
m: make(MapW),
wantErr: "worker id in ctx metadata is an empty string",
},
{
name: "mismatched ctx metadata",
ctx: metadata.NewIncomingContext(context.Background(), metadata.Pairs("worker_id", "wk1")),
Expand Down
2 changes: 2 additions & 0 deletions sdks/python/apache_beam/runners/worker/sdk_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,8 @@ def __init__(
# type: (...) -> None
self._alive = True
self._worker_index = 0
if worker_id == '':
raise RuntimeError("worker_id is an empty string")
self._worker_id = worker_id
self._state_cache = StateCache(state_cache_size)
self._deferred_exception = deferred_exception
Expand Down

0 comments on commit d18176d

Please sign in to comment.