...
Using the implementation above, messages will not be moved to the error queue. They will remain in the queued state, and errors will be logged against your device.
Next, implement your batch processing method. It is important to note that the message handling behavior within batch processing differs from the regular ProcessMessageAsync method.
If you are processing the entire batch of messages within one method (for example, sending the entire batch to an external service), then you can follow the same pattern as the non-batch ProcessMessageAsync method. Exceptions thrown from this method will be passed to the OnBatchError method, where you can choose to either retry the entire batch or error the entire batch.
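As a rough illustration of that choice, the sketch below decides between retrying and erroring inside OnBatchError. The exact OnBatchError signature is not shown on this page, so the parameter and return types used here (IBatchErrorContext, BatchErrorAction) are hypothetical placeholders; consult the API reference for the real members.
Code Block |
---|
// Sketch only - 'IBatchErrorContext' and 'BatchErrorAction' are hypothetical placeholders,
// not confirmed framework types. The intent is simply: transient failures retry the batch,
// anything else errors the batch.
public override Task<BatchErrorAction> OnBatchError(IBatchErrorContext errorContext, CancellationToken cancellationToken)
{
    var action = errorContext.Exception is TimeoutException
        ? BatchErrorAction.RetryBatch   // downstream service was slow/offline - try again
        : BatchErrorAction.ErrorBatch;  // anything else moves the whole batch to the error queue

    return Task.FromResult(action);
}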
However, if you are processing messages individually, or in smaller batches, you must update each individual message context within the batch. You must mark each message context as one of the following:
(a) Errored: messageContext.WriteEvent(EventSeverity.Error, "my error");
(b) Filtered: messageContext.FilterMessage(...);
(c) Processed: Call the batch context's MarkSuccessfullyProcessedForThisDevice(messageContext) method after your message has been successfully processed [IMessageBatchContext.MarkSuccessfullyProcessedForThisDevice()]. Using this method enables batch processing to be cancelled mid-processing: all processed messages will be moved to the processed queue state, while unprocessed messages will remain in the queued state. If you are processing the entire batch of messages in one go, this is not necessary (and a failure will result in the entire batch being retried).
The following example loops through each message context within the batch. If the cancellation token has been set (by stopping the channel or pausing the device), then any non-processed messages are returned to the queued state. Any messages which have been successfully processed (or errored/filtered) will be filed into the associated queue.
...
Batch Operations
It's important to understand that implementing batch logic is more complex than the standard 'message-at-a-time' implementation - specifically when it comes to exception handling. When operating on a batch, it's possible to have part of a batch succeed and part of it fail. This allows partially successful batches to continue processing on downstream devices.
If you wish your batch of messages to either succeed or fail as a whole (no partial success), then you will implement 'All or None' logic. For example, if you are adding all messages within your batch into an archive in a single operation, then any failure should cause all messages within the batch to be errored.
If your batch logic allows for some messages within your batch to succeed (when others may fail), then you are operating in 'Partial Success' mode. For example, you may decide to send your messages to a downstream system in parallel, and that system may return either success or reject on a per-message basis.
Batch Error Handling
The non-batch processing logic utilizes an OnError method which lets you easily inject error-handling. Batch processing does not have an equivalent method. You must provide your own error-handling within your method(s). These are the cases you may need to handle:
- On an exception, error the entire batch: Throwing an exception from the ProcessMessagesAsync method will mark all messages in the batch as errored.
- On an exception, retry: When sending to a downstream system, you wish to continuously retry and never error any messages (for example, sending to a downstream web service which may be offline).
- On an exception, fail individual messages: You may wish to move specific messages to the error queue. For example, a downstream system may accept/reject a subset of batch messages.
Batch Logic 'All or None'
In this scenario, you wish the entire batch to succeed or fail. For example, if you are adding messages to an archive, and an exception is thrown, all messages within the batch should be errored.
Info |
---|
When deciding on an error-handling strategy, determine if exceptions will be transient or not. For example, if your logic depends on an external service which may be periodically offline, you should wrap error-handling logic around the code which sends to your service and never throw. If your logic inspects message structure and does not depend on any external services, then retrying a failing message will always produce an exception. In this case, the message should not be retried and instead moved to the error queue. |
If you have determined that an exception should move messages to the error queue (no retry), then simply throw an exception from the ProcessMessagesAsync method.
Info |
---|
Note that in both uses of batching (All or None, Partial Success), you must mark each IMessageContext object within the batch as successful. This is done by calling IMessageBatchContext.MarkSuccessfullyProcessedForThisDevice(IMessageContext). An extension method exists to allow you to call this for each context within a batch. |
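For reference, a minimal sketch of the two equivalent forms is shown below, assuming the batch-level extension method (MarkBatchSuccessful) referenced in the retry example later on this page; its availability may depend on your product version.
Code Block |
---|
public async Task ProcessMessagesAsync(IMessageBatchContext batch, CancellationToken cancellationToken)
{
    // ... your batch processing here (placeholder) ...
    await Task.Delay(100, cancellationToken).ConfigureAwait(false);

    // Mark every context in the batch as processed for this device in one call
    // (extension method - see the retry example below; availability may depend on version):
    batch.MarkBatchSuccessful();

    // Equivalent explicit form:
    // foreach (var messageContext in batch.GetMessageContexts())
    //     batch.MarkSuccessfullyProcessedForThisDevice(messageContext);
}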
Code Block |
---|
// When an exception is thrown (or bubbles up), all messages in the batch will be moved to the error queue.
public async Task ProcessMessagesAsync(IMessageBatchContext batch, CancellationToken cancellationToken)
{
    // All messages within the batch will be moved to the error queue.
    // Each message's processing history will contain the exception thrown.
    throw new DivideByZeroException("Logic error!");
}
If you wish to implement retry logic, then wrap the retryable section of logic:
Code Block |
---|
// Exceptions thrown outside of the while loop will cause messages to be errored.
// Exceptions within the while loop will cause a delayed retry.
public async Task ProcessMessagesAsync(IMessageBatchContext batch, CancellationToken cancellationToken)
{
    // your logic here...
    var sendAttempt = 0;

    // While the channel/device is running, attempt to send the current batch (infinitely).
    // Messages are never moved to the error queue.
    while (!cancellationToken.IsCancellationRequested)
    {
        try
        {
            var result = await SubmitToExternalService(someStream, cancellationToken).ConfigureAwait(false);

            // ... your code ...
            sendAttempt = 0;

            // ** you *must* mark your batch as successfully processed **
            batch.MarkBatchSuccessful(); // extension method in R11 and later
            // or: batch.GetMessageContexts().ForEach(messageContext => batch.MarkSuccessfullyProcessedForThisDevice(messageContext));

            break;
        }
        catch (OperationCanceledException)
        {
            // exit when the device/channel is stopped
            throw;
        }
        catch (Exception ex)
        {
            // failure - retry, backing off as the number of attempts grows
            var delay = TimeSpan.FromSeconds(10);
            if (sendAttempt > 10)
                delay = TimeSpan.FromMinutes(5);
            else if (sendAttempt > 3)
                delay = TimeSpan.FromSeconds(60);

            // this will record the exception in the event log (visible within the UI)
            Logger.Write(EventSeverity.Error, $"Failed to send batch to {someTarget}: {ex.Message}. Retry in {delay.TotalSeconds}s.");

            // wait before retrying
            await Task.Delay(delay, cancellationToken).ConfigureAwait(false);
            sendAttempt++;
        }
    }
}
Another batch processing case involves the entire batch of messages succeeding or failing as a single unit. An example of this would be sending a batch of messages to an external service. In this case, you may serialize all the messages in the batch into a single payload and send that payload. If the send succeeds, all messages should be successful; if it does not, the entire batch should be retried.
In this scenario, calling context.MarkSuccessfullyProcessedForThisDevice(messageContext) has no effect and is not required. Messages will be marked as processed once the final device has been reached (without errors).
Please note that there is no built-in retry or error-handling policy; it is up to you to implement retry logic and error handling. Here is an example showing a retry policy:
Code Block |
---|
public async Task ProcessMessagesAsync(IMessageBatchContext batchContext, CancellationToken cancellationToken)
{
    var retry = 0;

    while (!cancellationToken.IsCancellationRequested)
    {
        try
        {
            // sample async call - send your batch of messages here
            await Task.Delay(1000, cancellationToken).ConfigureAwait(false);

            // on success, exit the retry loop (remember to mark your message contexts as processed - see above)
            break;
        }
        catch (OperationCanceledException)
        {
            throw;
        }
        catch (Exception ex)
        {
            // if this is a non-retryable error, then you should rethrow
            retry++;
            var sleepTimeInSeconds = retry > 5 ? 60 : 10;

            var errorMessage = $"{ex.Message}\n(failed {retry} time(s), next retry in {sleepTimeInSeconds}s)";

            // this will record the exception in the event log (visible within the UI)
            Logger.Write(EventSeverity.Error, errorMessage);
            RealtimeDeviceDisplayStatus.SetSummaryText(errorMessage, Colors.Orange);

            // wait before retrying
            await Task.Delay(TimeSpan.FromSeconds(sleepTimeInSeconds), cancellationToken);
        }
    }
}
In many cases, you will implement both mechanisms: for example, preparing a batch of messages and then sending them to an external service. Any exception thrown within the preparation phase should be allowed to bubble up (or be re-thrown by you), causing the messages to be errored, since retrying that logic will always produce the same error. Your send phase, however, will be wrapped in retry logic and will either process the messages successfully or not at all, as sketched below.
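A rough sketch of that combination follows. SerializeBatch is a hypothetical placeholder for your own preparation logic, and SubmitToExternalService is the same stand-in used in the retry example above; exceptions from the preparation step bubble up and error the batch, while the send step is retried until it succeeds or the device is stopped.
Code Block |
---|
public async Task ProcessMessagesAsync(IMessageBatchContext batch, CancellationToken cancellationToken)
{
    // Preparation phase: any exception thrown here bubbles up and errors the whole batch,
    // because retrying a serialization/structural failure will always fail again.
    var payload = SerializeBatch(batch); // hypothetical placeholder for your own preparation logic

    // Send phase: retried until it succeeds or the channel/device is stopped.
    // Messages are never errored from inside this loop.
    while (!cancellationToken.IsCancellationRequested)
    {
        try
        {
            await SubmitToExternalService(payload, cancellationToken).ConfigureAwait(false); // placeholder

            // ** mark the batch as successfully processed before exiting **
            batch.MarkBatchSuccessful();
            break;
        }
        catch (OperationCanceledException)
        {
            // the channel/device is stopping
            throw;
        }
        catch (Exception ex)
        {
            // transient failure - record it and retry after a short delay
            Logger.Write(EventSeverity.Error, $"Send failed: {ex.Message}. Retrying in 10s.");
            await Task.Delay(TimeSpan.FromSeconds(10), cancellationToken).ConfigureAwait(false);
        }
    }
}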
Batch Logic 'Partial Success'
In some cases you may wish to allow some messages within a batch to continue processing (or be marked as processed) while others are errored. A typical example of this would be processing messages in parallel. This type of processing requires an additional flag to be set on each message context.
- To mark an individual message as successfully processed, you must call IMessageBatchContext.MarkSuccessfullyProcessedForThisDevice(messageContext), using the IMessageBatchContext object passed into the ProcessMessagesAsync method. Using this method will enable batch processing to be cancelled mid-processing, and all processed messages will be moved to the processed queue state, while unprocessed messages will remain in the queued state.
- To mark an individual message as errored, add an error event via messageContext.WriteEvent(EventSeverity.Error, "...").
- To mark an individual message as filtered, use the messageContext.FilterMessage(...) method.
- To mark an individual message as cancelled (the message remains in the queued state), set messageContext.Cancel to true (all four outcomes are shown in the sketch below).
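The condensed sketch below shows all four outcomes in one loop; the full example follows. ShouldFilter and ProcessMessage are hypothetical placeholders for your own logic, and the parameters to FilterMessage are not shown on this page.
Code Block |
---|
public async Task ProcessMessagesAsync(IMessageBatchContext batch, CancellationToken token)
{
    await batch.Contexts.ForEachAsync(async messageContext =>
    {
        if (token.IsCancellationRequested)
        {
            // cancelled - the message remains in the queued state
            messageContext.Cancel = true;
            return;
        }

        try
        {
            if (ShouldFilter(messageContext)) // hypothetical placeholder for your own filter test
            {
                // filtered
                messageContext.FilterMessage(/* parameters not shown on this page */);
                return;
            }

            await ProcessMessage(messageContext, token).ConfigureAwait(false); // hypothetical placeholder

            // processed
            batch.MarkSuccessfullyProcessedForThisDevice(messageContext);
        }
        catch (Exception ex)
        {
            // errored - adding an error event moves the message to the error queue
            messageContext.WriteEvent(EventSeverity.Error, ex);
        }
    }, 10).ConfigureAwait(false);
}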
The following example loops through each message context within the batch. If the cancellation token has been set (by stopping the channel or pausing the device), then any non-processed messages are returned to the queued state. Any messages which have been successfully processed (or errored/filtered) will be filed into the associated queue.
Code Block |
---|
public async Task ProcessMessagesAsync(IMessageBatchContext batch, CancellationToken token)
{
    // iterate through each message context and process the message. Remember that
    // you should be handling exceptions within your loop and setting each message's
    // state. If you throw an exception out of this method, then all messages in the
    // batch will be errored.
    await batch.Contexts.ForEachAsync(async messageContext =>
    {
        if (token.IsCancellationRequested)
            return;

        try
        {
            // your business logic here...
            await Task.Delay(500, token).ConfigureAwait(false);

            // ** you *must* mark your context as successfully processed **
            batch.MarkSuccessfullyProcessedForThisDevice(messageContext);
        }
        catch (OperationCanceledException)
        {
            // exit when the device/channel is stopped
            return;
        }
        catch (Exception ex)
        {
            // message moved to the error queue when you add an error event
            messageContext.WriteEvent(EventSeverity.Error, ex);
        }
    }, 10).ConfigureAwait(false);
}
Note that in the example above, messages are being processed in parallel. In this scenario, it's possible for some messages to succeed and some messages to fail. Individual messages that do not succeed will either be moved to the error queue, or, retried on a subsequent attempt.