У меня есть потребитель Kafka, который вызывает внешний API при получении сообщения. Polly.NET используется в качестве механизма повторной попытки в случае неудачного вызова.
Проблема с текущим решением заключается в том, что механизм повторной попытки блокирует использование следующего сообщения, поэтому следующее сообщение должно дождаться завершения механизма повторной попытки.
Есть идеи, как я могу запустить механизм повторной попытки асинхронно, чтобы продолжить работу со следующими сообщениями?
Следующий пример демонстрирует описанную проблему:
using Confluent.Kafka;
using Polly;
using Polly.Extensions.Http;
var config = new ConsumerConfig
{
BootstrapServers = "host1:9092,host2:9092",
GroupId = "foo",
AutoOffsetReset = AutoOffsetReset.Earliest
};
using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
{
// Awaiting retry policy here will block the consumption of next message
var result = await GetRetryPolicy().ExecuteAsync(async () =>
{
// CALL AN API HERE...
return new HttpResponseMessage(System.Net.HttpStatusCode.OK);
});
}
IAsyncPolicy<HttpResponseMessage> GetRetryPolicy()
{
return HttpPolicyExtensions
.HandleTransientHttpError()
.OrResult(msg => msg.StatusCode == System.Net.HttpStatusCode.NotFound)
.WaitAndRetryAsync(5, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)));
}





Чтобы полностью добиться асинхронной обработки без блокировки, вам действительно следует избегать ожидания потребления сообщения.
Вот измененный код:
using Confluent.Kafka;
using Polly;
using Polly.Extensions.Http;
var config = new ConsumerConfig
{
BootstrapServers = "host1:9092,host2:9092",
GroupId = "foo",
AutoOffsetReset = AutoOffsetReset.Earliest
};
using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
{
var retryPolicy = GetRetryPolicy();
while (true)
{
var result = consumer.Consume(); // This is a synchronous call
// Start the asynchronous execution of the retry policy
_ = retryPolicy.ExecuteAsync(async () =>
{
// CALL AN API HERE asynchronously, without awaiting the response...
await YourApiCallAsync(); // Your API call should be asynchronous
// Continue processing or log the result as needed
});
}
}
IAsyncPolicy<HttpResponseMessage> GetRetryPolicy()
{
return HttpPolicyExtensions
.HandleTransientHttpError()
.OrResult(msg => msg.StatusCode == System.Net.HttpStatusCode.NotFound)
.WaitAndRetryAsync(5, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)));
}
В этом коде вызов result = consumer.Consume() не ожидается, что позволяет потребителю Kafka продолжать асинхронную обработку сообщений. Вызов retryPolicy.ExecuteAsync также запускает асинхронное выполнение вашего вызова API, не ожидая ответа, гарантируя, что он не блокирует цикл потребления сообщений.
Конечно, вот более краткое описание: 1) Асинхронное выполнение: асинхронные методы не создают новые потоки, если они не ожидаются; они выполняются асинхронно в контексте одного и того же потока, избегая блокировки основного потока. 2)**Обеспечение асинхронного выполнения**. Политика повтора кода выполняется асинхронно вместе с основным циклом потребления сообщений, обеспечивая параллельную работу. 3) Получение результата Polly.NET. Используйте .ExecuteAndCaptureAsync, чтобы записать результаты повторных попыток Polly.NET, включая успех или неудачу, сведения об исключениях и количество повторных попыток.
Это решение обрабатывает сообщения методом «сделал и забыл». Это означает, что все неудачи проглатываются. Также, скорее всего, получение сообщений происходит намного быстрее, чем обработка сообщений с помощью вызовов внешних служб. Это означает, что ваши смещения намного расширены по сравнению с обработанными сообщениями. В случае сбоя приложения и его перезапуска потребитель Kafka продолжит работу с зафиксированного смещения, а не с последнего обработанного сообщения. В зависимости от варианта использования это может стать огромной проблемой.
Да, действительно, это необходимо учитывать, но в моем случае я не хочу ничего делать, если сообщение полностью не удастся. Это не так критично.
Всякий раз, когда вы переходите из мира синхронного выполнения в асинхронный, возникает множество новых проблем.
Давайте посмотрим пару из них. Самый простой способ сделать обработку асинхронной — это отделить использование тем от обработки сообщений. Обычно это делается таким образом, что существует выделенный поток, который извлекает сообщения из темы Kafka и после десериализации отправляет обработку сообщений в рабочий поток. (Обычно существует пул рабочих потоков, который содержит потоки многократного использования.)
Самая первая проблема, с которой вы здесь столкнетесь, — это фиксация смещений. Когда мне следует это сделать? До отправки или после того, как рабочий поток завершит обработку?
Если мы воспользуемся подходом «до отправки» (то есть смещением автоматической фиксации), то следующая проблема, с которой мы столкнемся, — это обработка ошибок. Что мне делать с теми сообщениями, которые не удалось обработать (включая повторные попытки)? Должен ли я вернуть их в тему (например, переставить в очередь) для последующего использования? Или мне следует поместить это в специальную тему под названием «Очередь недоставленных писем», чтобы по-другому обрабатывать эти сообщения? Или просто отказаться от них?
Если мы воспользуемся подходом после завершения рабочего потока, то у нас возникнет еще одна куча проблем. Потребительское приложение должно поддерживать, где мы находимся с точки зрения обработки. Каково последнее смещение для каждого раздела, где мы уверены, что все сообщения до смещения обработаны? Когда нам следует совершить эти смещения?
Еще одна проблема — упорядочивание сообщений. Представьте, что у вас есть два сообщения (M1 и M2). Предположим, что временная метка M2 больше, чем временная метка M1. Обработка сообщений М2 работала безупречно и быстро. Но обработка M1 была немного неровной. Вызов внешней службы пришлось повторить несколько раз из-за временного сбоя сети. Другими словами, существует момент времени, когда M2 (более новое сообщение) было обработано, а M1 — нет. Какое смещение мне следует совершить в этом случае?
Я хочу сказать, что создать хорошо работающего и правильного асинхронного потребителя не так просто, как создать синхронного. Итак, я бы еще раз проверил, стоит ли обещанная масштабируемость затраченных усилий.
Спасибо за ответ. Еще один вопрос по поводу асинхронности в .NET. Как мы можем гарантировать, что асинхронное выполнение не будет «потеряно». Использование ключевого слова
awaitне создает новую тему? как я могу гарантировать, что повторная попытка будет запущена, пока основная программа продолжит обработку сообщений. И последний вопрос. Как мне получить результат Polly.NET после завершения механизма повторной попытки. Я хочу регистрировать общее количество неудач. Можно ли включить эту информацию в.OnRetryпри составлении политики?