Skip to content

Commit

Permalink
[Cosmos DB] Add Listener logs for debugging (#767)
Browse files Browse the repository at this point in the history
Adding more listener logs
  • Loading branch information
ealsur authored Apr 13, 2022
1 parent 01b1a82 commit aa0ed5d
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 4 deletions.
3 changes: 3 additions & 0 deletions src/WebJobs.Extensions.CosmosDB/Constants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,8 @@ internal static class Events
public static readonly EventId OnDelivery = new EventId(4, "OnTriggerDelivery");
public static readonly EventId OnListenerStopError = new EventId(5, "OnTriggerListenerStopError");
public static readonly EventId OnScaling = new EventId(6, "OnScaling");
public static readonly EventId OnListenerStarted = new EventId(6, "OnListenerStarted");
public static readonly EventId OnListenerStartError = new EventId(7, "OnListenerStartError");
public static readonly EventId OnListenerStopped = new EventId(8, "OnListenerStopped");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ internal class CosmosDBTriggerListener<T> : IListener, IScaleMonitor<CosmosDBTri
private readonly string _functionId;
private readonly ScaleMonitorDescriptor _scaleMonitorDescriptor;
private readonly CosmosDBTriggerHealthMonitor _healthMonitor;
private readonly string _listenerLogDetails;
private ChangeFeedProcessor _host;
private ChangeFeedProcessorBuilder _hostBuilder;
private int _listenerStatus;
Expand Down Expand Up @@ -70,6 +71,8 @@ public CosmosDBTriggerListener(
this._cosmosDBAttribute = cosmosDBAttribute;
this._scaleMonitorDescriptor = new ScaleMonitorDescriptor($"{_functionId}-CosmosDBTrigger-{_monitoredContainer.Database.Id}-{_monitoredContainer.Id}".ToLower());
this._healthMonitor = new CosmosDBTriggerHealthMonitor(logger);
this._listenerLogDetails = $"prefix='{this._processorName}', monitoredContainer='{this._monitoredContainer.Id}', monitoredDatabase='{this._monitoredContainer.Database.Id}', " +
$"leaseContainer='{this._leaseContainer.Id}', leaseDatabase='{this._leaseContainer.Database.Id}', functionId='{this._functionId}'";
}

public ScaleMonitorDescriptor Descriptor => this._scaleMonitorDescriptor;
Expand Down Expand Up @@ -103,11 +106,13 @@ public async Task StartAsync(CancellationToken cancellationToken)
{
await this.StartProcessorAsync();
Interlocked.CompareExchange(ref this._listenerStatus, ListenerRegistered, ListenerRegistering);
this._logger.LogDebug(Events.OnListenerStarted, "Started the listener for {Details}.", this._listenerLogDetails);
}
catch (Exception ex)
{
// Reset to NotRegistered
this._listenerStatus = ListenerNotRegistered;
this._logger.LogError(Events.OnListenerStartError, "Starting the listener for {Details} failed. Exception: {Exception}.", this._listenerLogDetails, ex);

// Throw a custom error if NotFound.
if (ex is CosmosException docEx && docEx.StatusCode == HttpStatusCode.NotFound)
Expand All @@ -130,11 +135,12 @@ public async Task StopAsync(CancellationToken cancellationToken)
{
await this._host.StopAsync().ConfigureAwait(false);
this._listenerStatus = ListenerNotRegistered;
this._logger.LogDebug(Events.OnListenerStopped, "Stopped the listener for {Details}.", this._listenerLogDetails);
}
}
catch (Exception ex)
{
this._logger.LogWarning(Events.OnListenerStopError, "Stopping the observer failed, potentially it was never started. Exception: {Exception}.", ex);
this._logger.LogError(Events.OnListenerStopError, "Stopping the listener for {Details} failed. Exception: {Exception}.", this._listenerLogDetails, ex);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public class CosmosDBListenerTests
private readonly Mock<FeedIterator<ChangeFeedProcessorState>> _estimatorIterator;
private readonly CosmosDBTriggerListener<dynamic> _listener;
private readonly string _functionId;
private readonly string _logDetails;

public CosmosDBListenerTests()
{
Expand Down Expand Up @@ -66,6 +67,9 @@ public CosmosDBListenerTests()
var attribute = new CosmosDBTriggerAttribute(DatabaseName, ContainerName);

_listener = new CosmosDBTriggerListener<dynamic>(_mockExecutor.Object, _functionId, ProcessorName, _monitoredContainer.Object, _leasesContainer.Object, attribute, _loggerFactory.CreateLogger<CosmosDBTriggerListener<dynamic>>());

_logDetails = $"prefix='{ProcessorName}', monitoredContainer='{ContainerName}', monitoredDatabase='{DatabaseName}', " +
$"leaseContainer='{ContainerName}', leaseDatabase='{DatabaseName}', functionId='{this._functionId}'";
}

[Fact]
Expand Down Expand Up @@ -342,11 +346,11 @@ public void GetScaleStatus_RemainingWorkDecreasing_ReturnsVote_ScaleIn()
[Fact]
public async Task StartAsync_Retries()
{
var attribute = new CosmosDBTriggerAttribute("test", "test") { LeaseContainerPrefix = Guid.NewGuid().ToString() };
var attribute = new CosmosDBTriggerAttribute("test", "test") { LeaseContainerPrefix = ProcessorName };

var mockExecutor = new Mock<ITriggeredFunctionExecutor>();

var listener = new MockListener<dynamic>(mockExecutor.Object, _monitoredContainer.Object, _leasesContainer.Object, attribute, NullLogger.Instance);
var listener = new MockListener<dynamic>(mockExecutor.Object, _functionId, ProcessorName, _monitoredContainer.Object, _leasesContainer.Object, attribute, _loggerFactory.CreateLogger<CosmosDBTriggerListener<dynamic>>());

// Ensure that we can call StartAsync() multiple times to retry if there is an error.
for (int i = 0; i < 3; i++)
Expand All @@ -357,18 +361,34 @@ public async Task StartAsync_Retries()

// This should succeed
await listener.StartAsync(CancellationToken.None);

var logs = _loggerProvider.GetAllLogMessages().ToArray();
Assert.Equal(LogLevel.Error, logs[0].Level);
Assert.Equal(Events.OnListenerStartError, logs[0].EventId);
Assert.Contains(_logDetails, logs[0].FormattedMessage);
Assert.Equal(LogLevel.Error, logs[1].Level);
Assert.Equal(Events.OnListenerStartError, logs[1].EventId);
Assert.Contains(_logDetails, logs[1].FormattedMessage);
Assert.Equal(LogLevel.Error, logs[2].Level);
Assert.Equal(Events.OnListenerStartError, logs[2].EventId);
Assert.Contains(_logDetails, logs[2].FormattedMessage);
Assert.Equal(LogLevel.Debug, logs[3].Level);
Assert.Equal(Events.OnListenerStarted, logs[3].EventId);
Assert.Contains(_logDetails, logs[3].FormattedMessage);
}

private class MockListener<T> : CosmosDBTriggerListener<T>
{
private int _retries = 0;

public MockListener(ITriggeredFunctionExecutor executor,
string functionId,
string processorName,
Container monitoredContainer,
Container leaseContainer,
CosmosDBTriggerAttribute cosmosDBAttribute,
ILogger logger)
: base(executor, Guid.NewGuid().ToString(), string.Empty, monitoredContainer, leaseContainer, cosmosDBAttribute, logger)
: base(executor, functionId, processorName, monitoredContainer, leaseContainer, cosmosDBAttribute, logger)
{
}

Expand Down

0 comments on commit aa0ed5d

Please sign in to comment.