...
If you have determined that an exception should move messages to the error queue (no retry), then simply throw an exception from the ProcessMessagesAsync method.
Info |
---|
Note that in both uses of batching (All or None, Partial Success), you must mark each IMessageContext object within the batch as successful. This is done by calling IMessageBatchContext.MarkSuccessfullyProcessedForThisDevice(IMessageContext) . An extension method exists to allow you to call this for each context within a batch. |
Code Block |
---|
// When an exception is thrown (or bubbles up), all messages in the batch will be moved to the error queue
public async Task ProcessMessagesAsync(IMessageBatchContext batch, CancellationToken cancellationToken)
{
// All messages within the batch will be moved to the error queue.
// Each message processing history will contain the exception thrown.
throw new DivideByZeroException("Logic error!");
} |
If you wish to implement retry logic, then wrap the retryable section of logic:
Code Block |
---|
// exceptions thrown outside of the while loop will cause messages to be errored. Exceptions within the while loop will cause a delayed retry. public async Task ProcessMessagesAsync(IMessageBatchContext batch, CancellationToken cancellationToken) { // your logic here... // while the channel/device is running, attempt to send the current batch (infinitely). Messages are // never moved to the error queue. while (!cancellationToken.IsCancellationRequested) { try { var result = await SubmitToExternalService(someStream, cancellationToken).ConfigureAwait(false); // ... your code ... sendAttempt = 0;/ All messages within the batch will be moved to the error queue. // Each message processing history will contain the exception thrown. throw new DivideByZeroException("Logic error!"); } |
If you wish to implement retry logic, then wrap the retryable section of logic:
Code Block |
---|
// exceptions thrown outside of the while loop will cause messages to be errored. Exceptions within the while loop will cause a delayed retry.
public async Task ProcessMessagesAsync(IMessageBatchContext batch, CancellationToken cancellationToken)
{
// your logic here...
// while the channel/device is running, attempt to send the current batch (infinitely). Messages are
// never moved to the error queue.
while (!cancellationToken.IsCancellationRequested)
{
try
{
var result = await SubmitToExternalService(someStream, cancellationToken).ConfigureAwait(false);
// ... your code ...
sendAttempt = 0;
// ** you *must* mark your batch as processed **
batch.MarkBatchSuccessful(); // extension method in R11 and later
// or
batch.GetMessageContexts().ForEach(batchContext.MarkSuccessfullyProcessedForThisDevice);
break;
}
catch(OperationCanceledException)
{
// exit when the device/channel is stopped
throw;
}
catch(exception ex)
{
// failure - retry
var delay = TimeSpan.FromSeconds(10);
if (sendAttempt > 3)
delay = TimeSpan.FromSeconds(60);
else if (sendAttempt > 10)
delay = TimeSpan.FromMinutes(5);
// this will record the exception in the event log (visible within the UI)
Logger.Write(EventSeverity.Error, $"Failed to send batch to {someTarget}: {ex.Message}. Retry in {delay.TotalSeconds}s.");
// wait before retrying
await Task.Delay(delay, cancellationToken).ConfigureAwait(false);
sendAttempt++;
}
}
} |
...
Code Block | ||
---|---|---|
| ||
public async Task ProcessMessagesAsync(IMessageBatchContext batch, CancellationToken token) { // iterate through each message context and process the message. Remember that // you should be handling exceptions within your loop and setting each message's // state. If you throw an exception out of this method, then all themessages OnBatchErrorin methodthe // batch will be callederrored. await batch.Contexts.ForEachAsync(async messageContext => { if(token.IsCancellationRequested) return; try { // your business logic here... await Task.Delay(500, token).ConfigureAwait(false); // ** you *must* mark your context as successfully processed ** context.MarkSuccessfullyProcessedForThisDevice(messageContext); } catch(OperationCanceledException) { // exit when the device/channel is stopped return; } catch(Exception ex) { // message moved to the error queue when you add an error event messageContext.WriteEvent(EventSeverity.Error, ex); } }, 10).ConfigureAwait(false); } |
...