Introduction
Azure Service Bus has some great features, but to get guaranteed FIFO (First-In, First-Out, i.e. ordered delivery) there is a lot of documentation to read through, so I thought I’d show a quick example using a Topic and Subscriber.
Service Bus Message Sessions
To ensure FIFO, Service Bus requires that a Session is used, which is fairly simple. The subscription must be created with sessions enabled (RequiresSession set to true), and the sender needs to set the SessionId property on each Message that it sends.
Sender:
The method below initialises a TopicClient using a connection string and topic name. It then creates a list of messages to send to the Service Bus topic and sets the same SessionId on each one. If you don’t set the same SessionId on every message, you may find messages received in the wrong order.
/// <summary>
/// Sends <paramref name="number"/> messages to the topic in a single batch, all tagged with the
/// same freshly generated session id.
/// </summary>
/// <param name="number">How many messages to send; message <c>i</c> carries the UTF-8 text of <c>i</c>.</param>
/// <returns>A task that completes once the batch has been sent and the client has been closed.</returns>
private static async Task SendMessagesAsync(int number)
{
    var topicClient = new TopicClient(_senderConnectionString, _topicName);
    try
    {
        // One session id (a new GUID per call) is shared by every message in the batch.
        var sessionId = Guid.NewGuid().ToString();
        var messages = new List<Message>();
        for (var i = 0; i < number; i++)
        {
            var message = new Message(Encoding.UTF8.GetBytes(i.ToString()))
            {
                SessionId = sessionId
            };
            messages.Add(message);
        }

        await topicClient.SendAsync(messages);
    }
    finally
    {
        // The client was created in this method, so close it here even when sending fails.
        await topicClient.CloseAsync();
    }
}
Receiver:
The receiver initialises a SubscriptionClient and then calls the RegisterSessionHandler() method, which registers a message handler and takes a SessionHandlerOptions parameter. The SessionHandlerOptions object requires an exception handler, which is called if an exception occurs while handling a message.
/// <summary>
/// Creates a <c>SubscriptionClient</c> for the given subscription on the topic and registers the
/// session message handler together with its exception handler.
/// </summary>
/// <param name="connectionString">Service Bus connection string used by the receiver.</param>
/// <param name="subscriptionName">Name of the subscription to receive from.</param>
private static void StartReceivingMessages(string connectionString, string subscriptionName)
{
    var subscriptionClient = new SubscriptionClient(connectionString, _topicName, subscriptionName);

    // The handler names must match the methods as declared: ProcessMessageAsync and
    // ProcessMessageExceptionAsync.
    var handlerOptions = new SessionHandlerOptions(ProcessMessageExceptionAsync);
    subscriptionClient.RegisterSessionHandler(ProcessMessageAsync, handlerOptions);
}
This handler references the ProcessMessageAsync() method which takes an IMessageSession, Message and CancellationToken:
/// <summary>
/// Session message handler: decodes the message body as UTF-8 text, echoes it to the console and
/// appends its integer value to <c>_receiverData</c>.
/// </summary>
/// <param name="messageSession">Session the message belongs to (not used here).</param>
/// <param name="message">Received message; its body is parsed with <c>int.Parse</c>.</param>
/// <param name="cancellationToken">Cancellation token supplied by the caller (not used here).</param>
/// <returns>An already-completed task.</returns>
private static Task ProcessMessageAsync(IMessageSession messageSession, Message message,
    CancellationToken cancellationToken)
{
    // The work is short and synchronous, so it runs inline instead of being pushed through
    // Task.Run; a parse failure is therefore thrown directly to the caller.
    var messageBody = Encoding.UTF8.GetString(message.Body);
    Console.WriteLine($"Received message: {messageBody}");
    _receiverData.Add(int.Parse(messageBody));
    return Task.CompletedTask;
}
The ProcessMessageExceptionAsync() method looks like this:
/// <summary>
/// Exception handler for the session message pump: writes the exception and the context it
/// occurred in to standard error.
/// </summary>
/// <remarks>
/// NOTE(review): the Service Bus client is understood to catch anything thrown from this
/// callback, so re-throwing here would not reach application code; confirm against the SDK
/// documentation. Reporting the error directly keeps failures visible.
/// </remarks>
/// <param name="arg">Details of the exception raised while receiving or handling a message.</param>
/// <returns>An already-completed task.</returns>
private static Task ProcessMessageExceptionAsync(ExceptionReceivedEventArgs arg)
{
    var context = arg.ExceptionReceivedContext;
    Console.Error.WriteLine($"Receiver Exception: {arg.Exception}");
    Console.Error.WriteLine(
        $"  Action: {context.Action}, EntityPath: {context.EntityPath}, " +
        $"Endpoint: {context.Endpoint}, ClientId: {context.ClientId}");
    return Task.CompletedTask;
}
That’s all you need to do to get this working. Let’s test it out.
Testing
To test it out, I set up a simple console app which sends the messages to the topic, waits for them to be processed and then checks to make sure all received messages are in the correct order. See below:
using Microsoft.Azure.ServiceBus; | |
using System; | |
using System.Collections.Generic; | |
using System.Text; | |
using System.Threading; | |
using System.Threading.Tasks; | |
namespace ServiceBus
{
    /// <summary>
    /// Console test harness: registers a session handler on a subscription, sends a batch of
    /// numbered messages to the topic under one session id, waits until they have been received
    /// and reports which positions did not hold the expected number.
    /// </summary>
    class Program
    {
        private static readonly string _senderConnectionString = "My connection string";
        private static readonly string _receiverConnectionString = "My connection string";
        private static readonly int _messageCount = 100;
        private static readonly string _topicName = "mytopic";
        private static readonly string _subscriptionName = "mysubscription";

