Skip to end of metadata
Go to start of metadata

You are viewing an old version of this content. View the current version.

Compare with Current View Version History

« Previous Version 8 Next »

Starting in version 15.1, batch processing support has been added. Batch processing can be useful in scenarios where you need to aggregate a number of messages prior to performing an operation, or, to allow parallel processing of messages.

The batch processing interface is new in 15.1 and subject to change.

Requirements

There are two requirements to enable batch processing:

  1. Change the queue operation to 'batch' mode.
  2. Implement the IBatchDevice interface in your custom device
  public interface IBatchDevice
  {
    /// <summary>
    /// This method is called by the channel when 1, or more messages are ready to be processed by a device.
    /// The device can decide the order, and threading policy, the messages are processed in.
    /// Batch processing must be enabled on the Queue.
    /// Implementors should also implement ProcessMessageAsync to allow for when the
    /// Batch processing is disabled on the Queue.
    /// </summary>
    /// <param name="batchContext">The context contain the message to be processed</param>
    /// <param name="cancellationToken">A token that can be checked to see if the channel has been stopped</param>
    Task ProcessMessagesAsync(IMessageBatchContext batchContext, CancellationToken cancellationToken);
  }

Limitations

You are limited to processing the number of messages returned by the queue's dequeue method. This is lesser of the 'Maximum Batch Count' (default 100, maximum 1000) and the 'Maximum Batch Size' (default 20MB). If you require batching of larger blocks of messages, a different mechanism will be required.

Implementing

First, implement the standard ProcessMessageAsync method and either implement a default behavior which will be used when the queue is in non-batch mode, or, throw an exception to indicate your device will only operate in batch mode.

    // Always implement the typical ProcessMessageAsync method in case the queue is operating
    // in the regular mode.
	public override async Task ProcessMessageAsync(IMessageContext context, CancellationToken token)
    {
      // -- synchronous implementation
      // await myService.PostMessageAsync(context.Message.GetAsHL7Message()).ConfigureAwait(false);
      // -- or throw an exception. If you implement the OnError method and set Retry = true, you can set the device
      // -- into retry mode which will cause messages to remain in the queued state and errors to accrue on this
      // -- device. By default, messages will be moved to the error queue.
      throw new NotSupportedException("This device can only operate with the queue in Batch mode.");
    }

If you only support batch mode, then you will want to implement the OnError method and stop messages from processing altogether:

    public override void OnError(IMessageContext context, Connexion.Core.ErrorEventArgs args)
    {
      if(args.Exception is NotSupportedException)
      {
        args.ShouldRetry = true;
        args.SleepTime = TimeSpan.FromMinutes(2);
        Logger.Write(EventSeverity.Error, 54321, args.Exception.Message);
      }
    }

Using the implementation above, messages will not be moved to the error queue. They will remain in the queued state and errors will be logged to your device.

Next, implement your batch processing method. It is important to note that the message handling behavior within batch processing differs from the regular ProcessMessageAsync method.

If you are processing the entire batch of messages within one method (for example, sending the entire batch to an external service), then you can follow the same pattern as the non-batch ProcessMessageAsync method. Exceptions thrown from this method will be passed to the OnBatchError method and you can choose to either retry the entire batch, or, error the entire batch.

However, if you are processing messages individually, or, in smaller batches, you must update each individual message context within a batch. You must mark each message context as:

(a) Errored: messageContext.WriteEvent(EventSeverity.Error, "my error");

(b) Filtered: messageContext.FilterMessage(...);

(c) Processed: Set the messageContext.BatchProcessingSuccess = true after your message has successfully processed.

The following example loops through each message context within the batch. If the cancellation token has been set (by stopping the channel or pausing the device), then any non-processed messages are returned to the queued state. Any messages which have been successfully processed (or errored/filtered) will be filed into the associated queue.

public async Task ProcessMessagesAsync(IMessageBatchContext context, CancellationToken token)
{
  // iterate through each message context and process the message. Remember that
  // you should be handling exceptions within your loop and setting each message's
  // state. If you throw an exception out of this method, then the OnBatchError method
  // will be called.
  await context.Contexts.ForEachAsync(async messageContext =>
  {
    if(token.IsCancellationRequested)
      return;
        
    try
    {
      // your business logic here...
      await Task.Delay(500, token).ConfigureAwait(false);          
           
      // ** you *must* mark your context as successfully processed **
      messageContext.BatchProcessingSuccess = true;
    }
    catch(OperationCanceledException)
    {
	  // exit when the device/channel is stopped
      return;
    }
    catch(Exception ex)
    {
      // message moved to the error queue when you add an error event
      messageContext.WriteEvent(EventSeverity.Error, ex);
    }
  }, 10).ConfigureAwait(false);
}
// Called when an exception is thrown from the ProcessMessagesAsync method.
// *This only applies when you are processing the entire batch in one go. If you
// *are processing messages within the batch individually (or in smaller batches,
// *you need to handle exceptions within the ProcessMessagesAsync method.
public override void OnBatchError(IMessageBatchContext context, Connexion.Core.ErrorEventArgs args)
{
  if(args.Exception is OperationCanceledException)
    return;	// stopped device/channel
  
  args.ShouldRetry = true;
  
  args.SleepTime = TimeSpan.FromMinutes(2);
  // this error is displayed over your device
  Logger.Write(EventSeverity.Error, 54321, args.Exception.Message);
}
  • No labels