Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Starting in version 15.15, batch processing support has been added. Batch processing can be useful in scenarios where you need to aggregate a number of messages prior to performing an operation, or, to allow parallel processing of messages.

Info
iconfalse
The batch processing interface is new in 15.1/15.5 and subject to change.

Requirements

...

(b) Filtered: messageContext.FilterMessage(...);

(c) Processed: Set the messageContext.BatchProcessingSuccess = true context.MarkSuccessfullyProcessedForThisDevice(messageContext) after your message has successfully processed. [IBatchDevice.MarkSuccessfullyProcessedForThisDevice()]. Using this method will enable batch processing to be cancelled mid-processing, and all processed messages will be moved to the processed queue state, while unprocessed messages will remain in the queued state. If you are processing the entire batch of messages in one go, this is not necessary (and a failure will result in the entire batch being retried).

The following example loops through each message context within the batch. If the cancellation token has been set (by stopping the channel or pausing the device), then any non-processed messages are returned to the queued state. Any messages which have been successfully processed (or errored/filtered) will be filed into the associated queue.

Code Block
languagec#
public async Task ProcessMessagesAsync(IMessageBatchContext context, CancellationToken token)
{
  // iterate through each message context and process the message. Remember that
  // you should be handling exceptions within your loop and setting each message's
  // state. If you throw an exception out of this method, then the OnBatchError method
  // will be called.
  await context.Contexts.ForEachAsync(async messageContext =>
  {
    if(token.IsCancellationRequested)
      return;
        
    try
    {
      // your business logic here...
      await Task.Delay(500, token).ConfigureAwait(false);          
           
      // ** you *must* mark your context as successfully processed **
      messageContext.BatchProcessingSuccess = truecontext.MarkSuccessfullyProcessedForThisDevice(messageContext);
    }
    catch(OperationCanceledException)
    {
	  // exit when the device/channel is stopped
      return;
    }
    catch(Exception ex)
    {
      // message moved to the error queue when you add an error event
      messageContext.WriteEvent(EventSeverity.Error, ex);
    }
  }, 10).ConfigureAwait(false);
}

...