From 49efe31e3ea65313d9e56012a9aa73510f10973c Mon Sep 17 00:00:00 2001 From: Ian Adams Date: Mon, 2 Dec 2024 14:44:30 +0000 Subject: [PATCH] [receiver/azureeventhub] Close storage extension client during component shutdown (#36296) #### Description When using a storage extension, the component will call Close on the client during component shutdown. This fixes a bug that resulted in a file potentially remaining locked after component shutdown. #### Link to tracking issue #36238 #### Testing Since reproducing this was described in the issue as "difficult without custom code", testing just involved ensuring that unit tests covered the added code and ran successfully, and manually ensuring that the collector could still receive events from AEH. --- .chloggen/aeh-storageclient-shutdown.yaml | 27 ++++++++++++ .../azureeventhubreceiver/eventhubhandler.go | 43 ++++++++++++------- .../eventhubhandler_test.go | 21 +++++++++ 3 files changed, 76 insertions(+), 15 deletions(-) create mode 100644 .chloggen/aeh-storageclient-shutdown.yaml diff --git a/.chloggen/aeh-storageclient-shutdown.yaml b/.chloggen/aeh-storageclient-shutdown.yaml new file mode 100644 index 000000000000..62ba267a8bd5 --- /dev/null +++ b/.chloggen/aeh-storageclient-shutdown.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: receiver/azureeventhub + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: When using a storage extension, the component will call Close on the client during component shutdown. This fixes a bug that resulted in a file potentially remaining locked after component shutdown. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [36238] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/receiver/azureeventhubreceiver/eventhubhandler.go b/receiver/azureeventhubreceiver/eventhubhandler.go index bf48cb1047f9..94ad5915f233 100644 --- a/receiver/azureeventhubreceiver/eventhubhandler.go +++ b/receiver/azureeventhubreceiver/eventhubhandler.go @@ -5,9 +5,11 @@ package azureeventhubreceiver // import "github.com/open-telemetry/opentelemetry import ( "context" + "errors" eventhub "github.com/Azure/azure-event-hubs-go/v3" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/extension/experimental/storage" "go.opentelemetry.io/collector/receiver" "go.uber.org/zap" @@ -43,24 +45,28 @@ type listerHandleWrapper interface { } type eventhubHandler struct { - hub hubWrapper - dataConsumer dataConsumer - config *Config - settings receiver.Settings - cancel context.CancelFunc + hub hubWrapper + dataConsumer dataConsumer + config *Config + settings receiver.Settings + cancel context.CancelFunc + storageClient storage.Client } func (h *eventhubHandler) run(ctx context.Context, host component.Host) error { ctx, h.cancel = context.WithCancel(ctx) - storageClient, err := adapter.GetStorageClient(ctx, host, h.config.StorageID, h.settings.ID) - if err != nil { - h.settings.Logger.Debug("Error connecting to Storage", zap.Error(err)) - return err + if h.storageClient == nil { // set manually for testing. + storageClient, err := adapter.GetStorageClient(ctx, host, h.config.StorageID, h.settings.ID) + if err != nil { + h.settings.Logger.Debug("Error connecting to Storage", zap.Error(err)) + return err + } + h.storageClient = storageClient } if h.hub == nil { // set manually for testing. - hub, newHubErr := eventhub.NewHubFromConnectionString(h.config.Connection, eventhub.HubWithOffsetPersistence(&storageCheckpointPersister{storageClient: storageClient})) + hub, newHubErr := eventhub.NewHubFromConnectionString(h.config.Connection, eventhub.HubWithOffsetPersistence(&storageCheckpointPersister{storageClient: h.storageClient})) if newHubErr != nil { h.settings.Logger.Debug("Error connecting to Event Hub", zap.Error(newHubErr)) return newHubErr @@ -72,8 +78,7 @@ func (h *eventhubHandler) run(ctx context.Context, host component.Host) error { if h.config.Partition == "" { // listen to each partition of the Event Hub - var runtimeInfo *eventhub.HubRuntimeInformation - runtimeInfo, err = h.hub.GetRuntimeInformation(ctx) + runtimeInfo, err := h.hub.GetRuntimeInformation(ctx) if err != nil { h.settings.Logger.Debug("Error getting Runtime Information", zap.Error(err)) return err @@ -87,7 +92,7 @@ func (h *eventhubHandler) run(ctx context.Context, host component.Host) error { } } } else { - err = h.setUpOnePartition(ctx, h.config.Partition, true) + err := h.setUpOnePartition(ctx, h.config.Partition, true) if err != nil { h.settings.Logger.Debug("Error setting up partition", zap.Error(err)) return err @@ -160,10 +165,18 @@ func (h *eventhubHandler) newMessageHandler(ctx context.Context, event *eventhub } func (h *eventhubHandler) close(ctx context.Context) error { + var errs error + if h.storageClient != nil { + if err := h.storageClient.Close(ctx); err != nil { + errs = errors.Join(errs, err) + } + h.storageClient = nil + } + if h.hub != nil { err := h.hub.Close(ctx) if err != nil { - return err + errs = errors.Join(errs, err) } h.hub = nil } @@ -171,7 +184,7 @@ func (h *eventhubHandler) close(ctx context.Context) error { h.cancel() } - return nil + return errs } func (h *eventhubHandler) setDataConsumer(dataConsumer dataConsumer) { diff --git a/receiver/azureeventhubreceiver/eventhubhandler_test.go b/receiver/azureeventhubreceiver/eventhubhandler_test.go index 15382e127fc2..71ce294ea29b 100644 --- a/receiver/azureeventhubreceiver/eventhubhandler_test.go +++ b/receiver/azureeventhubreceiver/eventhubhandler_test.go @@ -153,3 +153,24 @@ func TestEventhubHandler_newMessageHandler(t *testing.T) { assert.Equal(t, "bar", read.AsString()) assert.NoError(t, ehHandler.close(context.Background())) } + +func TestEventhubHandler_closeWithStorageClient(t *testing.T) { + config := createDefaultConfig() + config.(*Config).Connection = "Endpoint=sb://namespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=superSecret1234=;EntityPath=hubName" + + ehHandler := &eventhubHandler{ + settings: receivertest.NewNopSettings(), + dataConsumer: &mockDataConsumer{}, + config: config.(*Config), + } + ehHandler.hub = &mockHubWrapper{} + mockClient := newMockClient() + ehHandler.storageClient = mockClient + + assert.NoError(t, ehHandler.run(context.Background(), componenttest.NewNopHost())) + require.NotNil(t, ehHandler.storageClient) + require.NotNil(t, mockClient.cache) + assert.NoError(t, ehHandler.close(context.Background())) + require.Nil(t, ehHandler.storageClient) + require.Nil(t, mockClient.cache) +}