Atc.Cosmos - Azure Cosmos DB with A Touch of Class
For the past 6 years, I have been using Azure Cosmos DB as my go-to data store, because document databases make much more sense for the things I have been building. Atc.Cosmos is the result of years of collective experience solving the same problems with the same patterns. It is a library for configuring containers in Azure Cosmos DB, and it provides easy, efficient, and convenient ways to read and write document resources.
Using Atc.Cosmos
Here’s an example of using Atc.Cosmos in a Minimal API project targeting .NET 7.0:
using Atc.Cosmos;
using Microsoft.AspNetCore.Mvc;

var builder = WebApplication.CreateBuilder(args);
builder.Services.AddEndpointsApiExplorer();
builder.Services.AddSwaggerGen();
builder.Services.ConfigureCosmosDb();

var app = builder.Build();

app.MapGet(
    "/foo",
    async (
        ICosmosReader<FooResource> reader,
        CancellationToken cancellationToken) =>
    {
        // Consume the async stream without blocking a thread-pool thread.
        var result = new List<Dictionary<string, object>>();
        await foreach (var foo in reader.ReadAllAsync(FooResource.PartitionKey, cancellationToken))
        {
            result.Add(foo.Bar);
        }

        return result;
    })
    .WithName("ListFoo")
    .WithOpenApi();

app.MapGet(
    "/foo/{id}",
    async (
        ICosmosReader<FooResource> reader,
        string id,
        CancellationToken cancellationToken) =>
    {
        var foo = await reader.FindAsync(id, FooResource.PartitionKey, cancellationToken);
        return foo is not null ? Results.Ok(foo.Bar) : Results.NotFound(id);
    })
    .WithName("GetFoo")
    .WithOpenApi();

app.MapPost(
    "/foo",
    async (
        ICosmosWriter<FooResource> writer,
        [FromBody] Dictionary<string, object> data,
        CancellationToken cancellationToken) =>
    {
        var id = Guid.NewGuid().ToString();
        await writer.CreateAsync(
            new FooResource
            {
                Id = id,
                Bar = data,
            },
            cancellationToken);
        return Results.CreatedAtRoute("GetFoo", new { id });
    })
    .WithName("PostFoo")
    .WithOpenApi();

app.UseHttpsRedirection();
app.UseSwagger();
app.UseSwaggerUI();
app.Run();
Let’s break that down a bit, starting with the IServiceCollection extension method ConfigureCosmosDb().
To use Atc.Cosmos, you need to do the following:
- Implement IConfigureOptions<CosmosOptions> to configure the database itself.
- Define Cosmos resource document types by deriving from CosmosResource or implementing ICosmosResource.
- Implement ICosmosContainerInitializer to define a Cosmos DB container for every Cosmos resource document type.
The ConfigureCosmosDb() extension method registers these pieces with the service collection:
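using Atc.Cosmos;
using Microsoft.Extensions.DependencyInjection;

public static class ServiceCollectionExtensions
{
    public static void ConfigureCosmosDb(this IServiceCollection services)
    {
        // Database-level settings: connection, database name, serializer.
        services.ConfigureOptions<ConfigureCosmosOptions>();
        services.ConfigureCosmos(
            cosmosBuilder =>
            {
                // Map FooResource to the "foo" container and its initializer.
                cosmosBuilder.AddContainer<FooContainerInitializer, FooResource>("foo");
                // Run the container initializers when the application starts.
                cosmosBuilder.UseHostedService();
            });
    }
}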
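Here’s an example implementation of IConfigureOptions<CosmosOptions>:
using System.Text.Json;
using Atc.Cosmos;
using Microsoft.Extensions.Options;

public class ConfigureCosmosOptions : IConfigureOptions<CosmosOptions>
{
    public void Configure(CosmosOptions options)
    {
        // Connect to the local Azure Cosmos DB emulator.
        options.UseCosmosEmulator();
        options.DatabaseName = "SampleApi";
        // Serialize property names in camelCase, e.g. Id -> "id" and Pk -> "pk".
        options.SerializerOptions.PropertyNamingPolicy = JsonNamingPolicy.CamelCase;
    }
}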
Here’s an example implementation of the ICosmosContainerInitializer interface that creates a container called foo:
using Atc.Cosmos;
using Microsoft.Azure.Cosmos;

public class FooContainerInitializer : ICosmosContainerInitializer
{
    public Task InitializeAsync(
        Database database,
        CancellationToken cancellationToken) =>
        database.CreateContainerIfNotExistsAsync(
            new ContainerProperties
            {
                // Must match the serialized name of FooResource.Pk.
                PartitionKeyPath = "/pk",
                Id = "foo",
            },
            cancellationToken: cancellationToken);
}
Here’s an example Cosmos resource document type called FooResource that derives from CosmosResource. With the camelCase naming policy above, its Pk property is serialized as pk, which matches the /pk partition key path of the foo container:
using Atc.Cosmos;

public class FooResource : CosmosResource
{
    public const string PartitionKey = "foo";

    public string Id { get; set; } = null!;

    public string Pk => PartitionKey;

    public Dictionary<string, object> Bar { get; set; } = new Dictionary<string, object>();

    // Tell Atc.Cosmos which values identify the document.
    protected override string GetDocumentId() => Id;

    protected override string GetPartitionKey() => Pk;
}
ICosmosReader<T>
Cosmos DB is very good at point reads, that is, looking up a single document by its id and partition key, and they are the cheapest kind of read it offers. The ICosmosReader<T> interface provides the following methods for point reads:
Task<T> ReadAsync(
    string documentId,
    string partitionKey,
    CancellationToken cancellationToken = default);

Task<T?> FindAsync(
    string documentId,
    string partitionKey,
    CancellationToken cancellationToken = default);
ReadAsync() does a point read of the document within the specified partition and throws a CosmosException with the status code NotFound if the document could not be found. FindAsync(), on the other hand, returns null instead of throwing if the document could not be found.
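To make the difference concrete, here is a minimal sketch of the same look-up written both ways. FooLookups is a hypothetical helper, FooResource is the document type from the example above, and the reader interface is assumed to live in the Atc.Cosmos namespace.

```csharp
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using Atc.Cosmos; // Assumption: ICosmosReader<T> lives in this namespace.
using Microsoft.Azure.Cosmos;

// Hypothetical helper class; FooResource is the document type defined earlier.
public static class FooLookups
{
    // ReadAsync throws a CosmosException with status 404 when the document is
    // missing, so a caller that treats "missing" as normal has to catch it.
    public static async Task<FooResource?> ReadOrNullAsync(
        ICosmosReader<FooResource> reader,
        string id,
        CancellationToken cancellationToken = default)
    {
        try
        {
            return await reader.ReadAsync(id, FooResource.PartitionKey, cancellationToken);
        }
        catch (CosmosException ex) when (ex.StatusCode == HttpStatusCode.NotFound)
        {
            return null;
        }
    }

    // FindAsync already returns null for a missing document, so no try/catch is needed.
    public static Task<FooResource?> FindOrNullAsync(
        ICosmosReader<FooResource> reader,
        string id,
        CancellationToken cancellationToken = default)
        => reader.FindAsync(id, FooResource.PartitionKey, cancellationToken);
}
```

Which one to pick mostly depends on whether a missing document is an error in your domain (ReadAsync()) or a perfectly normal outcome (FindAsync()).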
You will notice that the majority of methods exposed by ICosmosReader<T> require the partition key to be specified. This is because read operations on Azure Cosmos DB are cheap and efficient as long as you stay within a single partition.
ICosmosReader<T> also provides methods for reading out multiple documents, either by reading all the documents within a partition or by running a query against the partition:
IAsyncEnumerable<T> ReadAllAsync(
    string partitionKey,
    CancellationToken cancellationToken = default);

IAsyncEnumerable<T> QueryAsync(
    QueryDefinition query,
    string partitionKey,
    CancellationToken cancellationToken = default);
As the name states, ReadAllAsync() reads all documents from the specified partition and returns them as an asynchronous stream of individual documents. QueryAsync() executes a QueryDefinition against the specified partition and streams the matching documents the same way.
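As an illustration, here is a minimal sketch of a parameterized QueryAsync() call that streams the documents in the foo partition whose id starts with a given prefix. FooQueries and WithIdPrefixAsync are hypothetical names, FooResource is the type from earlier, and QueryDefinition and WithParameter() come from the Microsoft.Azure.Cosmos SDK.

```csharp
using System.Collections.Generic;
using System.Threading;
using Atc.Cosmos; // Assumption: ICosmosReader<T> lives in this namespace.
using Microsoft.Azure.Cosmos;

// Hypothetical helper class built on the FooResource type from the example above.
public static class FooQueries
{
    // Streams the documents in the "foo" partition whose id starts with idPrefix.
    // The prefix is bound as a query parameter instead of being concatenated
    // into the SQL text.
    public static IAsyncEnumerable<FooResource> WithIdPrefixAsync(
        ICosmosReader<FooResource> reader,
        string idPrefix,
        CancellationToken cancellationToken = default)
    {
        var query = new QueryDefinition("SELECT * FROM c WHERE STARTSWITH(c.id, @prefix)")
            .WithParameter("@prefix", idPrefix);

        return reader.QueryAsync(query, FooResource.PartitionKey, cancellationToken);
    }
}
```

You consume the result with await foreach, for example await foreach (var foo in FooQueries.WithIdPrefixAsync(reader, "abc", cancellationToken)) { ... }, so each document can be handled as it is read instead of after the whole result has been collected.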
When working with large partitions, you will most likely want to read out data page by page, so that you can return a response to the consumer of your system as fast as possible. ICosmosReader<T> provides the following method for paged queries, where the continuation token returned with one page is passed in to fetch the next:
Task<PagedResult<T>> PagedQueryAsync(
    QueryDefinition query,
    string partitionKey,
    int? pageSize,
    string? continuationToken = default,
    CancellationToken cancellationToken = default);
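In a web API you would typically return a single page together with its continuation token and let the client ask for the next one. The sketch below instead walks a whole partition page by page, to show how the token from one call feeds the next. FooPaging is a hypothetical helper, and it assumes PagedResult<T> exposes Items and ContinuationToken properties, so check the library source before copying it.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Atc.Cosmos; // Assumption: ICosmosReader<T> and PagedResult<T> live in this namespace.
using Microsoft.Azure.Cosmos;

// Hypothetical helper class built on the FooResource type from the example above.
public static class FooPaging
{
    // Reads the "foo" partition one page at a time, passing each page to onPage
    // as soon as it arrives, and returns the number of pages read.
    // Assumption: PagedResult<T> exposes Items and ContinuationToken.
    public static async Task<int> ForEachPageAsync(
        ICosmosReader<FooResource> reader,
        Func<IEnumerable<FooResource>, Task> onPage,
        int pageSize = 100,
        CancellationToken cancellationToken = default)
    {
        var query = new QueryDefinition("SELECT * FROM c");
        string? continuationToken = null;
        var pages = 0;

        do
        {
            PagedResult<FooResource> page = await reader.PagedQueryAsync(
                query,
                FooResource.PartitionKey,
                pageSize,
                continuationToken,
                cancellationToken);

            await onPage(page.Items);
            pages++;

            // A missing token means the query has no more results.
            continuationToken = page.ContinuationToken;
        }
        while (!string.IsNullOrEmpty(continuationToken));

        return pages;
    }
}
```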
When working with very large partitions, you might want to parallelize the processing of the documents you read from Cosmos DB, which you can do by streaming batches of documents instead of individual ones. ICosmosReader<T> provides the following methods for batch queries:
IAsyncEnumerable<IEnumerable<T>> BatchReadAllAsync(
    string partitionKey,
    CancellationToken cancellationToken = default);

IAsyncEnumerable<IEnumerable<T>> BatchQueryAsync(
    QueryDefinition query,
    string partitionKey,
    CancellationToken cancellationToken = default);
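Here is a minimal sketch of that idea, built on the FooResource type from earlier. FooBatchProcessing and the processDocument callback are hypothetical; the documents in each batch are processed concurrently with Task.WhenAll before the next batch is requested.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Atc.Cosmos; // Assumption: ICosmosReader<T> lives in this namespace.

// Hypothetical helper class built on the FooResource type from the example above.
public static class FooBatchProcessing
{
    // Streams the "foo" partition in batches and processes the documents of
    // each batch concurrently. The next batch is only requested once the
    // current one is done, so at most one batch is in flight at a time.
    public static async Task<int> ProcessAllAsync(
        ICosmosReader<FooResource> reader,
        Func<FooResource, CancellationToken, Task> processDocument,
        CancellationToken cancellationToken = default)
    {
        var processed = 0;

        await foreach (var batch in reader.BatchReadAllAsync(FooResource.PartitionKey, cancellationToken))
        {
            var documents = batch.ToList();
            await Task.WhenAll(documents.Select(d => processDocument(d, cancellationToken)));
            processed += documents.Count;
        }

        return processed;
    }
}
```

Awaiting one batch before asking for the next bounds the concurrent work by the batch size; if you need a tighter limit, Parallel.ForEachAsync with ParallelOptions.MaxDegreeOfParallelism is an alternative.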
Cross-partition queries are usually inefficient, expensive, and slow, because they have to fan out across partitions. Regardless, there will be times when you still need them, so ICosmosReader<T> provides methods for executing a query, a paged query, or a batch query across multiple partitions:
IAsyncEnumerable<T> CrossPartitionQueryAsync(
    QueryDefinition query,
    CancellationToken cancellationToken = default);

Task<PagedResult<T>> CrossPartitionPagedQueryAsync(
    QueryDefinition query,
    int? pageSize,
    string? continuationToken = default,
    CancellationToken cancellationToken = default);

IAsyncEnumerable<IEnumerable<T>> BatchCrossPartitionQueryAsync(
    QueryDefinition query,
    CancellationToken cancellationToken = default);
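A typical case is looking a document up by id when the caller does not know its partition key. The sketch below does that with CrossPartitionQueryAsync(); FooCrossPartition is a hypothetical helper built on the FooResource type from earlier.

```csharp
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Atc.Cosmos; // Assumption: ICosmosReader<T> lives in this namespace.
using Microsoft.Azure.Cosmos;

// Hypothetical helper class built on the FooResource type from the example above.
public static class FooCrossPartition
{
    // Looks documents up by id when the caller does not know the partition key.
    // Without a partition key the query fans out to every physical partition,
    // so it costs more request units than a point read.
    public static async Task<List<FooResource>> FindByIdAnywhereAsync(
        ICosmosReader<FooResource> reader,
        string id,
        CancellationToken cancellationToken = default)
    {
        var query = new QueryDefinition("SELECT * FROM c WHERE c.id = @id")
            .WithParameter("@id", id);

        // An id is only unique within a logical partition, so several documents may match.
        var matches = new List<FooResource>();
        await foreach (var document in reader.CrossPartitionQueryAsync(query, cancellationToken))
        {
            matches.Add(document);
        }

        return matches;
    }
}
```

Because an id is only unique within a logical partition, the helper returns a list rather than a single document.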
ICosmosWriter<T>
There are multiple ways to write to Cosmos DB, and my preferred way is to do upserts, that is, create the document if it does not exist and otherwise update it. ICosmosWriter<T> provides methods for simple upserts as well as methods that include retry attempts:
Task<T> WriteAsync(
    T document,
    CancellationToken cancellationToken = default);

Task<T> UpdateOrCreateAsync(
    Func<T> getDefaultDocument,
    Action<T> updateDocument,
    int retries = 0,
    CancellationToken cancellationToken = default);
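The two methods suit different situations, as the following minimal sketch tries to show: WriteAsync() replaces the whole document, while UpdateOrCreateAsync() starts from the stored document (or from your default document when none exists) and applies your update to it. FooWrites is a hypothetical helper built on the FooResource type from earlier, and the exact retry semantics (assumed here to cover conflicts with concurrent writes) should be checked against the library documentation.

```csharp
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Atc.Cosmos; // Assumption: ICosmosWriter<T> lives in this namespace.

// Hypothetical helper class built on the FooResource type from the example above.
public static class FooWrites
{
    // Upsert that replaces the whole stored document with the one passed in.
    public static Task<FooResource> OverwriteAsync(
        ICosmosWriter<FooResource> writer,
        string id,
        Dictionary<string, object> bar,
        CancellationToken cancellationToken = default)
        => writer.WriteAsync(new FooResource { Id = id, Bar = bar }, cancellationToken);

    // Read-modify-write that sets a single key in Bar, creating the document
    // from the default if it does not exist yet.
    // Assumption: retries is the number of extra attempts made when a
    // concurrent write modifies the document in between.
    public static Task<FooResource> SetValueAsync(
        ICosmosWriter<FooResource> writer,
        string id,
        string key,
        object value,
        CancellationToken cancellationToken = default)
        => writer.UpdateOrCreateAsync(
            () => new FooResource { Id = id },
            foo => foo.Bar[key] = value,
            retries: 3,
            cancellationToken);
}
```

SetValueAsync() only touches one key, so two callers setting different keys do not overwrite each other's data, whereas OverwriteAsync() is last-writer-wins for the whole document.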
Deleting a document usually involves knowing exactly which document to delete. ICosmosWriter<T> provides one method for deleting a document that MUST exist and another method that returns true if the document was successfully deleted, otherwise false:
Task DeleteAsync(
    string documentId,
    string partitionKey,
    CancellationToken cancellationToken = default);

Task<bool> TryDeleteAsync(
    string documentId,
    string partitionKey,
    CancellationToken cancellationToken = default);
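For example, TryDeleteAsync() maps naturally onto a DELETE endpoint that answers 404 when there was nothing to delete. This is a minimal sketch in the style of the Minimal API example above; FooEndpoints and DeleteAsync are hypothetical names.

```csharp
using System.Threading;
using System.Threading.Tasks;
using Atc.Cosmos; // Assumption: ICosmosWriter<T> lives in this namespace.
using Microsoft.AspNetCore.Http;

// Hypothetical endpoint handlers built on the FooResource type from the example above.
public static class FooEndpoints
{
    // TryDeleteAsync returns false instead of throwing when the document does
    // not exist, which translates directly into 204 No Content or 404 Not Found.
    public static async Task<IResult> DeleteAsync(
        ICosmosWriter<FooResource> writer,
        string id,
        CancellationToken cancellationToken)
        => await writer.TryDeleteAsync(id, FooResource.PartitionKey, cancellationToken)
            ? Results.NoContent()
            : Results.NotFound(id);
}
```

You would then register it next to the other endpoints with app.MapDelete("/foo/{id}", FooEndpoints.DeleteAsync).WithName("DeleteFoo");.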
Unit Testing
The ICosmosReader<T> and ICosmosWriter<T> interfaces can easily be mocked, but there might be cases where you would rather use a fake. For this purpose, the Atc.Cosmos.Testing namespace contains the FakeCosmosReader<T> and FakeCosmosWriter<T> classes. For convenience, it also provides the FakeCosmos<T> class, which fakes both the reader and the writer.
Based on the example at the beginning of this post, let’s say we have a component called FooService that can read and upsert FooResource documents:
using Atc.Cosmos;

public class FooService
{
    private readonly ICosmosReader<FooResource> reader;
    private readonly ICosmosWriter<FooResource> writer;

    public FooService(
        ICosmosReader<FooResource> reader,
        ICosmosWriter<FooResource> writer)
    {
        this.reader = reader;
        this.writer = writer;
    }

    public Task<FooResource?> FindAsync(
        string id,
        CancellationToken cancellationToken = default) =>
        reader.FindAsync(id, FooResource.PartitionKey, cancellationToken);

    public Task UpsertAsync(
        string? id = null,
        Dictionary<string, object>? data = null,
        CancellationToken cancellationToken = default) =>
        writer.UpdateOrCreateAsync(
            () => new FooResource { Id = id ?? Guid.NewGuid().ToString() },
            // Replace the document's payload with the supplied data.
            resource => resource.Bar = data ?? new Dictionary<string, object>(),
            retries: 5,
            cancellationToken);
}
Using a combination of Atc.Cosmos.Testing and the Atc.Test library, unit tests using the fakes could look like this:
using Atc.Cosmos.Testing;
using Atc.Test;
using AutoFixture.Xunit2;
using FluentAssertions;
using Xunit;

// [Frozen(Matching.ImplementedInterfaces)] makes AutoFixture inject the same fake
// wherever ICosmosReader<FooResource> or ICosmosWriter<FooResource> is requested,
// including the FooService constructor.
public class FooServiceTests
{
    [Theory]
    [AutoNSubstituteData]
    public async Task Should_Get_Existing_Data(
        [Frozen(Matching.ImplementedInterfaces)] FakeCosmos<FooResource> fakeCosmos,
        FooService sut,
        FooResource resource)
    {
        fakeCosmos.Documents.Add(resource);
        (await sut.FindAsync(resource.Id)).Should().NotBeNull();
    }

    [Theory]
    [AutoNSubstituteData]
    public async Task Should_Create_New_Data(
        [Frozen(Matching.ImplementedInterfaces)] FakeCosmos<FooResource> fakeCosmos,
        FooService sut,
        Dictionary<string, object> data)
    {
        var count = fakeCosmos.Documents.Count;
        await sut.UpsertAsync(data: data);
        fakeCosmos.Documents.Should().HaveCount(count + 1);
    }

    [Theory]
    [AutoNSubstituteData]
    public async Task Should_Update_Existing_Data(
        [Frozen(Matching.ImplementedInterfaces)] FakeCosmos<FooResource> fakeCosmos,
        FooService sut,
        FooResource resource,
        Dictionary<string, object> data)
    {
        fakeCosmos.Documents.Add(resource);
        await sut.UpsertAsync(resource.Id, data);
        fakeCosmos
            .Documents
            .First(c => c.Id == resource.Id)
            .Bar
            .Should()
            .BeEquivalentTo(data);
    }
}
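If you don't use AutoFixture, you can also construct the fake by hand and pass it in as both the reader and the writer. This is a minimal xUnit sketch; it assumes FakeCosmos<T> has a parameterless constructor, and the test class and method names are hypothetical.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using Atc.Cosmos.Testing;
using FluentAssertions;
using Xunit;

// Hypothetical test class for the FooService shown above.
public class FooServiceManualTests
{
    [Fact]
    public async Task Should_Create_New_Data_Without_AutoFixture()
    {
        // Assumption: FakeCosmos<T> can be created with a parameterless constructor.
        // The same instance plays both the reader and the writer role.
        var fakeCosmos = new FakeCosmos<FooResource>();
        var sut = new FooService(fakeCosmos, fakeCosmos);

        await sut.UpsertAsync("foo-1", new Dictionary<string, object> { ["answer"] = 42 });

        fakeCosmos.Documents.Should().ContainSingle(c => c.Id == "foo-1");
        (await sut.FindAsync("foo-1")).Should().NotBeNull();
    }
}
```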
If you’re interested in the full source code, you can grab it here.