Microservices Communication Using Azure Service Bus

18 Jan 2021
Behind The Code

Imagine you are building a bike store application using a microservices architecture. You have decided to use a database-per-microservice approach, and you need a way to tell one or more microservices that something has happened, along with the details of the event.

In this article, we’ll walk through an example of microservices communicating through Azure Service Bus.

Architecture

Made by Kambu

This is the backend architecture for our application. As mentioned before, we are using a database per microservice approach, which means that every microservice has its own database, and it’s managed only by this microservice. One microservice cannot access another microservice’s database.

A quick introduction to each microservice’s responsibilities:

1. Inventory Microservice

This service is responsible for managing the inventory of the shops. It holds all information about which products are defined, the current availability of each product, and how much each product costs.

2. Orders Microservice

It is responsible for holding information about orders: delivery details, the quantity of products ordered, when the order was placed and delivered, and the current status of the order.

3. Payments Microservice

The responsibility of this service is really simple. It has all the details of payments, the configuration of payments integration, the current status of payments, and other details.

Our goal

Our shop has a very simple inventory policy: when payment for an order has successfully come through, we update the quantity of each ordered product. Once the availability of a product reaches zero, other customers can no longer order it.

The Problem

How do we let the Orders microservice know that it should mark an order as paid, and the Inventory microservice know that it should update the availability of products?

Well, if we were using a monolithic application, we would simply access the database and do it straight after marking the payment as succeeded, but we can’t do that in our microservice architecture. The Payments microservice cannot access the orders database. Moreover, it does not know what structure that database has or how it should update the data.

Maybe we should send a request directly to the other services, and simply process the request and take action?

This is a bad approach, because direct calls couple the services together and can fail in several ways.

One of the microservices can be down, in which case only some of the services would learn about our successful payment. A server can also be overloaded with requests, and we would have to wait a long time to accomplish our task.

This is the moment when Azure Service Bus comes into action. 

What is Azure Service Bus?

According to Microsoft’s documentation, Azure Service Bus is a fully managed enterprise message broker with message queues and publish-subscribe topics. Its primary protocol is the Advanced Message Queuing Protocol (AMQP for short). It provides such benefits as:

  • Load-balancing work across competing workers
  • Safely routing and transferring data and control across service and application boundaries
  • Coordinating transactional work that requires a high degree of reliability

How Can We Solve The Problem Using Azure Service Bus?

Well, after our payment comes through, we have to inform other services – we can do that by using Azure Service Bus topics.

We will try to combine Service Bus topics, subscriptions, and messages to make other services respond to events.

When an event occurs, the service in which it took place fills in an event model and sends it as a message to the Service Bus topic (mainTopic in our case).

The topic then checks the rules of each subscription and places a copy of the message in every subscription that matches. The other services, each connected to the topic through its own subscription, receive the event and process it in their own way.

This process is known as the publisher-subscriber pattern and is described in detail in Microsoft’s documentation.
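To make the fan-out concrete, here is a minimal in-memory sketch of the pattern. It is not the Service Bus API: `InMemoryTopic`, its methods, and the label filters are hypothetical names invented for this illustration, and a real topic adds durability, message locks, and retries that are left out here.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory stand-in for a topic (not the Service Bus API).
public sealed class InMemoryTopic
{
    private sealed class Subscription
    {
        public Func<string, bool> AcceptsLabel;
        public readonly Queue<string> Inbox = new Queue<string>();
    }

    private readonly Dictionary<string, Subscription> _subscriptions =
        new Dictionary<string, Subscription>();

    public void AddSubscription(string name, Func<string, bool> acceptsLabel) =>
        _subscriptions[name] = new Subscription { AcceptsLabel = acceptsLabel };

    // Every subscription whose filter accepts the label gets its own copy.
    public void Send(string label, string body)
    {
        foreach (var subscription in _subscriptions.Values)
            if (subscription.AcceptsLabel(label))
                subscription.Inbox.Enqueue(body);
    }

    public int PendingCount(string name) => _subscriptions[name].Inbox.Count;
}

public static class Program
{
    public static void Main()
    {
        var topic = new InMemoryTopic();
        topic.AddSubscription("Order", label => label == "PaymentCompleted");
        topic.AddSubscription("Inventory", label => label == "OrderIsInProgress");

        // The publisher only knows the topic, not who is listening.
        topic.Send("PaymentCompleted", "{\"OrderId\":1,\"PaymentId\":7}");

        Console.WriteLine(topic.PendingCount("Order"));     // 1
        Console.WriteLine(topic.PendingCount("Inventory")); // 0
    }
}
```

The publisher never references the Order or Inventory services directly, which is the decoupling the pattern is meant to provide.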

Made by Kambu

Application Architecture including Azure Service Bus

In the Azure Portal, we simply have to create:

  1. Resource Group
  2. Service Bus Namespace under the resource group
  3. A topic in the created namespace (we refer to it as mainTopic)
  4. Three subscriptions on that topic: Payment, Inventory, Order

Code Implementation

We have to implement how events are created and dispatched, and how the various services subscribe to those events and handle the messages.

Let’s create a project that will be shared by the other applications.

We will call it the Messaging project. It uses the Class Library template and targets .NET Standard 2.1 (netstandard2.1).

Models/Event.cs

Let’s create our base class called ‘Event’ under the Models folder – this will be our base model placed inside Azure Service Bus Message.

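using System;
using Newtonsoft.Json;

public abstract class Event
{
   // [JsonProperty] lets Json.NET populate these private setters on deserialization,
   // so a received event keeps the Id and CreationDate assigned by the publisher.
   [JsonProperty]
   public Guid Id { get; private set; }

   [JsonProperty]
   public DateTime CreationDate { get; private set; }

   // Constructors of an abstract class are only callable from derived classes.
   protected Event()
   {
       Id = Guid.NewGuid();
       CreationDate = DateTime.UtcNow;
   }

   protected Event(Guid id, DateTime creationDate)
   {
       Id = id;
       CreationDate = creationDate;
   }
}

Models/Subscription.cs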

To be able to react to messages, we have to keep track of subscriptions. Let’s create a Subscription class that holds the handler type registered for an event.

public class Subscription
{
   public Type HandlerType { get; private set; }

   public Subscription(Type handlerType)
   {
       HandlerType = handlerType;
   }
}

Now we have to handle subscriptions and register handlers.

Meta/IEventHandler.cs

Let’s create interfaces first, so we can prepare our Messaging project for dependency injection.

public interface IEventHandler<in TEvent> : IEventHandler where TEvent : Event
{
   Task Handle(TEvent @event);
}

public interface IEventHandler
{
}

Meta/IEventBusSubscriptionManager.cs

public interface IEventBusSubscriptionManager
{
   bool IsEmpty { get; }
   event EventHandler<string> OnEventRemoved;
  
   void AddSubscription<TEvent, TEventHandler>()
       where TEvent : Event
       where TEventHandler : IEventHandler<TEvent>;

   void RemoveSubscription<TEvent, TEventHandler>()
       where TEvent : Event
       where TEventHandler : IEventHandler<TEvent>;

   bool HasSubscriptionsForEvent<TEvent>() where TEvent : Event;
   bool HasSubscriptionsForEvent(string eventName);
   Type GetEventType(string eventName);
   void Clear();
   IEnumerable<Subscription> GetHandlersForEvent<TEvent>() where TEvent : Event;
   IEnumerable<Subscription> GetHandlersForEvent(string eventName);
   string GetEventKey<TEvent>() where TEvent : Event;
}

Meta/IEventBus.cs

public interface IEventBus : IDisposable
{
   void Publish(Event @event);
   Task PublishAsync<TEvent>(TEvent @event) where TEvent : Event;

   void Subscribe<TEvent, TEventHandler>()
       where TEvent : Event
       where TEventHandler : IEventHandler<TEvent>;
  
   void Unsubscribe<TEvent, TEventHandler>()
       where TEvent : Event
       where TEventHandler : IEventHandler<TEvent>;
}

Meta/IEventDispatcher.cs

public interface IEventDispatcher<TEvent> where TEvent : Event
{
   void Publish(TEvent @event);
   Task PublishAsync(TEvent @event);
   Task PublishManyAsync(ICollection<TEvent> @events);
}

In each microservice, we want to declare which events the service subscribes to and register its handlers and dispatchers.

Handlers are responsible for handling events that arrive from the topic, and dispatchers are responsible for sending events to the topic.


Implementation

As we have prepared for dependency injection, let’s go to implementation.

InMemoryEventBusSubscriptionManager.cs

For simplicity, we will keep subscriptions in memory.

This implementation of IEventBusSubscriptionManager does not know anything about Service Bus yet. It only keeps track of which handlers are registered for which events.

public class InMemoryEventBusSubscriptionManager : IEventBusSubscriptionManager
{
   private readonly Dictionary<string, ICollection<Subscription>> _handlers;
   private readonly ICollection<Type> _eventTypes;

   public event EventHandler<string> OnEventRemoved;

   public InMemoryEventBusSubscriptionManager()
   {
       _handlers = new Dictionary<string, ICollection<Subscription>>();
       _eventTypes = new List<Type>();
   }

   public bool IsEmpty => !_handlers.Keys.Any();
   public void Clear()
   {
       // Clear both collections so stale event types do not survive a reset.
       _handlers.Clear();
       _eventTypes.Clear();
   }

   public void AddSubscription<TEvent, TEventHandler>() where TEvent : Event where TEventHandler : IEventHandler<TEvent>
   {
       var key = GetEventKey<TEvent>();
       DoAddSubscription(typeof(TEventHandler), key);

       if (!_eventTypes.Contains(typeof(TEvent)))
           _eventTypes.Add(typeof(TEvent));
   }
  
   private void DoAddSubscription(Type handlerType, string eventName)
   {
       if(!HasSubscriptionsForEvent(eventName))
           _handlers.Add(eventName, new List<Subscription>());

       if (_handlers[eventName].Any(s => s.HandlerType == handlerType))
           throw new ArgumentException(
               $"Handler of type {handlerType.Name} is already registered for '{eventName}'", nameof(handlerType));

       _handlers[eventName].Add(new Subscription(handlerType));
   }

   public void RemoveSubscription<TEvent, TEventHandler>() where TEvent : Event where TEventHandler : IEventHandler<TEvent>
   {
       var handlerToRemove = FindSubscriptionToRemove<TEvent, TEventHandler>();
       var eventName = GetEventKey<TEvent>();
       DoRemoveHandler(eventName, handlerToRemove);
   }
  
   private void DoRemoveHandler(string eventName, Subscription subscriptionToRemove)
   {
       if (subscriptionToRemove != null)
       {
           _handlers[eventName].Remove(subscriptionToRemove);
           if (!_handlers[eventName].Any())
           {
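               _handlers.Remove(eventName);
               var eventType = _eventTypes.SingleOrDefault(e =>
                   string.Equals(e.Name, eventName, StringComparison.InvariantCultureIgnoreCase));
               if (eventType != null)
                   _eventTypes.Remove(eventType);

               // No handlers remain for this event, so notify OnEventRemoved listeners.
               RaiseOnEventRemoved(eventName);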
           }
       }
   }

   public IEnumerable<Subscription> GetHandlersForEvent<TEvent>() where TEvent : Event
   {
       var key = GetEventKey<TEvent>();
       return GetHandlersForEvent(key);
   }

   public IEnumerable<Subscription> GetHandlersForEvent(string eventName) => _handlers[eventName];

   public bool HasSubscriptionsForEvent<TEvent>() where TEvent : Event
   {
       var key = GetEventKey<TEvent>();
       return HasSubscriptionsForEvent(key);
   }

   public bool HasSubscriptionsForEvent(string eventName) => _handlers.ContainsKey(eventName);

   public string GetEventKey<TEvent>() where TEvent : Event
   {
       return typeof(TEvent).Name;
   }

   public Type GetEventType(string eventName) => _eventTypes.SingleOrDefault(t =>
       string.Equals(t.Name, eventName, StringComparison.InvariantCultureIgnoreCase));
  
   private void RaiseOnEventRemoved(string eventName)
   {
       var handler = OnEventRemoved;
       handler?.Invoke(this, eventName);
   }
   private Subscription FindSubscriptionToRemove<TEvent, TEventHandler>()
       where TEvent : Event
       where TEventHandler : IEventHandler<TEvent>
   {
       var key = GetEventKey<TEvent>();
       return DoFindSubscriptionToRemove(key, typeof(TEventHandler));
   }

   private Subscription FindSubscriptionToRemove(string eventName, Type handlerType) =>
       DoFindSubscriptionToRemove(eventName, handlerType);

   private Subscription DoFindSubscriptionToRemove(string eventName, Type handlerType) =>
       HasSubscriptionsForEvent(eventName)
           // SingleOrDefault returns null (a no-op removal) when this handler was never registered.
           ? _handlers[eventName].SingleOrDefault(s => s.HandlerType == handlerType)
           : null;
}

AzureServiceBus/Meta/IServiceBusConnectionManager.cs

Let’s bring the Service Bus implementation to our Messaging project.

This interface will be responsible for managing the connection between the application and Service Bus. As said before, we are going to use topics, so we need a separate method that creates a topic client we can use in other places.

This interface needs a reference to the Microsoft.Azure.ServiceBus NuGet package (in our case, we are using version 5.1.0).

public interface IServiceBusConnectionManager : IDisposable
{
   ServiceBusConnectionStringBuilder ServiceBusConnectionStringBuilder { get; }
   ITopicClient CreateTopicClient();
}

AzureServiceBus/ServiceBusConnectionManager.cs

This is an implementation of IServiceBusConnectionManager. It simply creates a topic client and, if that client is in the ‘IsClosedOrClosing’ state, recreates it using ServiceBusConnectionStringBuilder to keep the connection alive.

We need this kind of behavior because we want our application to have a working connection to the topic at all times, so that as soon as other applications send messages to the topic, we can handle them immediately.
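The recreate-when-closed idea can be shown in isolation with a small sketch. `FakeTopicClient` and `FakeConnectionManager` are hypothetical stand-ins written for this illustration. They only mimic the `IsClosedOrClosing` flag and do not talk to Service Bus.

```csharp
using System;

// Hypothetical stand-in for a client that exposes an IsClosedOrClosing flag.
public sealed class FakeTopicClient
{
    public bool IsClosedOrClosing { get; private set; }
    public void Close() => IsClosedOrClosing = true;
}

public sealed class FakeConnectionManager
{
    private FakeTopicClient _client = new FakeTopicClient();

    // Hands out the cached client, replacing it first if it has been closed.
    public FakeTopicClient CreateTopicClient()
    {
        if (_client.IsClosedOrClosing)
            _client = new FakeTopicClient();
        return _client;
    }
}

public static class Program
{
    public static void Main()
    {
        var manager = new FakeConnectionManager();
        var first = manager.CreateTopicClient();
        Console.WriteLine(ReferenceEquals(first, manager.CreateTopicClient())); // True

        first.Close();
        var second = manager.CreateTopicClient();
        Console.WriteLine(ReferenceEquals(first, second)); // False
        Console.WriteLine(second.IsClosedOrClosing);       // False
    }
}
```

Callers always ask the manager for the client instead of caching it themselves, so a closed client is replaced transparently on the next call.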

Additionally, we are using the Microsoft.Extensions.Logging (5.0.0) NuGet package to be able to log required details.

public class ServiceBusConnectionManager : IServiceBusConnectionManager
{
   private readonly ILogger<ServiceBusConnectionManager> _logger;
   private readonly ServiceBusConnectionStringBuilder _connectionStringBuilder;
   private ITopicClient _topicClient;

   private bool _isDisposed;

   public ServiceBusConnectionManager(ServiceBusConnectionStringBuilder serviceBusConnectionStringBuilder,
       ILogger<ServiceBusConnectionManager> logger)
   {
       _logger = logger ?? throw new ArgumentNullException(nameof(logger));
       _connectionStringBuilder = serviceBusConnectionStringBuilder ??
                                  throw new ArgumentNullException(nameof(serviceBusConnectionStringBuilder));
       _topicClient = new TopicClient(_connectionStringBuilder, RetryPolicy.Default);
   }

   public ServiceBusConnectionStringBuilder ServiceBusConnectionStringBuilder => _connectionStringBuilder;

   public ITopicClient CreateTopicClient()
   {
       if(_topicClient.IsClosedOrClosing)
           _topicClient = new TopicClient(_connectionStringBuilder, RetryPolicy.Default);

       return _topicClient;
   }

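   public void Dispose()
   {
       if (_isDisposed)
           return;
       _isDisposed = true;

       // Close the topic client so its connection to Service Bus is released.
       _topicClient?.CloseAsync().GetAwaiter().GetResult();
   }
}

AzureServiceBus/ServiceBusEventBus.cs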

This is our implementation of the IEventBus interface using the Azure Service Bus.

Handling messages is done in the ProcessEvent method. We check whether we have a handler registered for the incoming event and, if so, we decode the message body to a string, deserialize it to the shared Event type, and pass it to the Handle method of the registered handler class.

To make the topic deliver each event only to the subscriptions that want it, we add a new rule with a CorrelationFilter to our subscription whenever we register the first handler for an event. The CorrelationFilter tells Service Bus that messages carrying a particular label should be copied to this subscription.
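The label is derived from the event's class name, and the receiver rebuilds the class name from the label. The sketch below isolates those two string expressions from the code in this article so the round trip can be checked. `EventLabels` and `EventLogRotatedEvent` are hypothetical names added for illustration only.

```csharp
using System;

public abstract class Event { }
public class PaymentCompletedEvent : Event { }
public class EventLogRotatedEvent : Event { } // hypothetical, shows an edge case

public static class EventLabels
{
    // Publisher side: the expression used when subscribing and preparing messages.
    public static string ToLabel(Type eventType) =>
        eventType.Name.Replace(nameof(Event), string.Empty);

    // Receiver side: the expression used by the message handler.
    public static string ToEventName(string label) => $"{label}{nameof(Event)}";
}

public static class Program
{
    public static void Main()
    {
        var label = EventLabels.ToLabel(typeof(PaymentCompletedEvent));
        Console.WriteLine(label);                          // PaymentCompleted
        Console.WriteLine(EventLabels.ToEventName(label)); // PaymentCompletedEvent

        // Replace removes every "Event" occurrence, not just the suffix,
        // so this name does not survive the round trip.
        var odd = EventLabels.ToLabel(typeof(EventLogRotatedEvent));
        Console.WriteLine(odd);                            // LogRotated
        Console.WriteLine(EventLabels.ToEventName(odd));   // LogRotatedEvent
    }
}
```

In practice this means the convention works as long as "Event" appears only as the suffix of an event class name.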

public class ServiceBusEventBus : IEventBus
{
   private readonly IServiceBusConnectionManager _serviceBusConnectionManager;
   private readonly IEventBusSubscriptionManager _eventBusSubscriptionManager;
   private readonly IServiceProvider _serviceProvider;
   private readonly SubscriptionClient _subscriptionClient;
   private readonly ILogger<ServiceBusEventBus> _logger;
   public ServiceBusEventBus(IServiceBusConnectionManager serviceBusConnectionManager,
       IEventBusSubscriptionManager eventBusSubscriptionManager,
       string subscriptionClientName,
       IServiceProvider serviceProvider,
       ILogger<ServiceBusEventBus> logger,
       int maxConcurrentCalls = 10)
   {
       if(string.IsNullOrEmpty(subscriptionClientName))
           throw new ArgumentNullException(nameof(subscriptionClientName));
       _serviceBusConnectionManager = serviceBusConnectionManager ?? throw new ArgumentNullException(nameof(serviceBusConnectionManager));
       _eventBusSubscriptionManager = eventBusSubscriptionManager ?? throw new ArgumentNullException(nameof(eventBusSubscriptionManager));
       _serviceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider));
       _logger = logger ?? throw new ArgumentNullException(nameof(logger));
      
       _subscriptionClient = new SubscriptionClient(_serviceBusConnectionManager.ServiceBusConnectionStringBuilder, subscriptionClientName);
      
       RegisterSubscriptionClientMessageHandler(maxConcurrentCalls);
       RemoveDefaultRule();
   }

   public void Subscribe<TEvent, TEventHandler>() where TEvent : Event where TEventHandler : IEventHandler<TEvent>
   {
       var eventName = typeof(TEvent).Name.Replace(nameof(Event), string.Empty);

       var alreadySubscribed = _eventBusSubscriptionManager.HasSubscriptionsForEvent<TEvent>();
       if (!alreadySubscribed)
       {
           try
           {
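               _subscriptionClient.AddRuleAsync(new RuleDescription
               {
                   // Match on the message Label, which PrepareMessage sets to the event name.
                   // (The CorrelationFilter(string) constructor would set CorrelationId instead.)
                   Filter = new CorrelationFilter { Label = eventName },
                   Name = eventName,
               }).GetAwaiter().GetResult();
           }
           catch (ServiceBusException)
           {
               _logger.LogWarning($"Subscription to event '{eventName}' already exists");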
           }
       }
       _logger.LogInformation($"Subscribing to event '{eventName}' with '{typeof(TEventHandler).Name}'");
       _eventBusSubscriptionManager.AddSubscription<TEvent, TEventHandler>();
   }

   public void Unsubscribe<TEvent, TEventHandler>() where TEvent : Event where TEventHandler : IEventHandler<TEvent>
   {
       var eventName = typeof(TEvent).Name.Replace(nameof(Event), string.Empty);
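       try
       {
           // Remove the rule that was added for this event in Subscribe.
           _subscriptionClient.RemoveRuleAsync(eventName).GetAwaiter().GetResult();
       }
       catch (MessagingEntityNotFoundException)
       {
           _logger.LogWarning($"Subscription to event '{eventName}' could not be found");
       }

       _logger.LogInformation($"Unsubscribing handler '{typeof(TEventHandler).Name}' from event '{eventName}'");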
       _eventBusSubscriptionManager.RemoveSubscription<TEvent, TEventHandler>();
   }
  
   private void RegisterSubscriptionClientMessageHandler(int maxConcurrentCalls)
   {
       _subscriptionClient.RegisterMessageHandler(
           async (message, token) =>
           {
               var eventName = $"{message.Label}{nameof(Event)}";
               var messageContent = Encoding.UTF8.GetString(message.Body);

               if (await ProcessEvent(eventName, messageContent))
                   await _subscriptionClient.CompleteAsync(message.SystemProperties.LockToken);
           }, new MessageHandlerOptions(HandlingEventExceptionHandler) { MaxConcurrentCalls = maxConcurrentCalls, AutoComplete = false });
   }

   private Task HandlingEventExceptionHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs)
   {
       var exception = exceptionReceivedEventArgs.Exception;
       var context = exceptionReceivedEventArgs.ExceptionReceivedContext;

       _logger.LogError(exception,
           $"Error occurred during handling message: {exception.Message}. Context: {context}");

       return Task.CompletedTask;
   }
   private async Task<bool> ProcessEvent(string eventName, string message)
   {
       var processed = false;
       if (_eventBusSubscriptionManager.HasSubscriptionsForEvent(eventName))
       {
           var subscriptions = _eventBusSubscriptionManager.GetHandlersForEvent(eventName);
           foreach (var subscription in subscriptions)
           {
               var handler = _serviceProvider.GetRequiredService(subscription.HandlerType);
               if (handler == null) continue;
               var eventType = _eventBusSubscriptionManager.GetEventType(eventName);
               var @event = JsonConvert.DeserializeObject(message, eventType);
               var handlerType = typeof(IEventHandler<>).MakeGenericType(eventType);
               await (Task) handlerType.GetMethod("Handle")
                   .Invoke(handler, new object[] {@event});
               processed = true;
           }
       }

       return processed;
   }

   private void RemoveDefaultRule()
   {
       try
       {
           _subscriptionClient
               .RemoveRuleAsync(RuleDescription.DefaultRuleName)
               .GetAwaiter()
               .GetResult();
          
       }
       catch (MessagingEntityNotFoundException)
       {
           _logger.LogWarning($"Messaging entity '{RuleDescription.DefaultRuleName}' could not be found");
       }
   }

   public void Dispose()
   {
       _eventBusSubscriptionManager.Clear();
   }
}
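ProcessEvent only knows the event type at runtime, so it cannot call `Handle` through a compile-time generic. The following self-contained sketch shows the same reflection step in isolation. `RecordingHandler` and `Dispatcher` are hypothetical names, and the dependency injection and JSON steps are left out.

```csharp
using System;
using System.Threading.Tasks;

public abstract class Event { }

public class PaymentCompletedEvent : Event
{
    public long OrderId { get; set; }
}

public interface IEventHandler<in TEvent> where TEvent : Event
{
    Task Handle(TEvent @event);
}

// Hypothetical handler that just records what it received.
public class RecordingHandler : IEventHandler<PaymentCompletedEvent>
{
    public long LastOrderId { get; private set; }

    public Task Handle(PaymentCompletedEvent @event)
    {
        LastOrderId = @event.OrderId;
        return Task.CompletedTask;
    }
}

public static class Dispatcher
{
    // Builds IEventHandler<ConcreteEvent> at runtime and invokes its Handle method.
    public static Task Dispatch(object handler, Event @event)
    {
        var handlerInterface = typeof(IEventHandler<>).MakeGenericType(@event.GetType());
        var handleMethod = handlerInterface.GetMethod("Handle");
        return (Task)handleMethod.Invoke(handler, new object[] { @event });
    }
}

public static class Program
{
    public static void Main()
    {
        var handler = new RecordingHandler();
        Dispatcher.Dispatch(handler, new PaymentCompletedEvent { OrderId = 42 })
            .GetAwaiter().GetResult();
        Console.WriteLine(handler.LastOrderId); // 42
    }
}
```

The cast to `Task` is what lets the caller await a method that was located by name.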

Subscriptions Done, Time For Dispatching Implementation

Great! We are set up with our subscriptions and the way of handling messages, but we cannot forget about dispatching them. Let’s create a dispatching implementation.

Settings/ServiceBusSettings.cs

First, we have to create a settings class that gives dispatchers the values they need to connect to the topic. Once the connection is made, we can dispatch messages.

public class ServiceBusSettings
{
   public string ConnectionString { get; set; }
   public string TopicName { get; set; }
}

AzureServiceBus/ServiceBusEventDispatcher.cs

The main work of dispatching happens in the Publish methods, which are responsible for sending a message to the topic.

We use Newtonsoft.Json to serialize our objects to JSON, convert the JSON to bytes, and pass the data to the topic as a Service Bus Message.

public abstract class ServiceBusEventDispatcher<TEvent>
   : IDisposable, IAsyncDisposable, IEventDispatcher<TEvent> where TEvent : Event
{
   private readonly string _serviceBusConnectionString;
   private readonly string _topicName;
  
   protected readonly TopicClient TopicClient;
  
   protected ServiceBusEventDispatcher(string serviceBusConnectionString, string topicName)
   {
       _serviceBusConnectionString = serviceBusConnectionString;
       _topicName = topicName;
       TopicClient = new TopicClient(_serviceBusConnectionString, _topicName);
   }

   protected ServiceBusEventDispatcher(TopicClient topicClient)
   {
       TopicClient = topicClient;
   }

   protected ServiceBusEventDispatcher(IOptions<ServiceBusSettings> serviceBusOptions)
   {
       // The null-conditional access makes the guard also cover a null options object.
       var settings = serviceBusOptions?.Value ?? throw new ArgumentNullException(nameof(serviceBusOptions));
       _serviceBusConnectionString = settings.ConnectionString;
       _topicName = settings.TopicName;
       TopicClient = new TopicClient(_serviceBusConnectionString, _topicName);
   }

   public void Publish(TEvent @event)
   {
       var message = PrepareMessage(@event);
       TopicClient.SendAsync(message).GetAwaiter().GetResult();
   }

   public async Task PublishAsync(TEvent @event)
   {
       var message = PrepareMessage(@event);
       await TopicClient.SendAsync(message);
   }

   public async Task PublishManyAsync(ICollection<TEvent> @events)
   {
       var messages = @events.Select(PrepareMessage).ToList();
       await TopicClient.SendAsync(messages);
   }

   private Message PrepareMessage(Event @event)
   {
       var eventName = @event.GetType().Name.Replace(nameof(Event), string.Empty);
       var serializedMessage = JsonConvert.SerializeObject(@event);
       var messageBody = Encoding.UTF8.GetBytes(serializedMessage);

       return new Message
       {
           MessageId = Guid.NewGuid().ToString(),
           Body = messageBody,
           Label = eventName,
           ContentType = System.Net.Mime.MediaTypeNames.Application.Json
       };
   }

   public void Dispose()
   {
       TopicClient?.CloseAsync().GetAwaiter().GetResult();
   }

   public async ValueTask DisposeAsync()
   {
       if (TopicClient != null)
           await TopicClient.CloseAsync();
   }
}

Project Ready! Time For Referencing

Great! The project is ready, and we can now reference it in our microservices.

Our microservices are ASP.NET Core Web API applications that target .NET Core 3.1.

First, we have to add a reference to our newly created project.

After doing that, let’s create a simple implementation of the event dispatcher.

We start by creating our event classes.

Shared/Notifications/Models/PaymentCompletedEvent.cs

PaymentCompletedEvent: this event will be sent while handling the response from the Payment Provider.

public class PaymentCompletedEvent : Event
{
   public long OrderId { get; set; }
   public long PaymentId { get; set; }
}

Shared/Notifications/Models/OrderIsInProgressEvent.cs

public class OrderIsInProgressEvent : Event
{
   public long OrderId { get; set; }
   public ICollection<OrderIsInProgressEventLine> Lines { get; set; }
}

public class OrderIsInProgressEventLine
{
   public long Product_Id { get; set; }
   public int Quantity { get; set; }
}

PaymentService/Infrastructure/Notifications/Publishers/Dispatchers/PaymentCompletedEventDispatcher.cs

As we have our models set up, it’s time to dispatch the event to Service Bus. We have to create an event dispatcher first.

This class is using our previously prepared ServiceBusEventDispatcher to dispatch notifications.

In the DispatchAsync method, we create an event object and pass it to the method responsible for sending notifications to the topic.

We also have a constructor for dependency injection to pass details about the Service Bus Topic Client connection.


public class PaymentCompletedEventDispatcher : ServiceBusEventDispatcher<PaymentCompletedEvent>
{
   public PaymentCompletedEventDispatcher(IOptions<ServiceBusSettings> serviceBusOptions) : base(serviceBusOptions)
   {
   }

   public Task DispatchAsync(Payment payment) => PublishAsync(new PaymentCompletedEvent
   {
       PaymentId = payment.Id,
       OrderId = payment.OrderId
   });
}

As we have our dispatcher ready, we can use it in our controller. When the Payment Provider makes a request to the configured callback, we have to handle the details it posts about the payment.

PaymentService/Controllers/PaymentController.cs

This logic is simple – when the Payment Provider tells us that the payment has been done successfully, we can mark the payment as completed. Otherwise, it’s marked as rejected.

If the payment has been marked as completed, we use our previously created dispatcher to send an event.

[HttpPost("callback")]
public async Task<IActionResult> PaymentProviderCallback([FromBody] PaymentProviderCallbackRequestDto request)
{
   var payment = await _context.Payments.SingleOrDefaultAsync(p => p.Id == request.PaymentId);
   if (payment == null)
       return BadRequest("Invalid PaymentId");

   payment.Status = request.Success ? PaymentStatusEnum.COMPLETED : PaymentStatusEnum.REJECTED;
   await _context.SaveChangesAsync();
   if (payment.Status == PaymentStatusEnum.COMPLETED)
       await _paymentCompletedEventDispatcher.DispatchAsync(payment);

   return Ok();
}

Now we have to handle the event. Let’s go to the Orders Service.

OrdersService/Infrastructure/Notifications/Subscribers/Handlers/PaymentCompletedEventHandler.cs

This is an implementation of our handler. We are implementing the IEventHandler interface from our Messaging project.

We mark the order related to the payment with the proper status and dispatch an event to inform other services that the order has been marked as in progress.

public class PaymentCompletedEventHandler : IEventHandler<PaymentCompletedEvent>
{
   private readonly OrderDbContext _context;
   private readonly OrderIsInProgressEventDispatcher _orderIsInProgressEventDispatcher;
   private readonly ILogger<PaymentCompletedEventHandler> _logger;
   private const string LOGGING_PREFIX = "[PaymentCompletedHandler:OrderService]";

   public PaymentCompletedEventHandler(
       OrderIsInProgressEventDispatcher orderIsInProgressEventDispatcher,
       IDatabaseContextProvider<OrderDbContext> contextProvider,
       ILogger<PaymentCompletedEventHandler> logger)
   {
       _context = contextProvider.GetContext() ?? throw new ArgumentNullException(nameof(contextProvider));
       _orderIsInProgressEventDispatcher = orderIsInProgressEventDispatcher ?? throw new ArgumentNullException(nameof(orderIsInProgressEventDispatcher));
       _logger = logger ?? throw new ArgumentNullException(nameof(logger));
   }

   public async Task Handle(PaymentCompletedEvent @event)
   {
       _logger.LogInformation($"{LOGGING_PREFIX} Started handling event with id '{@event.Id}'");
       try
       {
           // SingleOrDefaultAsync returns null when no order matches, so the check below can run.
           var order = await _context.Orders.Include(o => o.Lines).SingleOrDefaultAsync(o => o.Id == @event.OrderId);
           if (order == null)
               throw new KeyNotFoundException($"Order with id {@event.OrderId} does not exist");

           order.Status = OrderStatusEnum.IN_PROGRESS;
           await _context.SaveChangesAsync();
           await _orderIsInProgressEventDispatcher.DispatchAsync(order);
       }
       catch (Exception e)
       {
           _logger.LogError(e, $"{LOGGING_PREFIX} Error occurred during handling event with id '{@event.Id}'");
           throw;
       }

       _logger.LogInformation($"{LOGGING_PREFIX} Successfully handled event with id '{@event.Id}'");
   }
}

OrdersService/Infrastructure/Notifications/Publishers/Dispatchers/OrderIsInProgressEventDispatcher.cs

public class OrderIsInProgressEventDispatcher : ServiceBusEventDispatcher<OrderIsInProgressEvent>
{
   public OrderIsInProgressEventDispatcher(IOptions<ServiceBusSettings> serviceBusOptions) : base(serviceBusOptions)
   {
   }

   public Task DispatchAsync(Order order) => PublishAsync(new OrderIsInProgressEvent
   {
       OrderId = order.Id,
       Lines = order.Lines.Select(l => new OrderIsInProgressEventLine
       {
           Product_Id = l.Product_Id,
           Quantity = l.Quantity
       }).ToList()
   });
}

InventoryService/Infrastructure/Notifications/Subscribers/Handlers/OrderIsInProgressEventHandler.cs

As other services were informed about order status, it’s time to handle this information.

Our goal was to update our inventory status in our Inventory Service – let’s do that!

Basically, we load the products whose ids appear in the Product_Id field of the event lines, decrease the quantity of each product, and save our changes to the database.

public class OrderIsInProgressEventHandler : IEventHandler<OrderIsInProgressEvent>
{
   private readonly InventoryDbContext _context;
   private readonly ILogger<OrderIsInProgressEventHandler> _logger;
   private const string LOGGING_PREFIX = "[OrderIsInProgressHandler:InventoryService]";

   public OrderIsInProgressEventHandler(
       IDatabaseContextProvider<InventoryDbContext> contextProvider,
       ILogger<OrderIsInProgressEventHandler> logger)
   {
       _context = contextProvider.GetContext() ?? throw new ArgumentNullException(nameof(contextProvider));
       _logger = logger ?? throw new ArgumentNullException(nameof(logger));
   }

   public async Task Handle(OrderIsInProgressEvent @event)
   {
       _logger.LogInformation($"{LOGGING_PREFIX} Started handling event with id '{@event.Id}'");
       try
       {
           var productsIds = @event.Lines.Select(l => l.Product_Id);
           var products = await _context.Products.Where(p => productsIds.Contains(p.Id)).ToListAsync();
           foreach (var product in products)
           {
               var line = @event.Lines.Single(l => l.Product_Id == product.Id);
               product.Quantity -= line.Quantity;
           }

           await _context.SaveChangesAsync();
       }
       catch (Exception e)
       {
           _logger.LogError(e, $"{LOGGING_PREFIX} Error occurred during handling event with id '{@event.Id}'");
           throw;
       }

       _logger.LogInformation($"{LOGGING_PREFIX} Successfully handled event with id '{@event.Id}'");
   }
}
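
The handler above subtracts quantities directly, and Single throws if the same product appears in more than one line of the event. As a minimal sketch of how that arithmetic could be guarded, the hypothetical helper below (not part of the article’s repository) merges repeated lines per product and refuses to push stock below zero. The int product ID and the names StockCalculator, TotalOrderedPerProduct, and Decrease are assumptions made for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper for illustration; it is not part of the article's source code.
public static class StockCalculator
{
    // Sums the ordered quantity per product, so repeated lines for one product are merged.
    // Assumption: product IDs are ints.
    public static Dictionary<int, int> TotalOrderedPerProduct(
        IEnumerable<(int ProductId, int Quantity)> lines)
    {
        return lines
            .GroupBy(l => l.ProductId)
            .ToDictionary(g => g.Key, g => g.Sum(l => l.Quantity));
    }

    // Returns the new stock level and refuses to go below zero.
    public static int Decrease(int currentQuantity, int orderedQuantity)
    {
        if (orderedQuantity < 0)
            throw new ArgumentOutOfRangeException(nameof(orderedQuantity));
        if (orderedQuantity > currentQuantity)
            throw new InvalidOperationException(
                $"Cannot remove {orderedQuantity} items from a stock of {currentQuantity}.");
        return currentQuantity - orderedQuantity;
    }
}
```

Inside a handler, the totals from TotalOrderedPerProduct could replace the Single lookup, and Decrease could replace the bare subtraction. Whether an oversold order should throw, be logged, or trigger a compensating event is a design decision the article leaves open.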

Almost Done…

We have to congratulate ourselves! We have finally accomplished our goal. It was a really long process, but once the Messaging project was prepared, adding events, their dispatchers, and their handlers became really easy.

Okay, we created the implementations, but what about registering those services? This is important! Let’s register everything.

Time To Register

We need to provide an extension method that registers the previously prepared Messaging project in each microservice.

Shared/Notifications/Extensions/ServiceBusNotificationsExtensions.cs

This extension method wires the Messaging project into a service so that it can communicate with other services through Azure Service Bus.

The code reads the Service Bus connection string and the main topic name from configuration, then registers the settings, the connection manager, and the event bus for dependency injection.

public static class ServiceBusNotificationsExtensions
{
   public static void AddServiceBusNotifications(this IServiceCollection services, IConfiguration configuration)
   {
       var connectionString = configuration.GetValue<string>("ServiceBus:ConnectionString");
       var mainTopicName = configuration.GetValue<string>("ServiceBus:MainTopicName");
       services.Configure<ServiceBusSettings>(settings =>
       {
           settings.ConnectionString = connectionString;
           settings.TopicName = mainTopicName;
       });
       services.AddSingleton<IServiceBusConnectionManager>(serviceProvider =>
       {
           var logger = serviceProvider.GetRequiredService<ILogger<ServiceBusConnectionManager>>();
           var serviceBusConnectionString = connectionString;
           var connectionStringBuilder = new ServiceBusConnectionStringBuilder(serviceBusConnectionString)
           {
               EntityPath = mainTopicName
           };
           return new ServiceBusConnectionManager(connectionStringBuilder, logger);
       });
       RegisterEventBus(services, configuration);
   }

   private static void RegisterEventBus(IServiceCollection services, IConfiguration configuration)
   {
       var subscriptionClientName = configuration.GetValue<string>("ServiceBus:SubscriptionName");
       services.AddSingleton<IEventBusSubscriptionManager, InMemoryEventBusSubscriptionManager>();
       services.AddSingleton<IEventBus, ServiceBusEventBus>(serviceProvider =>
       {
           var serviceBusConnectionManager = serviceProvider.GetRequiredService<IServiceBusConnectionManager>();
           var logger = serviceProvider.GetRequiredService<ILogger<ServiceBusEventBus>>();
           var subscriptionManager = serviceProvider.GetRequiredService<IEventBusSubscriptionManager>();
           return new ServiceBusEventBus(serviceBusConnectionManager, subscriptionManager, subscriptionClientName,
               serviceProvider, logger);
       });
   }
}
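
The extension reads three configuration keys: ServiceBus:ConnectionString, ServiceBus:MainTopicName, and ServiceBus:SubscriptionName. GetValue<string> returns null for a missing key rather than throwing, so a misconfigured service may only fail later, when the connection is first used. The sketch below shows one way to check the keys up front. It is a hypothetical helper, and a plain dictionary stands in for IConfiguration so that the example needs no extra packages.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper; a plain dictionary stands in for IConfiguration here.
public static class ServiceBusConfigurationCheck
{
    // The keys read by AddServiceBusNotifications and RegisterEventBus in the article.
    public static readonly string[] RequiredKeys =
    {
        "ServiceBus:ConnectionString",
        "ServiceBus:MainTopicName",
        "ServiceBus:SubscriptionName"
    };

    // Returns the required keys that are missing or blank.
    public static List<string> FindMissingKeys(IReadOnlyDictionary<string, string> settings)
    {
        return RequiredKeys
            .Where(key => !settings.TryGetValue(key, out var value) || string.IsNullOrWhiteSpace(value))
            .ToList();
    }

    // Throws a single descriptive exception that lists every missing key.
    public static void EnsureComplete(IReadOnlyDictionary<string, string> settings)
    {
        var missing = FindMissingKeys(settings);
        if (missing.Count > 0)
            throw new InvalidOperationException(
                "Missing Service Bus settings: " + string.Join(", ", missing));
    }
}
```

In a real service the same check could run against IConfiguration at the top of AddServiceBusNotifications, so that a missing setting stops the application at startup with a clear message.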

Now that the base is prepared, each project has to call this extension method and declare which event models it handles and which handler serves each of them.

Payment Service

Let’s start with Payment Service.

PaymentService/Startup.cs

In the Startup class, inside the ConfigureServices method we have to call our extension method with one line:

services.AddServiceBusNotifications(Configuration);

Then we have to register the dispatcher. We create a method for registering Service Bus related dependencies:

private void RegisterServiceBusDependencies(IServiceCollection services)
{
   #region Dispatchers

   services.AddScoped<PaymentCompletedEventDispatcher>();

   #endregion
}

Then, after the line with the Service Bus Notifications extension method, we have to add another one:

RegisterServiceBusDependencies(services);

Great – Payment Service is set up! Time for Order Service.

OrderService/Startup.cs

We call the same extension method in the Startup class, but our RegisterServiceBusDependencies method is different this time.

private void RegisterServiceBusDependencies(IServiceCollection services)
{
   #region Handlers

   services.AddScoped<PaymentCompletedEventHandler>();

   #endregion
   #region Dispatchers

   services.AddScoped<OrderIsInProgressEventDispatcher>();

   #endregion
}

This time we register a handler as well as a dispatcher. To let the event bus know which event it should subscribe to and which handler it should use, we need a new method:

private void ConfigureServiceBus(IApplicationBuilder applicationBuilder)
{
   var eventBus = applicationBuilder.ApplicationServices.GetRequiredService<IEventBus>();
   eventBus.Subscribe<PaymentCompletedEvent, PaymentCompletedEventHandler>();
}

We simply resolve the IEventBus implementation from our dependency injection container and tell it which event to subscribe to and which handler to use.

In the Configure method, we call ConfigureServiceBus with a simple one-liner:

ConfigureServiceBus(app);
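
To make the Subscribe call less opaque, here is a simplified stand-in for the bookkeeping a subscription manager could do: it maps an event name to the handler types registered for it, so that an incoming message can be routed by name. This is a sketch under assumptions, not the article’s InMemoryEventBusSubscriptionManager. The class SimpleSubscriptionRegistry and its members are hypothetical, and the two empty classes at the bottom are placeholders that only let the example compile on its own.

```csharp
using System;
using System.Collections.Generic;

// Simplified, hypothetical stand-in for a subscription manager.
public class SimpleSubscriptionRegistry
{
    // Event name -> handler types registered for that event.
    private readonly Dictionary<string, List<Type>> _handlers = new Dictionary<string, List<Type>>();

    public void Subscribe<TEvent, THandler>()
    {
        var eventName = typeof(TEvent).Name;
        if (!_handlers.TryGetValue(eventName, out var handlerTypes))
        {
            handlerTypes = new List<Type>();
            _handlers[eventName] = handlerTypes;
        }

        // Registering the same handler twice would process every message twice.
        if (handlerTypes.Contains(typeof(THandler)))
            throw new ArgumentException(
                $"{typeof(THandler).Name} is already registered for {eventName}.");

        handlerTypes.Add(typeof(THandler));
    }

    // Looks up handler types by the event name carried on an incoming message.
    public IReadOnlyList<Type> GetHandlersFor(string eventName)
    {
        return _handlers.TryGetValue(eventName, out var handlerTypes)
            ? handlerTypes
            : (IReadOnlyList<Type>)Array.Empty<Type>();
    }
}

// Empty placeholders so the sketch compiles on its own.
public class PaymentCompletedEvent { }
public class PaymentCompletedEventHandler { }
```

With a registry like this, the event bus could resolve each returned handler type from the dependency injection container when a message arrives, which is why the handlers are registered in RegisterServiceBusDependencies first.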

InventoryService/Startup.cs

Finally, it’s time for Inventory Service. It only handles events, so we register a handler and subscribe it to the event:

private void RegisterServiceBusDependencies(IServiceCollection services)
{
   #region Handlers

   services.AddScoped<OrderIsInProgressEventHandler>();

   #endregion
}

private void ConfigureServiceBus(IApplicationBuilder applicationBuilder)
{
   var eventBus = applicationBuilder.ApplicationServices.GetRequiredService<IEventBus>();
   eventBus.Subscribe<OrderIsInProgressEvent, OrderIsInProgressEventHandler>();
}

Great! That’s all of the implementation.


Conclusion

This may look hard to implement and use, but it is not.

The hardest part is writing the code for the Messaging project. Once it exists, using it across a microservices architecture is quite simple.

Asynchronous messaging lets us split the communication logic between services into separate event handlers, so the sender doesn’t have to wait for a response or worry about request timeouts. We simply dispatch an event and handle it somewhere else.

With this pattern, services don’t have to send synchronous requests to each other, which would make some actions within the system take much longer.

To better understand what is going on, you can get the source code for this article at https://github.com/pmchlk/service-bus-messaging and experience the Service Bus awesomeness on your own!


Bonus – JetBrains Rider Live Template

Every handler follows the same pattern and only the logic changes, so it is good to have a template that speeds up the implementation.

Save this template as ehandler and use it every time you create an event handler. It gives you the logging calls around an empty try/catch block.


public class $EventName$EventHandler : IEventHandler<$EventName$Event>
{
   private readonly $DbContext$ _context;
   private readonly ILogger<$EventName$EventHandler> _logger;
   private const string LOGGING_PREFIX = "[$EventName$Handler:$MicroserviceName$]";

   public $EventName$EventHandler(
       IDatabaseContextProvider<$DbContext$> contextProvider,
       ILogger<$EventName$EventHandler> logger)
   {
       _context = contextProvider.GetContext() ?? throw new ArgumentNullException(nameof(contextProvider));
       _logger = logger ?? throw new ArgumentNullException(nameof(logger));
   }

   public async Task Handle($EventName$Event @event)
   {
       _logger.LogInformation($"{LOGGING_PREFIX} Started handling event with id '{@event.Id}'");
       try
       {
       }
       catch (Exception e)
       {
           _logger.LogError(e, $"{LOGGING_PREFIX} Error occurred during handling event with id '{@event.Id}'");
           throw;
       }

       _logger.LogInformation($"{LOGGING_PREFIX} Successfully handled event with id '{@event.Id}'");
   }
}

Written by