Batch processing support was added in version 15.1. Batch processing is useful when you need to aggregate a number of messages before performing an operation, or when you want to process messages in parallel.

The batch processing interface is new in 15.1 and subject to change.

Requirements

There are two requirements to enable batch processing:

  1. Change the queue operation to 'batch' mode.
  2. Implement the IBatchDevice interface in your custom device:

  public interface IBatchDevice
  {
    /// <summary>
    /// This method is called by the channel when one or more messages are ready to be processed by a device.
    /// The device decides the order in which the messages are processed, and the threading policy used.
    /// Batch processing must be enabled on the Queue.
    /// Implementors should also implement ProcessMessageAsync to allow for when
    /// batch processing is disabled on the Queue.
    /// </summary>
    /// <param name="batchContext">The context containing the messages to be processed</param>
    /// <param name="cancellationToken">A token that can be checked to see if the channel has been stopped</param>
    Task ProcessMessagesAsync(IMessageBatchContext batchContext, CancellationToken cancellationToken);
  }

Limitations

You are limited to processing the number of messages returned by the queue's dequeue method. A batch is capped by whichever limit is reached first: the 'Maximum Batch Count' (default 100, maximum 1000) or the 'Maximum Batch Size' (default 20MB). If you need to batch larger blocks of messages, a different mechanism will be required.
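
To make the interaction of the two limits concrete, here is a minimal sketch of the "whichever limit is reached first" rule. It is an illustration only, not the queue's actual dequeue logic: the class and method names are hypothetical, and it assumes that 20MB means 20 * 1024 * 1024 bytes and that a message which would push the batch over the size limit is left for the next batch.

```csharp
using System;
using System.Collections.Generic;

public static class BatchLimitSketch
{
    // Hypothetical helper (not part of the product API): returns how many of the
    // queued messages would fit into a single batch under a count cap and a size cap.
    public static int CountMessagesInBatch(
        IReadOnlyList<long> messageSizesInBytes,
        int maxBatchCount,
        long maxBatchSizeBytes)
    {
        int taken = 0;
        long totalBytes = 0;

        foreach (long size in messageSizesInBytes)
        {
            // Stop when the count cap is reached.
            if (taken >= maxBatchCount)
                break;

            // Assumption: a message that would exceed the size cap is not taken.
            if (totalBytes + size > maxBatchSizeBytes)
                break;

            totalBytes += size;
            taken++;
        }

        return taken;
    }
}
```

With the default values, 150 queued messages of 1 KB each would yield a batch of 100 (the count cap is reached first), while 30 queued messages of 1 MB each would yield a batch of 20 (the size cap is reached first).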

Implementing

First, implement the standard ProcessMessageAsync method. Either provide a default behavior to be used when the queue is in non-batch mode, or throw an exception to indicate that your device only operates in batch mode.

    // Always implement the typical ProcessMessageAsync method in case the queue is operating
    // in the regular mode.
    public override async Task ProcessMessageAsync(IMessageContext context, CancellationToken token)
    {
      // -- non-batch implementation
      // await myService.PostMessageAsync(context.Message.GetAsHL7Message()).ConfigureAwait(false);
      // -- or throw an exception. If you implement the OnError method and set ShouldRetry = true, you can put the
      // -- device into retry mode, which causes messages to remain in the queued state and errors to accrue on this
      // -- device. By default, messages will be moved to the error queue.
      throw new NotSupportedException("This device can only operate with the queue in Batch mode.");
    }

If your device only supports batch mode, implement the OnError method to stop messages from being processed altogether:

    public override void OnError(IMessageContext context, Connexion.Core.ErrorEventArgs args)
    {
      if(args.Exception is NotSupportedException)
      {
        args.ShouldRetry = true;
        args.SleepTime = TimeSpan.FromMinutes(2);
        Logger.Write(EventSeverity.Error, 54321, args.Exception.Message);
      }
    }

Using the implementation above, messages will not be moved to the error queue. They will remain in the queued state and errors will be logged to your device.

Next, implement your batch processing method. It is important to note that the message handling behavior within batch processing differs from the regular ProcessMessageAsync method.

You must update each individual message context within a batch, marking it as one of the following:

(a) Errored: messageContext.WriteEvent(EventSeverity.Error, "my error");

(b) Filtered: messageContext.FilterMessage(...);

(c) Processed: set messageContext.BatchProcessingSuccess = true after your message has been processed successfully.

public async Task ProcessMessagesAsync(IMessageBatchContext context, CancellationToken token)
{
  // iterate through each message context and process the message. Remember that
  // you should be handling exceptions within your loop and setting each message's
  // state. If you throw an exception out of this method, then the OnBatchError method
  // will be called.
  await context.Contexts.ForEachAsync(async messageContext =>
  {
    if(token.IsCancellationRequested)
      return;
        
    try
    {
      // your business logic here...
      await Task.Delay(500, token).ConfigureAwait(false);          
           
      // ** you *must* mark your context as successfully processed **
      messageContext.BatchProcessingSuccess = true;
    }
    catch(OperationCanceledException)
    {
      return;
    }
    catch(Exception ex)
    {
      // message moved to the error queue when you add an error event
      messageContext.WriteEvent(EventSeverity.Error, ex);
    }
  }, 10).ConfigureAwait(false);
}
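
The pattern above (bounded parallelism, with each message recording its own outcome so that one failure does not affect the rest of the batch) can be tried outside the product with the self-contained sketch below. Everything in it is a stand-in: ForEachBoundedAsync is a hypothetical helper written for this example, not the ForEachAsync extension used above, and it assumes that the second argument of that extension is a maximum degree of parallelism. The outcome dictionary stands in for the per-message state you would set on each message context.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class BoundedBatchSketch
{
    // Hypothetical helper: runs 'body' for every item, with at most
    // 'maxParallel' invocations in flight at any one time.
    public static async Task ForEachBoundedAsync<T>(
        IEnumerable<T> items, Func<T, Task> body, int maxParallel)
    {
        using (var gate = new SemaphoreSlim(maxParallel))
        {
            var tasks = items.Select(async item =>
            {
                await gate.WaitAsync().ConfigureAwait(false);
                try
                {
                    await body(item).ConfigureAwait(false);
                }
                finally
                {
                    gate.Release();
                }
            }).ToList();

            // Wait for every item before the semaphore is disposed.
            await Task.WhenAll(tasks).ConfigureAwait(false);
        }
    }

    // Demo: six fake messages, two at a time. Every message ends up with
    // exactly one recorded outcome, even when its processing throws.
    public static async Task<IDictionary<int, string>> RunDemoAsync()
    {
        var outcomes = new ConcurrentDictionary<int, string>();

        await ForEachBoundedAsync(Enumerable.Range(1, 6), async id =>
        {
            try
            {
                await Task.Delay(10).ConfigureAwait(false);
                if (id % 3 == 0)
                    throw new InvalidOperationException("simulated failure");

                outcomes[id] = "processed";
            }
            catch (Exception ex)
            {
                // Handle the failure per message instead of letting it escape the batch.
                outcomes[id] = "errored: " + ex.Message;
            }
        }, 2).ConfigureAwait(false);

        return outcomes;
    }
}
```

Catching exceptions inside the per-message body is the important design choice here: it keeps a single bad message from faulting the whole batch, which mirrors the requirement to set a state on every message context.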

