Starting in version 15.5, batch processing support has been added. Batch processing can be useful in scenarios where you need to aggregate a number of messages prior to performing an operation, or, to allow parallel processing of messages.
Requirements
There are two requirements to enable batch processing:
- Change the queue operation to 'batch' mode.
- Implement the IBatchDevice interface in your custom device
public interface IBatchDevice { /// <summary> /// This method is called by the channel when 1, or more messages are ready to be processed by a device. /// The device can decide the order, and threading policy, the messages are processed in. /// Batch processing must be enabled on the Queue. /// Implementors should also implement ProcessMessageAsync to allow for when the /// Batch processing is disabled on the Queue. /// </summary> /// <param name="batchContext">The context contain the message to be processed</param> /// <param name="cancellationToken">A token that can be checked to see if the channel has been stopped</param> Task ProcessMessagesAsync(IMessageBatchContext batchContext, CancellationToken cancellationToken); }
Limitations
You are limited to processing the number of messages returned by the queue's dequeue method. This is lesser of the 'Maximum Batch Count' (default 100, maximum 1000) and the 'Maximum Batch Size' (default 20MB). If you require batching of larger blocks of messages, a different mechanism will be required.
Implementing
First, implement the standard ProcessMessageAsync method and either implement a default behavior which will be used when the queue is in non-batch mode, or, throw an exception to indicate your device will only operate in batch mode.
// Always implement the typical ProcessMessageAsync method in case the queue is operating // in the regular mode. public override async Task ProcessMessageAsync(IMessageContext context, CancellationToken token) { // -- synchronous implementation // await myService.PostMessageAsync(context.Message.GetAsHL7Message()).ConfigureAwait(false); // -- or throw an exception. If you implement the OnError method and set Retry = true, you can set the device // -- into retry mode which will cause messages to remain in the queued state and errors to accrue on this // -- device. By default, messages will be moved to the error queue. throw new NotSupportedException("This device can only operate with the queue in Batch mode."); }
If you only support batch mode, then you will want to implement the OnError method and stop messages from processing altogether:
public override void OnError(IMessageContext context, Connexion.Core.ErrorEventArgs args) { if(args.Exception is NotSupportedException) { args.ShouldRetry = true; args.SleepTime = TimeSpan.FromMinutes(2); Logger.Write(EventSeverity.Error, 54321, args.Exception.Message); } }
Using the implementation above, messages will not be moved to the error queue. They will remain in the queued state and errors will be logged to your device.
Next, implement your batch processing method. It is important to note that the message handling behavior within batch processing differs from the regular ProcessMessageAsync method.
If you are processing the entire batch of messages within one method (for example, sending the entire batch to an external service), then you can follow the same pattern as the non-batch ProcessMessageAsync method. Exceptions thrown from this method will be passed to the OnBatchError method and you can choose to either retry the entire batch, or, error the entire batch.
However, if you are processing messages individually, or, in smaller batches, you must update each individual message context within a batch. You must mark each message context as:
(a) Errored: messageContext.WriteEvent(EventSeverity.Error, "my error");
(b) Filtered: messageContext.FilterMessage(...);
(c) Processed: Set the context.MarkSuccessfullyProcessedForThisDevice(messageContext) after your message has successfully processed. [IBatchDevice.MarkSuccessfullyProcessedForThisDevice()]. Using this method will enable batch processing to be cancelled mid-processing, and all processed messages will be moved to the processed queue state, while unprocessed messages will remain in the queued state. If you are processing the entire batch of messages in one go, this is not necessary (and a failure will result in the entire batch being retried).
The following example loops through each message context within the batch. If the cancellation token has been set (by stopping the channel or pausing the device), then any non-processed messages are returned to the queued state. Any messages which have been successfully processed (or errored/filtered) will be filed into the associated queue.
public async Task ProcessMessagesAsync(IMessageBatchContext context, CancellationToken token) { // iterate through each message context and process the message. Remember that // you should be handling exceptions within your loop and setting each message's // state. If you throw an exception out of this method, then the OnBatchError method // will be called. await context.Contexts.ForEachAsync(async messageContext => { if(token.IsCancellationRequested) return; try { // your business logic here... await Task.Delay(500, token).ConfigureAwait(false); // ** you *must* mark your context as successfully processed ** context.MarkSuccessfullyProcessedForThisDevice(messageContext); } catch(OperationCanceledException) { // exit when the device/channel is stopped return; } catch(Exception ex) { // message moved to the error queue when you add an error event messageContext.WriteEvent(EventSeverity.Error, ex); } }, 10).ConfigureAwait(false); }
Note that in the example above, messages are being processed in parallel. In this scenario, it's possible for some messages to succeed and some messages to fail. Individual messages that do not succeed will either be moved to the error queue, or, retried on a subsequent attempt. Another batch processing case involves the entire batch of messages succeeding or failing as a single batch. An example of this would be the sending of a batch of messages to an external service. In this case, you may serialize all the messages in the batch into a single payload, and send that payload. If the payload succeeds, all messages should be successful, and if it does not, the entire batch should be retried.
In this scenario, the context.MarkSuccessfullyProcessedForThisDevice(messageContext) has no effect, and is not required. Messages will be marked as processed once the final device has been reached (without errors).
Please note - there is no built-in retry or error handling policy - it is up to you to implement retry logic and error handling. Here is an example showing a retry policy:
public async Task ProcessMessagesAsync(IMessageBatchContext batchContext, CancellationToken cancellationToken) { var retry = 0; while (!cancellationToken.IsCancellationRequested) { try { await Task.Delay(1000, cancellationToken); // sample async call - send your batch of messages here } catch (OperationCanceledException) { throw; } catch (Exception ex) { // if this is a non-retryable error, then you should rethrow. retry++; var sleepTimeInSeconds = retry > 5 ? 60 : 10; var errorMessage = $"{ex.Message}\n(failed {retry} time(s), next retry in {sleepTimeInSeconds}s)"; Logger.Write(EventSeverity.Error, errorMessage); RealtimeDeviceDisplayStatus.SetSummaryText(errorMessage, Colors.Orange); await Task.Delay(TimeSpan.FromSeconds(sleepTimeInSeconds), cancellationToken); } } }