        // Guards _receiverData: it is appended to by the message handler and read by Main's
        // polling loop and AnalyzeResults, and List<T> is not safe for concurrent read/write.
        private static readonly object _receiverDataLock = new object();
        private static readonly List<int> _receiverData = new List<int>();

        // Kept in a field so Main can close the receiver once the run is finished.
        private static SubscriptionClient _subscriptionClient;

        /// <summary>
        /// Starts the receiver, sends the messages, waits for them to arrive and prints the result.
        /// </summary>
        /// <param name="args">Command-line arguments (not used).</param>
        static async Task Main(string[] args)
        {
            StartReceivingMessages(_receiverConnectionString, _subscriptionName);
            try
            {
                await SendMessagesAsync(_messageCount);
                await WaitForMessagesToBeProcessedAsync();
                AnalyzeResults();
            }
            finally
            {
                // Stop the message pump and release the connection, even if the run failed.
                await _subscriptionClient.CloseAsync();
            }
        }

        /// <summary>
        /// Creates the subscription client and registers the session message handler together
        /// with its exception handler.
        /// </summary>
        /// <param name="connectionString">Service Bus connection string used by the receiver.</param>
        /// <param name="subscriptionName">Name of the subscription to receive from.</param>
        private static void StartReceivingMessages(string connectionString, string subscriptionName)
        {
            _subscriptionClient = new SubscriptionClient(connectionString, _topicName, subscriptionName);
            var handlerOptions = new SessionHandlerOptions(ProcessMessageExceptionAsync);
            _subscriptionClient.RegisterSessionHandler(ProcessMessageAsync, handlerOptions);
        }

        /// <summary>
        /// Session message handler: decodes the body as UTF-8 text, echoes it to the console and
        /// records its integer value.
        /// </summary>
        /// <param name="messageSession">Session the message belongs to (not used here).</param>
        /// <param name="message">Received message; its body is parsed with <c>int.Parse</c>.</param>
        /// <param name="cancellationToken">Cancellation token supplied by the caller (not used here).</param>
        /// <returns>An already-completed task.</returns>
        private static Task ProcessMessageAsync(IMessageSession messageSession, Message message,
            CancellationToken cancellationToken)
        {
            // Short synchronous work: run inline rather than through Task.Run.
            var messageBody = Encoding.UTF8.GetString(message.Body);
            Console.WriteLine($"Received message: {messageBody}");
            var value = int.Parse(messageBody);
            lock (_receiverDataLock)
            {
                _receiverData.Add(value);
            }

            return Task.CompletedTask;
        }

