Batch Processing

Starting in version 15.5, batch processing is supported. Batch processing is useful in scenarios where you need to aggregate a number of messages before performing an operation, or where you want to process messages in parallel.

The batch processing interface is new in 15.1/16.0 and subject to change.

Requirements

There are two requirements to enable batch processing:

  1. Change the queue operation to 'batch' mode.
  2. Implement the IBatchDevice interface in your custom device:
  public interface IBatchDevice
  {
    /// <summary>
    /// This method is called by the channel when one or more messages are ready to be processed by a device.
    /// The device decides the order in which the messages are processed and the threading policy used.
    /// Batch processing must be enabled on the Queue.
    /// Implementors should also implement ProcessMessageAsync to handle the case where
    /// batch processing is disabled on the Queue.
    /// </summary>
    /// <param name="batchContext">The context containing the messages to be processed</param>
    /// <param name="cancellationToken">A token that can be checked to see if the channel has been stopped</param>
    Task ProcessMessagesAsync(IMessageBatchContext batchContext, CancellationToken cancellationToken);
  }
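
For illustration, a custom device that opts into batch processing might be declared as follows. This is a minimal sketch only: MyCustomDeviceBase is a placeholder for whatever base class your custom devices already derive from, and the batch body simply marks every message as processed.

// Sketch of a custom device that supports batch processing. 'MyCustomDeviceBase' is a
// placeholder; substitute the base class your custom devices already derive from.
public class MyBatchDevice : MyCustomDeviceBase, IBatchDevice
{
    // Non-batch fallback, used when the queue is not in batch mode (see 'Implementing' below).
    public override Task ProcessMessageAsync(IMessageContext context, CancellationToken token)
    {
        throw new NotSupportedException("This device can only operate with the queue in Batch mode.");
    }

    // Batch entry point, called when the queue is in batch mode.
    public Task ProcessMessagesAsync(IMessageBatchContext batchContext, CancellationToken cancellationToken)
    {
        // Your batch logic goes here; each context must be marked as processed (see the sections below).
        foreach (var messageContext in batchContext.GetMessageContexts())
            batchContext.MarkSuccessfullyProcessedForThisDevice(messageContext);

        return Task.CompletedTask;
    }
}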

Limitations

You are limited to processing the number of messages returned by the queue's dequeue method. This is the lesser of the 'Maximum Batch Count' (default 100, maximum 1000) and the 'Maximum Batch Size' (default 20 MB). If you require batching of larger blocks of messages, a different mechanism will be required.

Implementing

First, implement the standard ProcessMessageAsync method and either provide a default behavior that will be used when the queue is in non-batch mode, or throw an exception to indicate that your device only operates in batch mode.

    // Always implement the standard ProcessMessageAsync method in case the queue is operating
    // in regular (non-batch) mode.
    public override async Task ProcessMessageAsync(IMessageContext context, CancellationToken token)
    {
      // -- either provide a default single-message implementation, for example:
      // await myService.PostMessageAsync(context.Message.GetAsHL7Message()).ConfigureAwait(false);
      // -- or throw an exception. If you implement the OnError method and set ShouldRetry = true, you can put the device
      // -- into retry mode, which causes messages to remain in the queued state and errors to accrue on this
      // -- device. By default, messages will be moved to the error queue.
      throw new NotSupportedException("This device can only operate with the queue in Batch mode.");
    }

If you only support batch mode, then you will want to implement the OnError method and stop messages from processing altogether:

    public override void OnError(IMessageContext context, Connexion.Core.ErrorEventArgs args)
    {
      if(args.Exception is NotSupportedException)
      {
        args.ShouldRetry = true;
        args.SleepTime = TimeSpan.FromMinutes(2);
        Logger.Write(EventSeverity.Error, 54321, args.Exception.Message);
      }
    }

Using the implementation above, messages will not be moved to the error queue. They will remain in the queued state and errors will be logged to your device.

Batch Operations

It's important to understand that implementing batch logic is more complex than the standard 'message-at-a-time' implementation, specifically when it comes to exception handling. When operating on a batch, it's possible for part of the batch to succeed and part of it to fail; the successfully processed messages can then continue on to downstream devices.

If you wish your batch of messages to either succeed or fail as a whole (no partial success), then you will implement 'All or None' logic. For example, if you are adding all messages within your batch to an archive in a single operation, any failure should cause every message within the batch to be errored.

If your batch logic allows some messages within your batch to succeed while others fail, then you are operating in 'Partial Success' mode. For example, you may decide to send your messages to a downstream system in parallel, and that system may accept or reject each message individually.

Batch Error Handling

The non-batch processing logic utilizes an OnError method which lets you easily inject error-handling. Batch processing does not have an equivalent method. You must provide your own error-handling within your method(s). These are the cases you may need to handle:

  1. On an exception, error the entire batch: Throwing an exception from the ProcessMessagesAsync method will mark all messages in the batch as errored.
  2. On an exception, retry: When sending to a downstream system, you may wish to retry continuously and never error any messages (for example, when sending to a downstream web service which may be offline).
  3. On an exception, fail individual messages: You may wish to move specific messages to the error queue. For example, a downstream system may accept/reject a subset of batch messages.

Batch Logic 'All or None'

In this scenario, you wish the entire batch to succeed or fail. For example, if you are adding messages to an archive, and an exception is thrown, all messages within the batch should be errored.

When deciding on an error-handling strategy, determine if exceptions will be transient or not. For example, if your logic depends on an external service which may be periodically offline, you should wrap error-handling logic around the code which sends to your service and never throw. If your logic inspects message structure and does not depend on any external services, then retrying a failing message will always produce an exception. In this case, the message should not be retried and instead moved to the error queue.

If you have determined that an exception should move messages to the error queue (no retry), then simply throw an exception from the ProcessMessagesAsync method.

Note that in both uses of batching (All or None, Partial Success), you must mark each IMessageContext object within the batch as successful. This is done by calling IMessageBatchContext.MarkSuccessfullyProcessedForThisDevice(IMessageContext). An extension method exists to allow you to call this for each context within a batch.
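
For illustration, the convenience extension referred to above might look something like the following sketch (the extension shipped in R11 and later may differ in name or implementation):

// Sketch of a convenience extension that marks every message context in a batch
// as successfully processed by this device. The product's own extension (R11+)
// may be implemented differently.
public static class MessageBatchContextExtensions
{
    public static void MarkBatchSuccessful(this IMessageBatchContext batch)
    {
        foreach (var messageContext in batch.GetMessageContexts())
            batch.MarkSuccessfullyProcessedForThisDevice(messageContext);
    }
}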


// When an exception is thrown (or bubbles up), all messages in the batch will be moved to the error queue
public async Task ProcessMessagesAsync(IMessageBatchContext batch, CancellationToken cancellationToken)
{
    // All messages within the batch will be moved to the error queue.
    // Each message processing history will contain the exception thrown.
    throw new DivideByZeroException("Logic error!");
}

If you wish to implement retry logic, then wrap the retryable section of logic:

// Exceptions thrown outside of the while loop will cause messages to be errored.
// Exceptions within the while loop will cause a delayed retry.
public async Task ProcessMessagesAsync(IMessageBatchContext batch, CancellationToken cancellationToken)
{
    // your logic here...

    // While the channel/device is running, attempt to send the current batch (indefinitely).
    // Messages are never moved to the error queue.
    var sendAttempt = 0;
    while (!cancellationToken.IsCancellationRequested)
    {
        try
        {
            var result = await SubmitToExternalService(someStream, cancellationToken).ConfigureAwait(false);
            // ... your code ...

            // ** you *must* mark your batch as processed **
            batch.MarkBatchSuccessful();    // extension method in R11 and later
            // or
            batch.GetMessageContexts().ForEach(batch.MarkSuccessfullyProcessedForThisDevice);

            break;
        }
        catch (OperationCanceledException)
        {
            // exit when the device/channel is stopped
            throw;
        }
        catch (Exception ex)
        {
            // failure - wait and retry, backing off as the attempt count grows
            var delay = TimeSpan.FromSeconds(10);
            if (sendAttempt > 10)
                delay = TimeSpan.FromMinutes(5);
            else if (sendAttempt > 3)
                delay = TimeSpan.FromSeconds(60);

            // this will record the exception in the event log (visible within the UI)
            Logger.Write(EventSeverity.Error, $"Failed to send batch to {someTarget}: {ex.Message}. Retry in {delay.TotalSeconds}s.");

            // wait before retrying
            await Task.Delay(delay, cancellationToken).ConfigureAwait(false);
            sendAttempt++;
        }
    }
}

In many cases, you will implement both mechanisms. For example, you may prepare a batch of messages for an external system and then send it to a service. Any exception thrown within the preparation phase should be allowed to bubble up (or be re-thrown by you), causing the messages to be errored, since retrying that logic will always result in the same error. The send phase, however, will be wrapped in retry logic and will either process the messages successfully or not at all.
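
A condensed sketch of this two-phase pattern is shown below. PrepareBatchPayload is a hypothetical helper standing in for your own preparation logic, and SubmitToExternalService is the same illustrative send call used above.

public async Task ProcessMessagesAsync(IMessageBatchContext batch, CancellationToken cancellationToken)
{
    // Preparation phase: exceptions thrown here bubble up and error the entire batch,
    // because retrying deterministic preparation logic would fail the same way every time.
    var payload = PrepareBatchPayload(batch.GetMessageContexts());   // hypothetical helper

    // Send phase: transient failures are retried until the channel/device is stopped.
    while (!cancellationToken.IsCancellationRequested)
    {
        try
        {
            await SubmitToExternalService(payload, cancellationToken).ConfigureAwait(false);   // illustrative send call
            batch.MarkBatchSuccessful();   // extension method in R11 and later
            break;
        }
        catch (OperationCanceledException)
        {
            throw;   // the device/channel is stopping
        }
        catch (Exception ex)
        {
            Logger.Write(EventSeverity.Error, $"Failed to send batch: {ex.Message}. Retrying in 30s.");
            await Task.Delay(TimeSpan.FromSeconds(30), cancellationToken).ConfigureAwait(false);
        }
    }
}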

Batch Logic 'Partial Success'

In some cases you may wish to allow some messages within a batch to continue processing (or be marked as processed) while others are errored. A typical example of this would be processing messages in parallel. This type of processing requires an additional flag to be set on each message context.

  • To mark an individual message as successfully processed, you must call IMessageBatchContext.MarkSuccessfullyProcessedForThisDevice(messageContext), using the IMessageBatchContext object passed into the ProcessMessagesAsync method. Using this method will enable batch processing to be cancelled mid-processing, and all processed messages will be moved to the processed queue state, while unprocessed messages will remain in the queued state.
  • To mark an individual message as errored, add an error event via messageContext.WriteEvent(EventSeverity.Error, "...").
  • To mark an individual message as filtered, use the messageContext.FilterMessage(...) method (see the sketch after this list).
  • To mark an individual message as cancelled (the message remains in the queued state), set messageContext.Cancel to true.
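
For example, the filtered and cancelled cases might be handled like this inside the per-message loop of your ProcessMessagesAsync implementation. This is a sketch only: ShouldFilter is a hypothetical predicate, and the string argument shown for FilterMessage is an assumption.

// Inside the per-message loop of ProcessMessagesAsync:
if (ShouldFilter(messageContext))          // hypothetical predicate
{
    // Message will be filed into the filtered state. The reason string shown is illustrative.
    messageContext.FilterMessage("Message type is not of interest to this device.");
}
else if (token.IsCancellationRequested)
{
    // Message remains in the queued state and is picked up again later.
    messageContext.Cancel = true;
}
else
{
    // Message is filed as processed for this device.
    batch.MarkSuccessfullyProcessedForThisDevice(messageContext);
}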

The following example loops through each message context within the batch. If the cancellation token has been set (by stopping the channel or pausing the device), then any non-processed messages are returned to the queued state. Any messages which have been successfully processed (or errored/filtered) will be filed into the associated queue.

public async Task ProcessMessagesAsync(IMessageBatchContext batch, CancellationToken token)
{
  // iterate through each message context and process the message. Remember that
  // you should be handling exceptions within your loop and setting each message's
  // state. If you throw an exception out of this method, then all messages in the
  // batch will be errored.
  await batch.Contexts.ForEachAsync(async messageContext =>
  {
    if(token.IsCancellationRequested)
      return;
        
    try
    {
      // your business logic here...
      await Task.Delay(500, token).ConfigureAwait(false);          
           
      // ** you *must* mark your context as successfully processed **
      batch.MarkSuccessfullyProcessedForThisDevice(messageContext);
    }
    catch(OperationCanceledException)
    {
      // exit when the device/channel is stopped
      return;
    }
    catch(Exception ex)
    {
      // message moved to the error queue when you add an error event
      messageContext.WriteEvent(EventSeverity.Error, ex);
    }
  }, 10).ConfigureAwait(false);
}

Note that in the example above, messages are processed in parallel. In this scenario, it's possible for some messages to succeed and some to fail. Individual messages that do not succeed will either be moved to the error queue or be retried on a subsequent attempt.