The CosmosDB Change Feed
Change feed in Azure Cosmos DB is a persistent record of changes to a container in the order they occur. Change feed support in Azure Cosmos DB works by listening to an Azure Cosmos DB container for any changes. It then outputs the sorted list of documents that were changed in the order in which they were modified. The persisted changes can be processed asynchronously and incrementally, and the output can be distributed across one or more consumers for parallel processing.
Using either a long-running service or an Azure Function we are able to "react" to CosmosDB container changes and handle them as events.
Components of the change feed processor
The change feed processor has four main components:
The monitored container: The monitored container has the data from which the change feed is generated. Any inserts and updates to the monitored container are reflected in the change feed of the container.
The lease container: The lease container acts as state storage and coordinates the processing of the change feed across multiple workers. The lease container can be stored in the same account as the monitored container or in a separate account.
The compute instance: A compute instance hosts the change feed processor to listen for changes. Depending on the platform, it might be represented by a virtual machine (VM), a Kubernetes pod, an Azure App Service instance, or an actual physical machine. The compute instance has a unique identifier that's called the instance name throughout this article.
The delegate: The delegate is the code that defines what you, the developer, want to do with each batch of changes that the change feed processor reads.
Creating the processor project
The change feed processor is going to be a microservice of its own.
First we are going to create a simple Console app called Dometrain.Cart.Processor
and add then we will change the SDK type to Microsoft.NET.Sdk.Web
.
Then we will give the following structure to our app:
var builder = WebApplication.CreateBuilder(args);
var app = builder.Build();
app.Run();
Now we have to add the CosmosDB client Nuget package. This is the same client as we use in the Cart API named
Aspire.Microsoft.Azure.Cosmos
. We will also add a reference to the ServiceDefaults
project.
Let's wire it up in the Program.cs
.
var builder = WebApplication.CreateBuilder(args);
builder.AddServiceDefaults();
builder.AddAzureCosmosClient("cosmosdb");
var app = builder.Build();
app.MapDefaultEndpoints();
app.Run();
We are now ready to create the processor class.
Creating the background consumer
For that we will extend the BackgroundService class and add all our logic in there.
public class ChangeFeedProcessorService : BackgroundService
{
private const string DatabaseId = "cartdb";
private const string SourceContainerId = "carts";
private const string LeaseContainerId = "carts-leases";
private readonly CosmosClient _cosmosClient;
private readonly ILogger<ChangeFeedProcessorService> _logger;
public ChangeFeedProcessorService(CosmosClient cosmosClient, ILogger<ChangeFeedProcessorService> logger)
{
_cosmosClient = cosmosClient;
_logger = logger;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
var database = _cosmosClient.GetDatabase(DatabaseId);
await database.CreateContainerIfNotExistsAsync(new ContainerProperties(LeaseContainerId, "/id"), 400);
var leaseContainer = _cosmosClient.GetContainer(DatabaseId, LeaseContainerId);
var changeFeedProcessor = _cosmosClient.GetContainer(DatabaseId, SourceContainerId)
.GetChangeFeedProcessorBuilder<ShoppingCart>(processorName: "cache-processor", onChangesDelegate: HandleChangesAsync)
.WithInstanceName($"cache-processor-{Guid.NewGuid().ToString()}")
.WithLeaseContainer(leaseContainer)
.Build();
_logger.LogInformation("Starting Change Feed Processor");
await changeFeedProcessor.StartAsync();
}
async Task HandleChangesAsync(
ChangeFeedProcessorContext context,
IReadOnlyCollection<ShoppingCart> changes,
CancellationToken cancellationToken)
{
_logger.LogDebug("Started handling changes for lease {LeaseToken}", context.LeaseToken);
_logger.LogDebug("Change Feed request consumed {RequestCharge} RU.", context.Headers.RequestCharge);
_logger.LogDebug("SessionToken {SessionToken}", context.Headers.Session);
// We may want to track any operation's Diagnostics that took longer than some threshold
if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
{
_logger.LogWarning("Change Feed request took longer than expected. Diagnostics: {@Diagnostics}", context.Diagnostics);
}
foreach (ShoppingCart item in changes)
{
_logger.LogInformation(JsonConvert.SerializeObject(item));
await Task.Delay(10);
}
_logger.LogDebug("Finished handling changes.");
}
}
Finally, to activate it we will need to add the following in the Program.cs:
builder.Services.AddHostedService<ChangeFeedProcessorService>();
Now let's update the AppHost project:
builder.AddProject<Projects.Dometrain_Cart_Processor>("dometrain-cart-processor")
.WithReference(cartDb);
Now let's run it and test it!
A problem
The change feed will currently only notify the processor on an insert or an update. Delete actions won't trigger an event. An update to this is currently in preview that will allow consumers to get delete actions too as well as previews and current versions of the changed document. Since this isn't currently an option we have to think of a workaround.
The simplest way to deal with this is to change ClearAsync from deleting the item to simply storing one where the course id array is empty:
public async Task<bool> ClearAsync(Guid studentId)
{
try
{
var container = _cosmosClient.GetContainer(DatabaseId, ContainerId);
var cart = new ShoppingCart
{
StudentId = studentId,
CourseIds = []
};
var response = await container.UpsertItemAsync(cart);
return response.StatusCode == HttpStatusCode.OK;
}
catch (CosmosException ex) when (ex.StatusCode == HttpStatusCode.NotFound)
{
return true;
}
}
And now we can move the Redis Cache logic into the handling method.
Exercise: Adding the Redis logic
It's now your turn to update the system by moving the Redis cache "Read" logic from the Cart API into the change feed processor.