        /// <summary>
        /// Exception handler for the session message pump: writes the exception and the context
        /// it occurred in to standard error.
        /// </summary>
        /// <remarks>
        /// NOTE(review): the Service Bus client is understood to catch anything thrown from this
        /// callback, so re-throwing here would not reach application code; confirm against the
        /// SDK documentation.
        /// </remarks>
        /// <param name="arg">Details of the exception raised while receiving or handling a message.</param>
        /// <returns>An already-completed task.</returns>
        private static Task ProcessMessageExceptionAsync(ExceptionReceivedEventArgs arg)
        {
            var context = arg.ExceptionReceivedContext;
            Console.Error.WriteLine($"Receiver Exception: {arg.Exception}");
            Console.Error.WriteLine(
                $"  Action: {context.Action}, EntityPath: {context.EntityPath}, " +
                $"Endpoint: {context.Endpoint}, ClientId: {context.ClientId}");
            return Task.CompletedTask;
        }

        /// <summary>
        /// Sends <paramref name="number"/> messages to the topic in a single batch, all tagged
        /// with the same freshly generated session id.
        /// </summary>
        /// <param name="number">How many messages to send; message <c>i</c> carries the UTF-8 text of <c>i</c>.</param>
        /// <returns>A task that completes once the batch is sent and the client is closed.</returns>
        private static async Task SendMessagesAsync(int number)
        {
            var topicClient = new TopicClient(_senderConnectionString, _topicName);
            try
            {
                // One session id (a new GUID per call) is shared by every message in the batch.
                var sessionId = Guid.NewGuid().ToString();
                var messages = new List<Message>();
                for (var i = 0; i < number; i++)
                {
                    var message = new Message(Encoding.UTF8.GetBytes(i.ToString()))
                    {
                        SessionId = sessionId
                    };
                    messages.Add(message);
                }

                await topicClient.SendAsync(messages);
            }
            finally
            {
                // The client was created in this method, so close it here even when sending fails.
                await topicClient.CloseAsync();
            }
        }

        /// <summary>
        /// Compares the first <c>_messageCount</c> received values with their positions and
        /// prints the number and the list of positions whose value does not match.
        /// </summary>
        private static void AnalyzeResults()
        {
            // Work on a snapshot so the handler cannot modify the list while it is being read.
            int[] received;
            lock (_receiverDataLock)
            {
                received = _receiverData.ToArray();
            }

            var messagesOutOfOrder = new List<int>();
            for (var i = 0; i < _messageCount; i++)
            {
                if (received[i] != i)
                {
                    messagesOutOfOrder.Add(i);
                }
            }

            Console.WriteLine($"Messages out of order: {messagesOutOfOrder.Count}");
            Console.WriteLine($"{string.Join(", ", messagesOutOfOrder)}");
        }

        /// <summary>
        /// Polls every 100 ms until at least <c>_messageCount</c> messages have been recorded.
        /// </summary>
        /// <returns>A task that completes once enough messages have been received.</returns>
        private static async Task WaitForMessagesToBeProcessedAsync()
        {
            // "<" rather than "!=": if the count ever jumps past the target between two polls,
            // an inequality test would never become false and the loop would spin forever.
            while (GetReceivedCount() < _messageCount)
            {
                await Task.Delay(TimeSpan.FromMilliseconds(100));
            }
        }

        /// <summary>Reads the number of recorded messages under the list lock.</summary>
        private static int GetReceivedCount()
        {
            lock (_receiverDataLock)
            {
                return _receiverData.Count;
            }
        }
    }
}