Version 15.1 adds support for batch processing. Batch processing is useful when you need to aggregate a number of messages before performing an operation, or when you want to process messages in parallel.

Info
The batch processing interface is new in 15.1 and subject to change.

Requirements

There are two requirements to enable batch processing:

...

Code Block
languagec#
  public interface IBatchDevice
  {
    /// <summary>
    /// Called by the channel when one or more messages are ready to be processed by a device.
    /// The device decides the order and the threading policy used to process the messages.
    /// Batch processing must be enabled on the Queue.
    /// Implementors should also implement ProcessMessageAsync to handle the case where
    /// batch processing is disabled on the Queue.
    /// </summary>
    /// <param name="batchContext">The context containing the messages to be processed</param>
    /// <param name="cancellationToken">A token that can be checked to see if the channel has been stopped</param>
    Task ProcessMessagesAsync(IMessageBatchContext batchContext, CancellationToken cancellationToken);
  }
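
As a rough illustration of the "threading policy" mentioned in the interface comments, the sketch below processes a batch in parallel with a cap on concurrency. The members of IMessageBatchContext are not shown on this page, so the example uses hypothetical stand-in types (IDemoMessage, IDemoBatchContext, DemoBatchDevice) and a hypothetical send delegate. Treat it as one possible shape for such a method, not as the Connexion API.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-ins: the real IMessageBatchContext members are not documented here.
public interface IDemoMessage { string Body { get; } }
public interface IDemoBatchContext { IReadOnlyList<IDemoMessage> Messages { get; } }

public sealed class DemoMessage : IDemoMessage
{
    public DemoMessage(string body) { Body = body; }
    public string Body { get; }
}

public sealed class DemoBatchContext : IDemoBatchContext
{
    public DemoBatchContext(IReadOnlyList<IDemoMessage> messages) { Messages = messages; }
    public IReadOnlyList<IDemoMessage> Messages { get; }
}

public sealed class DemoBatchDevice
{
    private readonly Func<IDemoMessage, CancellationToken, Task> _send;
    private readonly int _maxParallel;

    public DemoBatchDevice(Func<IDemoMessage, CancellationToken, Task> send, int maxParallel)
    {
        _send = send ?? throw new ArgumentNullException(nameof(send));
        if (maxParallel < 1) throw new ArgumentOutOfRangeException(nameof(maxParallel));
        _maxParallel = maxParallel;
    }

    // Same shape as IBatchDevice.ProcessMessagesAsync, but using the stand-in context type.
    public async Task ProcessMessagesAsync(IDemoBatchContext batchContext, CancellationToken cancellationToken)
    {
        // The semaphore limits how many messages are in flight at the same time.
        using (var gate = new SemaphoreSlim(_maxParallel))
        {
            var tasks = batchContext.Messages.Select(async message =>
            {
                await gate.WaitAsync(cancellationToken).ConfigureAwait(false);
                try
                {
                    await _send(message, cancellationToken).ConfigureAwait(false);
                }
                finally
                {
                    gate.Release();
                }
            }).ToList(); // ToList starts every task before we await them

            // Completes when every message has finished; rethrows the first failure, if any.
            await Task.WhenAll(tasks).ConfigureAwait(false);
        }
    }
}
```

A device that needs strict ordering could instead loop over the messages and await each send in turn; the batch method leaves that choice to the implementor.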

...

Limitations

A batch can contain at most the number of messages returned by the queue's dequeue method. That number is capped by whichever limit is reached first: the 'Maximum Batch Count' (default 100, maximum 1000) or the 'Maximum Batch Size' (default 20MB). If you need to batch larger blocks of messages, you will need a different mechanism.
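
To make the interaction of the two limits concrete, here is a small sketch that counts how many queued messages would fit in one batch. It is an illustration of the rule described above, not the queue's actual dequeue logic: the BatchLimits class is hypothetical, "20MB" is assumed to mean 20 * 1024 * 1024 bytes, and the handling of a single message larger than the size limit is an assumption (the sketch simply stops).

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper, not part of the Connexion API.
public static class BatchLimits
{
    // Defaults quoted in the text; the exact byte value of "20MB" is an assumption.
    public const int DefaultMaxBatchCount = 100;
    public const long DefaultMaxBatchSizeBytes = 20L * 1024 * 1024;

    // Returns how many messages, taken in order from the front of the queue,
    // fit in one batch before either the count limit or the size limit is hit.
    public static int CountMessagesInBatch(
        IReadOnlyList<long> messageSizesInBytes,
        int maxBatchCount = DefaultMaxBatchCount,
        long maxBatchSizeBytes = DefaultMaxBatchSizeBytes)
    {
        if (messageSizesInBytes == null) throw new ArgumentNullException(nameof(messageSizesInBytes));

        int count = 0;
        long totalBytes = 0;
        foreach (long size in messageSizesInBytes)
        {
            if (count >= maxBatchCount) break;                  // count limit reached
            if (totalBytes + size > maxBatchSizeBytes) break;   // size limit would be exceeded
            totalBytes += size;
            count++;
        }
        return count;
    }
}
```

With the defaults, 150 queued messages of 1 KB each would yield a batch of 100 (count limit), while three messages of 15 MB, 4 MB and 2 MB would yield a batch of 2 (size limit).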

Implementing

First, implement the standard ProcessMessageAsync method. Either provide a default behavior to be used when the queue is in non-batch mode, or throw an exception to indicate that your device only operates in batch mode.

Code Block
languagec#
    // Always implement the standard ProcessMessageAsync method in case the queue is operating
    // in regular (non-batch) mode.
    public override async Task ProcessMessageAsync(IMessageContext context, CancellationToken token)
    {
      // -- per-message implementation
      // await myService.PostMessageAsync(context.Message.GetAsHL7Message()).ConfigureAwait(false);
      // -- or throw an exception. If you implement the OnError method and set ShouldRetry = true, you can put the
      // -- device into retry mode, which causes messages to remain in the queued state and errors to accrue on this
      // -- device. By default, messages are moved to the error queue.
      throw new NotSupportedException("This device can only operate with the queue in Batch mode.");
    }

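If your device only supports batch mode, implement the OnError method so that the failed message is retried instead of being moved to the error queue. This keeps messages queued and effectively stops processing until batch mode is enabled: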

Code Block
languagec#
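    public override void OnError(IMessageContext context, Connexion.Core.ErrorEventArgs args)
    {
      // Only intercept the exception thrown by ProcessMessageAsync when the queue is not in batch mode.
      if (args.Exception is NotSupportedException)
      {
        // Keep the message queued and try again after a delay.
        args.ShouldRetry = true;
        args.SleepTime = TimeSpan.FromMinutes(2);
        Logger.Write(EventSeverity.Error, 54321, args.Exception.Message);
      }
    }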