Getting started with Azure Service Bus with MassTransit

MassTransit is a free and open-source library that helps you build message-based applications and services in .NET. At the time of writing, it supports RabbitMQ, Azure Service Bus, and AmazonSqs.

MassTransit provides a framework that makes it easy to send and publish messages as well as receiving them. If you've ever had to incorporate a messaging system such as RabbitMQ or Azure Service Bus, you may find yourself, like me, writing wrappers or abstractions around it. With MassTransit, it helps a little that they have provided a framework to do some of the work so that you can focus on sending and consuming messages.

It also offers other features such as monitoring messages as they are received, consumed, sent and published. MassTransit refers to this as Observing messages. It also provides scheduling messages which it uses Quartz underneath.

From my understanding, there are three ways that you can send messages using MassTransit.

  • Send. A message is delivered to a specific endpoint. i.e. a queue and a single consumer subscribes to this queue.
  • Publish. When using publish, the message is broadcast to many consumers.
  • Request and Response. MassTransit also offers a Request and Response mechanism where you can send a request and receive a response based on that request.

MassTransit can be set up with a web or console application without too much trouble. It's all in the configuration and once it's all set up, you can focus on the core part, which is sending, receiving and processing messages.

Getting started

We will be using Azure Service bus to build a simple web app in dotnet core to demonstrate how to send a message (via queues) and publish messages (via topics) from a web application. We will also create a simple console app that will receive messages from the queue and the topic.

The application example is based on a contrived example of flight orders and flight cancellations. Flight orders will be broadcast and can be consumed by multiple consumers. Flight cancellations will be sent to a queue where they will be processed by one consumer.

MassTransit Packages

To start, the following nuget packages are required:

Create a service bus on Azure

Create a new service bus on Azure if you don't have one already. Just a note, to use Azure Service Bus Topics, the "standard" pricing tier is required. If you are just playing around and only need queues, the "Basic" pricing tier is sufficient.

azure-service-bus-create-bus

Once the bus is created, it is not mandatory to create the queues and topics at this stage. You can either let MassTransit create them for you with their naming conventions, or you can specify the names as part of the configuration MassTransit, which is what we are going to do here.

Be sure to make a note of the connection string as highlighted below.

azure-service-bus-connection-string

Create Messages to be sent

In this example, since we will be sending two types of messages, one via a queue and another via a topic, here are the two types of messages we will be sending.

public class FlightOrder
{
    public Guid FlightId { get; set;  }
    public int OrderId { get; set;  }
}

The FlightOrder will be sent to the topic flight-orders.

public class FlightCancellation
{
    public Guid FlightId { get; set; }

    public int CancellationId { get; set; }
}

The FlightCancellation will be sent to the queue flight-cancellation.

Create the publisher

For this example, we will create a publisher within a dotnet core web application. In other words, the web application will send and publish messages.

Configuration

In the Startup.cs, an Azure Service Bus needs to be created in the ConfigureServices method as shown below.

public void ConfigureServices(IServiceCollection services)
{
    string connectionString = "Endpoint=sb://your-service-bus.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=xyz";

    string flightOrdersTopic = "flight-orders";

    // create the bus using Azure Service bus
    var azureServiceBus = Bus.Factory.CreateUsingAzureServiceBus(busFactoryConfig =>
    {
        // specify the message FlightOrder to be sent to a specific topic
        busFactoryConfig.Message<FlightOrder>(configTopology =>
        {
            configTopology.SetEntityName(flightOrdersTopic);
        });

        var host = busFactoryConfig.Host(connectionString, hostConfig =>
        {
            // This is optional, but you can specify the protocol to use.
            hostConfig.TransportType = TransportType.AmqpWebSockets;
        });

    });

    ...
}

Here, we use the Bus.Factory.CreateUsingAzureServiceBus static method to create an Azure Service Bus. We also setup the FlightOrder message to be sent to the Azure Topic flight-orders using busFactoryConfig.Message<FlightOrder>(configTopology =>{ configTopology.SetEntityName(flightOrdersTopic);});

Then, just use the provided extension method AddMassTransit to add MassTransit.

// Add MassTransit
services.AddMassTransit(config => 
{
    config.AddBus(provider => azureServiceBus);
});

