...
Code Block | ||
---|---|---|
| ||
public async Task ProcessMessagesAsync(IMessageBatchContext batch, CancellationToken token) { // iterate through each message context and process the message. Remember that // you should be handling exceptions within your loop and setting each message's // state. If you throw an exception out of this method, then theall messages OnBatchErrorin methodthe // batch will be callederrored. await batch.Contexts.ForEachAsync(async messageContext => { if(token.IsCancellationRequested) return; try { // your business logic here... await Task.Delay(500, token).ConfigureAwait(false); // ** you *must* mark your context as successfully processed ** context.MarkSuccessfullyProcessedForThisDevice(messageContext); } catch(OperationCanceledException) { // exit when the device/channel is stopped return; } catch(Exception ex) { // message moved to the error queue when you add an error event messageContext.WriteEvent(EventSeverity.Error, ex); } }, 10).ConfigureAwait(false); } |
...