NServiceBus sagas are a powerful feature that lets you model long-running processes without managing state on your own or juggling message correlation. Because they manage the state for you, and because messaging systems are distributed and process messages concurrently, sagas also handle concurrency control, using optimistic concurrency.

If you have done database work before, you know that optimistic concurrency means that if two things update the same record at the same time, one of them will fail. Armed with NServiceBus retries, we can easily recover from such ‘transient’ problems by simply retrying the offending message; eventually it goes through, since the second time around, hopefully, no one else has touched the state.
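To make the mechanics concrete, here is a minimal sketch of optimistic concurrency with a retry. It is not NServiceBus code: `VersionedStore` and `OptimisticConcurrencyDemo` are hypothetical names standing in for a saga persister and two competing handlers.

```csharp
using System;

// Hypothetical in-memory stand-in for a saga persister (not an NServiceBus type).
public class VersionedStore
{
    public int Value { get; private set; }
    public int Version { get; private set; }

    // The save succeeds only if nobody changed the state since it was read.
    public bool TrySave(int newValue, int expectedVersion)
    {
        if (expectedVersion != Version) return false; // stale read: someone else saved first
        Value = newValue;
        Version++;
        return true;
    }
}

public static class OptimisticConcurrencyDemo
{
    public static int Run()
    {
        var store = new VersionedStore();

        // Two "handlers" read the same state at the same time.
        var (valueA, versionA) = (store.Value, store.Version);
        var (valueB, versionB) = (store.Value, store.Version);

        store.TrySave(valueA + 1, versionA);               // first writer wins
        bool bSaved = store.TrySave(valueB + 1, versionB); // fails: version is stale

        // The retry re-reads the fresh state and then succeeds.
        if (!bSaved)
        {
            store.TrySave(store.Value + 1, store.Version);
        }

        return store.Value; // both increments end up applied
    }
}
```

The second writer loses the first round, but because the retry starts from a fresh read, no update is lost.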

I was reviewing some code where messages were being retried constantly during a batch import. Imagine importing data in batches via a remote HTTP service, with the saga completing once all the batches have been imported successfully. The code looked something like this (skipping the saga signature):

public async Task Handle(ImportData message, IMessageHandlerContext context)
{
    Data.PagesToImport = message.PagesToImport;

    // Fan out: one ImportDataPage message per page
    foreach (var pageNo in Enumerable.Range(1, message.PagesToImport))
    {
        await context.Send(new ImportDataPage
        {
            ImportID = Data.ImportID,
            PageNo = pageNo
        });
    }
}

public async Task Handle(ImportDataPage message, IMessageHandlerContext context)
{
    await remote.ImportDataPage(message.PageNo); // Import the data from the remote server

    Data.DataPagesImported++; // Update our local state

    await CheckIfAllPagesImported(context);
}

private Task CheckIfAllPagesImported(IMessageHandlerContext context)
{
    if (Data.DataPagesImported == Data.PagesToImport)
    {
        MarkAsComplete();
    }

    // Nothing is awaited here, so return a completed task instead of marking the method async
    return Task.CompletedTask;
}

Now, the interesting thing was that the calls to import the data were running concurrently. On one hand, this was a good thing, as fetching the data was a slow HTTP operation. On the other hand, the saga was complaining about concurrency: five requests running at the same time would clash whenever they finished around the same time and each needed to update the saga’s state.

First stab: limiting the concurrency level

So if the saga is complaining about too many messages touching the state at the same time, what’s the first thing you’d try? Maybe dialing down the concurrency level? That might make sense: with fewer messages running at the same time, we’ll have fewer clashes as well. Doing this is like ‘erasing the problem’, though, since the concurrency was desired.

Furthermore, customizing concurrency levels per message type is only possible by customizing the message processing pipeline as NServiceBus does not allow you to do that out of the box - and for good reasons.
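For reference, the setting that is available out of the box is endpoint-wide. A rough sketch, assuming NServiceBus 6 or later and a made-up endpoint name:

```csharp
using NServiceBus;

// "DataImport" is a hypothetical endpoint name used only for illustration.
var endpointConfiguration = new EndpointConfiguration("DataImport");

// Applies to every message type this endpoint handles, not just ImportDataPage.
endpointConfiguration.LimitMessageProcessingConcurrencyTo(1);
```

Because the limit covers the whole endpoint, it would also serialize the slow HTTP imports, which is exactly the concurrency we wanted to keep.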

Second stab: understanding the real problem

Taking a step back, I had to ask myself a question: why is this a problem? Didn’t we just say NServiceBus retries take care of the concurrency issues? Didn’t all the messages eventually succeed? The answer to that question was key.

The real problem was not the concurrency or the number of retries. The problem was that the remote HTTP calls were made again, since the whole message had to be retried. Let’s have a closer look at the code:

await remote.ImportDataPage(message.PageNo); // <- Remote HTTP Operation

Data.DataPagesImported++; // <- Local Database Update

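See the problem? It lies in the way transactional and non-transactional resources are mixed in one handler. When the state update fails and the message is retried, the database change is rolled back, but the HTTP call cannot be, so it simply runs again. You should never do that.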
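The effect can be shown with a small simulation. This is only a sketch, not NServiceBus code: `RetrySideEffectDemo` is a hypothetical helper, the counter stands in for the slow HTTP call, and the thrown exception stands in for a concurrency conflict on save.

```csharp
using System;

public static class RetrySideEffectDemo
{
    // Simulates one message whose state update conflicts `conflicts` times
    // before it succeeds, and returns how many remote calls were made.
    public static int CountRemoteCalls(int conflicts)
    {
        int remoteCalls = 0;
        int conflictsLeft = conflicts;

        while (true)
        {
            try
            {
                remoteCalls++; // stand-in for the HTTP call; it cannot be rolled back

                if (conflictsLeft > 0)
                {
                    conflictsLeft--;
                    throw new InvalidOperationException("Simulated concurrency conflict");
                }

                return remoteCalls; // state saved successfully
            }
            catch (InvalidOperationException)
            {
                // The whole handler is retried, remote call included.
            }
        }
    }
}
```

With two conflicts, `CountRemoteCalls(2)` returns 3: one page of data is fetched three times just to increment a counter once.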

Capturing the intent

How do we separate the two concerns then? By adding another level of indirection of course. Here it means sending a message to capture the intent rather than doing two things with the same message. This is a very important thing to remember.

Here’s what the final code looks like:

public async Task Handle(ImportData message, IMessageHandlerContext context)
{
    Data.PagesToImport = message.PagesToImport;

    // Fan out: one ImportDataPage message per page
    foreach (var pageNo in Enumerable.Range(1, message.PagesToImport))
    {
        await context.Send(new ImportDataPage
        {
            ImportID = Data.ImportID,
            PageNo = pageNo
        });
    }

    await CheckIfAllPagesImported(context); // If there's no data to import
}

public async Task Handle(ImportDataPage message, IMessageHandlerContext context)
{
    // Remote HTTP operation only; no saga state is modified here
    await remote.ImportDataPage(message.PageNo);

    // Capture the intent: the state update happens in its own message
    await context.Publish(new DataPageImported
    {
        ImportID = Data.ImportID,
    });
}

public async Task Handle(DataPageImported message, IMessageHandlerContext context)
{
    // Local state update only; cheap to retry on a concurrency conflict
    Data.DataPagesImported++;
    await CheckIfAllPagesImported(context);
}

private Task CheckIfAllPagesImported(IMessageHandlerContext context)
{
    if (Data.DataPagesImported == Data.PagesToImport)
    {
        MarkAsComplete();
    }

    // Nothing is awaited here, so return a completed task instead of marking the method async
    return Task.CompletedTask;
}

As you can see, the act of updating the internal state is now separated from the act of fetching the data. Does this mean there won’t be any concurrency issues? No. There still will be, but the conflicting message that gets ‘retried’ only updates the internal state and no longer needs to rerun the whole import to do so!