Skip to main content

Adding asynchronous messaging

One of the core pillars of distributed systems are mechanisms of asynchronous messaging. Whether that's a queue, a message broker or an event log, async messaging can completely transform our system's architecture.

How asynchronous messaging fits in our system

There are so many places we could add messaging in our system, but we will try to keep the usage relatively small for the purposes of this workshop.

So where will we use it?

Currently, when a student places an order, we save the order in the database, and then we enroll the student to all the purchased courses synchronously as part of the same request. This can lead to slow requests depending on how many courses we need to enroll the student on.

What we are going to do is turn the enrollment process to async. Instead of enrolling students as part of the same request, we are going to publish a message that an order was placed and then something else in our system will consume that message and enroll the student to all the courses.

The tech

There are multiple technology choices for our use case. In this workshop we'll take a look at how we can do this with RabbitMQ.

RabbitMQ is an open-source message-broker software that implements the Advanced Message Queuing Protocol (AMQP).

We can also use Azure Service bus or AWS SQS/SNS.

Adding RabbitMQ to Aspire

Like any other service, RabbitMQ will first need to be added in the AppHost project of .NET Aspire.

First we need to install the integration:

Aspire.Hosting.RabbitMQ

Then we can add the integration and refer to it on the API project:

var rabbitmq = builder.AddRabbitMQ("rabbitmq")
.WithManagementPlugin();
.WithReference(rabbitmq);

This is the point where we'd normally add Aspire.RabbitMQ.Client to the API but publishing and consuming messages using the RabbitMQ interface directly is a nightmare. Instead, we will be using MassTransit, which wraps all the logic up in a nice package.

The problem is that MassTransit hasn't been updated to support the way Aspire does messaging client registration, so we'll have to use MassTransit's client instead of the Aspire one.

First let's add the MassTransit RabbitMQ package:

MassTransit.RabbitMQ

Then we can register MassTransit in the Program.cs:

builder.Services.AddMassTransit(s =>
{
s.AddConsumers(typeof(Program).Assembly);
s.UsingRabbitMq((context, cfg) =>
{
cfg.Host(new Uri(config["ConnectionStrings:rabbitmq"]!));
cfg.ConfigureEndpoints(context);
});
});

Implementing the publisher

MassTransit has a few interfaces to publish messages but given that we will be publishing from a singleton service we will need to use the IBus interface.

First we need to inject it in the OrderService and then create the OrderPlaced message class:

public record OrderPlaced(Guid OrderId, Guid StudentId, IEnumerable<Guid> CourseIds);

Then we need to replace the enrollment with sending a message:

await _bus.Publish(new OrderPlaced(order.Id, order.StudentId, order.CourseIds));

Now we can create the handler!

Implementing the handler

In MassTransit, message handlers are made by created classes that implement the IConsumer interface.

public class OrderPlacedHandler : IConsumer<OrderPlaced>
{
private readonly IEnrollmentService _enrollmentService;

public OrderPlacedHandler(IEnrollmentService enrollmentService)
{
_enrollmentService = enrollmentService;
}

public async Task Consume(ConsumeContext<OrderPlaced> context)
{
var courseIds = context.Message.CourseIds;
var studentId = context.Message.StudentId;

foreach (var courseId in courseIds)
{
await _enrollmentService.EnrollToCourseAsync(studentId, courseId);
}
}
}

And that's it! We now have asynchronous processing in our application!