To send or publish messages, we need to register IPublishEndpoint and ISendEndpointProvider. IPublishEndpoint is used to publish messages and ISendEndpointProvider is used to send messages onto a queue. Registering IBus is optional.

// register MassTransit's IPublishEndpoint, ISendEndpointProvider and IBus which can be used to send and publish messages
services.AddSingleton<IPublishEndpoint>(azureServiceBus);
services.AddSingleton<ISendEndpointProvider>(azureServiceBus);
services.AddSingleton<IBus>(azureServiceBus);

Sending and publishing

We will create two buttons on a web page. One will trigger a publish and the other will trigger a send.

To publish a message onto an Azure topic, it will look something like this.

await _publishEndpoint.Publish<FlightOrder>(new FlightOrder { FlightId = Guid.NewGuid(), OrderId = _random.Next(1, 999) });

To send a message onto an Azure queue, we need to use the GetSendEndpoint method to explicitly specify the queue path. In this case, the queue name we are using here is flight-cancellation.

var sendEndpoint = await _sendEndpointProvider.GetSendEndpoint(new Uri("sb://masstransit-demo-xyz123.servicebus.windows.net/flight-cancellation"));

await sendEndpoint.Send<FlightCancellation>(new FlightCancellation { FlightId = Guid.NewGuid(), CancellationId = _random.Next(1, 999)});

Create the subscriber

Create a regular console app and register MassTransit similar to that from the publisher.

To receive messages, MassTransit makes it easy with the use of the IConsumer interface.

Here's an example to receive and consume a message for the FlightOrder message.

public class FlightPurchasedConsumer : IConsumer<FlightOrder>
{
    private readonly ILogger<FlightPurchasedConsumer> _logger;

    public FlightPurchasedConsumer(ILogger<FlightPurchasedConsumer> logger)
    {
        _logger = logger;
    }

    public Task Consume(ConsumeContext<FlightOrder> context)
    {
        _logger.LogInformation($"Order processed: FlightId:{context.Message.FlightId} - OrderId:{context.Message.OrderId}");

        return Task.CompletedTask;
    }
}

Here's an example of how to consume the FlightCancellation message.

public class FlightCancellationConsumer : IConsumer<FlightCancellation>
{
    private readonly ILogger<FlightCancellationConsumer> _logger;

    public FlightCancellationConsumer(ILogger<FlightCancellationConsumer> logger)
    {
        _logger = logger;
    }

    public Task Consume(ConsumeContext<Messages.FlightCancellation> context)
    {
        _logger.LogInformation($"Flight Dequeued- FlightId:{context.Message.FlightId} CancellationId: {context.Message.CancellationId}");

        return Task.CompletedTask;
    }
}

Now we need to register MassTransit and configure the SubscriptionEndpoint to listen for messages on the topic and use the ReceiveEndpoint to listen for messages on the queue.

string connectionString =
    "Endpoint=sb://masstransit-demo-xyz1231.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=xyz";

string flightOrdersTopic = "flight-orders";

string subscriptionName = "flight-subscriber";

string queueName = "flight-canellation";


var azureServiceBus = Bus.Factory.CreateUsingAzureServiceBus(busFactoryConfig =>
{
    busFactoryConfig.Message<FlightOrder>(m => { m.SetEntityName(flightOrdersTopic); });

    var host = busFactoryConfig.Host(connectionString, hostConfig => 
    {
        // this is optional
        hostConfig.TransportType = TransportType.AmqpWebSockets;
    });

    // setup Azure topic consumer
    busFactoryConfig.SubscriptionEndpoint<FlightOrder>(host, subscriptionName, configurator =>
    {
        configurator.Consumer<FlightPurchasedConsumer>(provider);
    });


    // setup Azure queue consumer
    busFactoryConfig.ReceiveEndpoint(host, queueName, configurator =>
    {
        configurator.Consumer<FlightQueueConsumer>(provider);
    });
});

Now, any messages placed onto the Azure Topic or Azure queue, the relevant Consumer will be instantiated by MassTransit and then can be handled.

Summary

This was a brief intro on how to use and configure MassTransit to implement message-based applications. Some basic code examples were also provided to demonstrate sending messages on a topic as well as queue and receiving messages via a console application.

Full code can be found on my GitHub repository

Newsletter

Receive new content like this and more. No spam. Ever. Unsubscribe whenever you want.

or subscribe via RSS with Feedly!