Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

If you have determined that an exception should move messages to the error queue (no retry), then simply throw an exception from the ProcessMessagesAsync method.

Info
Note that in both uses of batching (All or None, Partial Success), you must mark each IMessageContext object within the batch as successful. This is done by calling IMessageBatchContext.MarkSuccessfullyProcessedForThisDevice(IMessageContext). An extension method exists to allow you to call this for each context within a batch.


Code Block
// When an exception is thrown (or bubbles up), all messages in the batch will be moved to the error queue
public async Task ProcessMessagesAsync(IMessageBatchContext batch, CancellationToken cancellationToken)
{
    // All messages within the batch will be moved to the error queue.
    // Each message processing history will contain the exception thrown.
    throw new DivideByZeroException("Logic error!");
}

If you wish to implement retry logic, then wrap the retryable section of logic:

Code Block
// exceptions thrown outside of the while loop will cause messages to be errored. Exceptions within the while loop will cause a delayed retry.
public async Task ProcessMessagesAsync(IMessageBatchContext batch, CancellationToken cancellationToken)
{
      // your logic here...    

	  // while the channel/device is running, attempt to send the current batch (infinitely). Messages are
	  // never moved to the error queue.
      while (!cancellationToken.IsCancellationRequested)
      {
		try
		{
	      	var result = await SubmitToExternalService(someStream, cancellationToken).ConfigureAwait(false);
			// ... your code ...
			sendAttempt = 0;/ All messages within the batch will be moved to the error queue.
    // Each message processing history will contain the exception thrown.
    throw new DivideByZeroException("Logic error!");
}

If you wish to implement retry logic, then wrap the retryable section of logic:

Code Block
// exceptions thrown outside of the while loop will cause messages to be errored. Exceptions within the while loop will cause a delayed retry.
public async Task ProcessMessagesAsync(IMessageBatchContext batch, CancellationToken cancellationToken)
{
      // your logic here...    

	  // while the channel/device is running, attempt to send the current batch (infinitely). Messages are
	  // never moved to the error queue.
      while (!cancellationToken.IsCancellationRequested)
      {
		try
		{
	      	var result = await SubmitToExternalService(someStream, cancellationToken).ConfigureAwait(false);
			// ... your code ...
			sendAttempt = 0;

			// ** you *must* mark your batch as processed **
	        batch.MarkBatchSuccessful();	// extension method in R11 and later
			// or
			batch.GetMessageContexts().ForEach(batchContext.MarkSuccessfullyProcessedForThisDevice);

            break;
		}
		catch(OperationCanceledException)
   	    {
	  		// exit when the device/channel is stopped
	        throw;
	    }
        catch(exception ex)
		{
        	// failure - retry
            var delay = TimeSpan.FromSeconds(10);
            if (sendAttempt > 3)
            	delay = TimeSpan.FromSeconds(60);
            else if (sendAttempt > 10)
            	delay = TimeSpan.FromMinutes(5);

			// this will record the exception in the event log (visible within the UI)
            Logger.Write(EventSeverity.Error, $"Failed to send batch to {someTarget}: {ex.Message}. Retry in {delay.TotalSeconds}s.");
            
			// wait before retrying
			await Task.Delay(delay, cancellationToken).ConfigureAwait(false);
            sendAttempt++;
        }
	}
}

...

Code Block
languagec#
public async Task ProcessMessagesAsync(IMessageBatchContext batch, CancellationToken token)
{
  // iterate through each message context and process the message. Remember that
  // you should be handling exceptions within your loop and setting each message's
  // state. If you throw an exception out of this method, then all themessages OnBatchErrorin methodthe
  // batch will be callederrored.
  await batch.Contexts.ForEachAsync(async messageContext =>
  {
    if(token.IsCancellationRequested)
      return;
        
    try
    {
      // your business logic here...
      await Task.Delay(500, token).ConfigureAwait(false);          
           
      // ** you *must* mark your context as successfully processed **
      context.MarkSuccessfullyProcessedForThisDevice(messageContext);
    }
    catch(OperationCanceledException)
    {
	  // exit when the device/channel is stopped
      return;
    }
    catch(Exception ex)
    {
      // message moved to the error queue when you add an error event
      messageContext.WriteEvent(EventSeverity.Error, ex);
    }
  }, 10).ConfigureAwait(false);
}

...