Skip to content

Commit

Permalink
Fixed #2 by replacing the OnMessage callback in the ServiceBusSource
Browse files Browse the repository at this point in the history
  • Loading branch information
marcpiechura committed Aug 1, 2016
1 parent 5ea44cf commit f032acd
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 52 deletions.
8 changes: 5 additions & 3 deletions src/Akka.Streams.Azure.ServiceBus.Examples/Program.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Linq;
using Akka.Actor;
using Akka.Streams.Dsl;
using Microsoft.ServiceBus.Messaging;
Expand All @@ -7,8 +8,8 @@ namespace Akka.Streams.Azure.ServiceBus.Examples
{
class Program
{
private const string ConnectionString = "{ServiceBus connection string}";
private const string QueueName = "{ServiceBus queue name}";
private const string ConnectionString = "Endpoint=sb://akkastreamsazureeventhub.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=C4QxVntX9qwj2xvEMYm6rmlEZENWviFrUOlM0ayAwd8=";
private const string QueueName = "test";

static void Main()
{
Expand All @@ -19,7 +20,7 @@ static void Main()
using (var mat = sys.Materializer())
{
Console.WriteLine("Writing messages into the queue");
var t = Source.From(new[] {1, 2, 3})
var t = Source.From(Enumerable.Range(1,100))
.Select(x => new BrokeredMessage("Message: " + x))
.Grouped(10)
.ToServiceBus(client, mat);
Expand All @@ -31,6 +32,7 @@ static void Main()
t = ServiceBusSource.Create(client).Select(x =>
{
var message = x.GetBody<string>();
x.Complete();
return message;
}).RunForeach(Console.WriteLine, mat);

Expand Down
12 changes: 6 additions & 6 deletions src/Akka.Streams.Azure.ServiceBus/BusClientWrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ namespace Akka.Streams.Azure.ServiceBus
{
internal interface IBusClient
{
void OnMessageAsync(Func<BrokeredMessage, Task> callback, OnMessageOptions options);
Task<IEnumerable<BrokeredMessage>> ReceiveBatchAsync(int messageCount, TimeSpan serverWaitTime);

Task SendBatchAsync(IEnumerable<BrokeredMessage> messages);
}
Expand All @@ -21,8 +21,8 @@ public QueueClientWrapper(QueueClient client)
_client = client;
}

public void OnMessageAsync(Func<BrokeredMessage, Task> callback, OnMessageOptions options)
=> _client.OnMessageAsync(callback, options);
public Task<IEnumerable<BrokeredMessage>> ReceiveBatchAsync(int messageCount, TimeSpan serverWaitTime)
=> _client.ReceiveBatchAsync(messageCount, serverWaitTime);

public Task SendBatchAsync(IEnumerable<BrokeredMessage> messages) => _client.SendBatchAsync(messages);
}
Expand All @@ -36,8 +36,8 @@ public SubscriptionClientWrapper(SubscriptionClient client)
_client = client;
}

public void OnMessageAsync(Func<BrokeredMessage, Task> callback, OnMessageOptions options)
=> _client.OnMessageAsync(callback, options);
public Task<IEnumerable<BrokeredMessage>> ReceiveBatchAsync(int messageCount, TimeSpan serverWaitTime)
=> _client.ReceiveBatchAsync(messageCount, serverWaitTime);

public Task SendBatchAsync(IEnumerable<BrokeredMessage> messages)
{
Expand All @@ -54,7 +54,7 @@ public TopicClientWrapper(TopicClient client)
_client = client;
}

public void OnMessageAsync(Func<BrokeredMessage, Task> callback, OnMessageOptions options)
public Task<IEnumerable<BrokeredMessage>> ReceiveBatchAsync(int messageCount, TimeSpan serverWaitTime)
{
throw new NotImplementedException();
}
Expand Down
107 changes: 64 additions & 43 deletions src/Akka.Streams.Azure.ServiceBus/ServiceBusSource.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Akka.Streams.Azure.Utils;
using Akka.Streams.Dsl;
using Akka.Streams.Stage;
using Akka.Streams.Supervision;
using Microsoft.ServiceBus.Messaging;

namespace Akka.Streams.Azure.ServiceBus
Expand All @@ -13,75 +17,87 @@ public class ServiceBusSource : GraphStage<SourceShape<BrokeredMessage>>
{
#region Logic

private sealed class Logic : GraphStageLogic
private sealed class Logic : TimerGraphStageLogic
{
private const string TimerKey = "PollTimer";
private readonly ServiceBusSource _source;
private Action<Tuple<BrokeredMessage, TaskCompletionSource<NotUsed>>> _onMessageCallback;
private Tuple<BrokeredMessage, TaskCompletionSource<NotUsed>> _pendingMessage;
private readonly Decider _decider;
private Action<Task<IEnumerable<BrokeredMessage>>> _messagesReceived;

public Logic(ServiceBusSource source) : base(source.Shape)
public Logic(ServiceBusSource source, Attributes attributes) : base(source.Shape)
{
_source = source;
SetHandler(source.Out, TryPush);
}
_decider = attributes.GetDeciderOrDefault();

public override void PreStart()
{
_onMessageCallback = GetAsyncCallback<Tuple<BrokeredMessage, TaskCompletionSource<NotUsed>>>(OnMessage);
_source._client.OnMessageAsync(message =>
{
var completion = new TaskCompletionSource<NotUsed>();
_onMessageCallback(Tuple.Create(message, completion));
return completion.Task;
}, _source._options);
SetHandler(source.Out, PullQueue);
}

private void OnMessage(Tuple<BrokeredMessage, TaskCompletionSource<NotUsed>> t)
{
_pendingMessage = t;
TryPush();
}
public override void PreStart()
=> _messagesReceived = GetAsyncCallback<Task<IEnumerable<BrokeredMessage>>>(OnMessagesReceived);

private void PullQueue() =>
_source._client.ReceiveBatchAsync(_source._maxMessageCount, _source._serverWaitTime)
.ContinueWith(_messagesReceived);

private void TryPush()
private void OnMessagesReceived(Task<IEnumerable<BrokeredMessage>> task)
{
if (_pendingMessage != null && IsAvailable(_source.Out))
if (task.IsFaulted || task.IsCanceled)
{
Push(_source.Out, _pendingMessage.Item1);
_pendingMessage.Item2.TrySetResult(NotUsed.Instance);
_pendingMessage = null;
if (_decider(task.Exception) == Directive.Stop)
FailStage(task.Exception);
else
ScheduleOnce(TimerKey, _source._pollInterval);

return;
}

// Try again if the queue is empty
if (task.Result == null || !task.Result.Any())
ScheduleOnce(TimerKey, _source._pollInterval);
else
EmitMultiple(_source.Out, task.Result);
}

protected override void OnTimer(object timerKey) => PullQueue();
}

#endregion

/// <summary>
/// Creates a <see cref="Source{TOut,TMat}"/> for the Azure ServiceBus
/// </summary>
/// <param name="client">The client</param>
/// <param name="options">The options that are used for the <see cref="QueueClient.OnMessageAsync(Func{BrokeredMessage, Task}, OnMessageOptions)"/> call</param>
public static Source<BrokeredMessage, NotUsed> Create(QueueClient client, OnMessageOptions options = null)
/// <param name="maxMessageCount">The maximum number of messages to receive in a batch</param>
/// <param name="serverWaitTime">The time span that the server will wait for the message batch to arrive before it times out. Default = 3 seconds</param>
/// <param name="pollInterval">The intervall in witch the queue should be polled if it is empty. Default = 10 seconds</param>
public static Source<BrokeredMessage, NotUsed> Create(QueueClient client, int maxMessageCount = 100, TimeSpan? serverWaitTime = null, TimeSpan? pollInterval = null)
{
return Source.FromGraph(new ServiceBusSource(client, options));
return Source.FromGraph(new ServiceBusSource(client, maxMessageCount, serverWaitTime, pollInterval));
}

/// <summary>
/// Creates a <see cref="Source{TOut,TMat}"/> for the Azure ServiceBus
/// </summary>
/// <param name="client">The client</param>
/// <param name="options">The options that are used for the <see cref="SubscriptionClient.OnMessageAsync(Func{BrokeredMessage, Task}, OnMessageOptions)"/> call</param>
public static Source<BrokeredMessage, NotUsed> Create(SubscriptionClient client, OnMessageOptions options = null)
/// <param name="maxMessageCount">The maximum number of messages to receive in a batch</param>
/// <param name="serverWaitTime">The time span that the server will wait for the message batch to arrive before it times out. Default = 3 seconds</param>
/// <param name="pollInterval">The intervall in witch the queue should be polled if it is empty. Default = 10 seconds</param>
public static Source<BrokeredMessage, NotUsed> Create(SubscriptionClient client, int maxMessageCount = 100, TimeSpan? serverWaitTime = null, TimeSpan? pollInterval = null)
{
return Source.FromGraph(new ServiceBusSource(client, options));
return Source.FromGraph(new ServiceBusSource(client, maxMessageCount, serverWaitTime, pollInterval));
}

private readonly IBusClient _client;
private readonly OnMessageOptions _options;
private readonly int _maxMessageCount;
private readonly TimeSpan _serverWaitTime;
private readonly TimeSpan _pollInterval;

private ServiceBusSource(IBusClient client, OnMessageOptions options)
private ServiceBusSource(IBusClient client, int maxMessageCount, TimeSpan? serverWaitTime, TimeSpan? pollInterval)
{
_client = client;
_options = options ?? new OnMessageOptions();
_maxMessageCount = maxMessageCount;
_serverWaitTime = serverWaitTime ?? TimeSpan.FromSeconds(3);
_pollInterval = pollInterval ?? TimeSpan.FromSeconds(10);

Shape = new SourceShape<BrokeredMessage>(Out);
}
Expand All @@ -90,9 +106,11 @@ private ServiceBusSource(IBusClient client, OnMessageOptions options)
/// Create a new instance of the <see cref="ServiceBusSource"/>
/// </summary>
/// <param name="client">The client</param>
/// <param name="options">The options that are used for the <see cref="QueueClient.OnMessageAsync(Func{BrokeredMessage, Task}, OnMessageOptions)"/> call</param>
public ServiceBusSource(QueueClient client, OnMessageOptions options = null)
: this(new QueueClientWrapper(client), options)
/// <param name="maxMessageCount">The maximum number of messages to receive in a batch</param>
/// <param name="serverWaitTime">The time span that the server will wait for the message batch to arrive before it times out. Default = 3 seconds</param>
/// <param name="pollInterval">The intervall in witch the queue should be polled if it is empty. Default = 10 seconds</param>
public ServiceBusSource(QueueClient client, int maxMessageCount = 100, TimeSpan? serverWaitTime = null, TimeSpan? pollInterval = null)
: this(new QueueClientWrapper(client), maxMessageCount, serverWaitTime, pollInterval)
{

}
Expand All @@ -101,17 +119,20 @@ public ServiceBusSource(QueueClient client, OnMessageOptions options = null)
/// Create a new instance of the <see cref="ServiceBusSource"/>
/// </summary>
/// <param name="client">The client</param>
/// <param name="options">The options that are used for the <see cref="SubscriptionClient.OnMessageAsync(Func{BrokeredMessage, Task}, OnMessageOptions)"/> call</param>
public ServiceBusSource(SubscriptionClient client, OnMessageOptions options = null)
: this(new SubscriptionClientWrapper(client), options)
/// <param name="maxMessageCount">The maximum number of messages to receive in a batch</param>
/// <param name="serverWaitTime">The time span that the server will wait for the message batch to arrive before it times out. Default = 3 seconds</param>
/// <param name="pollInterval">The intervall in witch the queue should be polled if it is empty. Default = 10 seconds</param>
public ServiceBusSource(SubscriptionClient client, int maxMessageCount = 100, TimeSpan? serverWaitTime = null, TimeSpan? pollInterval = null)
: this(new SubscriptionClientWrapper(client), maxMessageCount, serverWaitTime, pollInterval)
{

}

public Outlet<BrokeredMessage> Out { get; } = new Outlet<BrokeredMessage>("ServiceBusSource.Out");

public override SourceShape<BrokeredMessage> Shape { get; }

protected override GraphStageLogic CreateLogic(Attributes inheritedAttributes) => new Logic(this);

protected override GraphStageLogic CreateLogic(Attributes inheritedAttributes)
=> new Logic(this, inheritedAttributes);
}
}

2 comments on commit f032acd

@mikhailshilkov
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess you shouldn't publish your Azure access keys ;)

@marcpiechura
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yep, I have already seen it and changed them but thanks for the hint :)

Please sign in to comment.