When working on a project using NServiceBus, we came up with an interesting requirement. The messages in the queue should be processed only in certain time frames. The reason is that message handler is making a webservice call and the other party is only available at certain times and system is not even allowed to make a call during the outage window. You can easily achieve this using Bus.Defer feature in version 3.0 of NServiceBus, but that would put the message at the back of the queue and would change the order of the messages which was very important from business perspective in our case. So we had to roll up our sleeves and dig a bit deeper. It turned out NServiceBus is using an ITransport interface to process the messages in the queue. The implementation that comes with NServiceBus is named TransactionalTransport. This is the class that handles the processing threads under the hood, so what if we set the number of processing threads to zero for a while, and when we need to resume processing, set it back to the default value (one or more threads)?

Udi suggests that you shouldn’t actually use this, if possible. Instead send the message to the remote queue and let the other end worry about how and when to process it, but this may not work if you’re working in a locked down environment or your customer would simply play it by their rules.

Okay here’s the code snippet if you’re interested.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
public abstract class BaseCommandHandler<T> : IHandleMessages<T>
{
private TimeSpan _wait = TimeSpan.FromSeconds(30);

public TimeSpan DefaultWait
{
get { return _wait; }
set { _wait = value; }
}

public virtual IBus Bus { get; set; }

public virtual ITransport Transport { get; set; }

public virtual void Handle(T message)
{
if (CanProcessMessage())
{
ProcessMessage(message);
}
else
{
PauseProcessing(DefaultWait);
}
}

protected abstract void ProcessMessage(T message);

protected virtual bool CanProcessMessage()
{
return true;
}

private void PauseProcessing(double interval)
{
Transport.PauseProcessing();
Spin(interval);
}

private void Spin(double interval)
{
var timer = new Timer(interval) { Enabled = true };
timer.Elapsed += OnSpinnigFinished;
timer.Start();
}

private void OnSpinnigFinished(object sender, ElapsedEventArgs elapsedEventArgs)
{
var timer = (Timer) sender;
timer.Enabled = false;
timer.Dispose();
Transport.ResumeProcessing();
}
}

public static class TransportExtensions
{
public static void PauseProcessing(this ITransport transport)
{
transport.AbortHandlingCurrentMessage();
transport.ChangeNumberOfWorkerThreads(0);
}

public static void ResumeProcessing(this ITransport transport)
{
transport.ChangeNumberOfWorkerThreads(1);
}
}