Я пытаюсь заставить сервисную структуру последовательно извлекать сообщения из концентратора событий Azure. Кажется, у меня все подключено, но я заметил, что мой потребитель просто перестает получать события.
У меня есть хаб с парой тысяч событий, которые я туда отправил. Концентратор сконфигурирован с 1 разделом, а моя служба структуры служб также имеет только 1 раздел для облегчения отладки.
Служба запускается, создает EventHubClient, оттуда использует его для создания PartitionReceiver. Получатель передается в «EventLoop», который входит в «бесконечность», когда вызывает приемник.ReceiveAsync. Код для EventLoop ниже.
Что я наблюдаю, так это то, что в первый раз в цикле я почти всегда получаю 1 сообщение. Во второй раз я получаю где-то между 103 и 200 сообщениями. После этого мне не приходят сообщения. Также кажется, что если я перезапускаю службу, я снова получаю те же сообщения, но это потому, что когда я перезапускаю службу, она запускается с самого начала потока.
Ожидается, что это будет продолжаться до тех пор, пока мои 2000 сообщений не будут израсходованы, а затем он будет ждать меня (иногда опрашивая).
Есть ли что-то конкретное, что мне нужно сделать с пакетом Azure.Messaging.EventHubs 5.3.0, чтобы он продолжал получать события?
//Here is how I am creating the EventHubClient:
var connectionString = "something secret";
var connectionStringBuilder = new EventHubsConnectionStringBuilder(connectionString)
{
EntityPath = "NameOfMyEventHub"
};
try
{
m_eventHubClient = EventHubClient.Create(connectionStringBuilder);
}
//Here is how I am getting the partition receiver
var receiver = m_eventHubClient.CreateReceiver("$Default", m_partitionId, EventPosition.FromStart());
//The event loop which the receiver is passed to
private async Task EventLoop(PartitionReceiver receiver)
{
m_started = true;
while (m_keepRunning)
{
var events = await receiver.ReceiveAsync(m_options.BatchSize, TimeSpan.FromSeconds(5));
if (events != null) //First 2/3 times events aren't null. After that, always null and I know there are more in the partition/
{
var eventsArray = events as EventData[] ?? events.ToArray();
m_state.NumProcessedSinceLastSave += eventsArray.Count();
foreach (var evt in eventsArray)
{
//Process the event
await m_options.Processor.ProcessMessageAsync(evt, null);
string lastOffset = evt.SystemProperties.Offset;
if (m_state.NumProcessedSinceLastSave >= m_options.BatchSize)
{
m_state.Offset = lastOffset;
m_state.NumProcessedSinceLastSave = 0;
await m_state.SaveAsync();
}
}
}
}
m_started = false;
}
** РЕДАКТИРОВАТЬ, был задан вопрос о количестве разделов. Концентратор событий имеет один раздел, и служба SF также имеет один раздел.
Я намереваюсь использовать состояние сервисной фабрики, чтобы отслеживать мое смещение в концентраторе, но сейчас это не проблема.
Слушатели разделов создаются для каждого раздела. Я получаю такие разделы:
public async Task StartAsync()
{
// slice the pie according to distribution
// this partition can get one or more assigned Event Hub Partition ids
string[] eventHubPartitionIds = (await m_eventHubClient.GetRuntimeInformationAsync()).PartitionIds;
string[] resolvedEventHubPartitionIds = m_options.ResolveAssignedEventHubPartitions(eventHubPartitionIds);
foreach (var resolvedPartition in resolvedEventHubPartitionIds)
{
var partitionReceiver = new EventHubListenerPartitionReceiver(m_eventHubClient, resolvedPartition, m_options);
await partitionReceiver.StartAsync();
m_partitionReceivers.Add(partitionReceiver);
}
}
Когда вызывается partitionListener.StartAsync, он фактически создает PartitionListener, как это (на самом деле это немного больше, чем это, но взятая ветвь такова:
m_eventHubClient.CreateReceiver(m_options.EventHubConsumerGroupName, m_partitionId, EventPosition.FromStart());
Спасибо за любые советы. Воля
Для CloseAsync устанавливается значение false, когда сервисная структура завершает работу службы.
Я не вижу ничего очевидного в коде, которым поделились. Интересно, есть ли ненаблюдаемое исключение, которое вызывает сбой задачи. Цикл не имеет обработки ошибок в версии, которой поделились. Какова стратегия обработки исключений в ProcessMessageAsync
?
>> Есть ли что-то конкретное, что мне нужно сделать с пакетом Azure.Messaging.EventHubs 5.3.0, чтобы он продолжал получать события? Я не уверен, что вы там спрашиваете. Фрагменты, которыми вы поделились, используют устаревший пакет Microsoft.Azure.EventHubs
, а не текущее поколение Azure.Messaging.EventHubs
.
@JesseSquire, это интересное наблюдение. Упомянутый пакет — Azure.Messaging.EventHubs (5.3.0-beta.4), но вы действительно правы, когда я перехожу к определению EventHubClient, он находится в // C:\Users\User\.nuget\packages\ microsoft.azure.eventhubs\4.3.1\lib\netstandard2.0\Microsoft.Azure.EventHubs.dll, хотя этот пакет даже не упоминается в моем проекте. Несомненно, когда-то это было. Позвольте мне разгадать это, и, возможно, проблема разрешится сама собой.
Можете ли вы использовать Service Bus Explorer и сравнить результаты? Интересно, у некоторых сообщений уже истек срок хранения и они не доставлены из-за этого.
Сколько разделов у вас есть? Я не вижу в вашем коде, как вы убедитесь, что читаете все разделы в группе потребителей по умолчанию.
Есть какая-то конкретная причина, по которой вы используете PartitionReceiver
вместо EventProcessorHost?
Мне кажется, что SF идеально подходит для использования хоста обработчика событий. Я вижу, что уже есть интегрированное решение SF, которое использует сервисы с отслеживанием состояния для контрольных точек.
Чтобы ответить на вопрос о разделе, концентратор событий имеет 1 раздел, а служба SF также имеет только один - я думал, что это также может быть виновником, поэтому я исключил это. Я добавлю некоторые из этих деталей в свой основной пост.
Упомянутый пакет интеграции &linked SF использует службу с отслеживанием состояния вместо учетной записи хранения.
Просто отметим, что по приведенной выше ссылке рекомендуется использовать библиотеку текущего поколения, Azure.Messaging.EventHubs.
Ссылка EventProcessorHost
в ответе указывает на правильную библиотеку, но имя типа в этом пакете будет EventProcessorClient
. В этом сценарии наследование от EventProcesor<T> будет лучшим подходом, позволяющим использовать состояние Service Fabric в качестве хранилища контрольных точек.
Спасибо всем, я изучаю эти рекомендации. Должно разобраться.
@JesseSquire Я вижу, вы участвуете в проекте Azure.Messaging.EventHubs. В документе говорится: «Для производственного использования мы рекомендуем использовать клиент обработчика событий». Это ссылка на проект для Azure.Messaging.EventHubs.Processor. Есть ли у кого-нибудь пример использования EventProcessor<T> в Service Fabric? Дополнительное примечание: почему существует так много библиотек для этого материала и как общественность должна знать, какую использовать, не прибегая к переполнению стека?
К сожалению, у нас пока нет хорошего образца для настройки EventProcessor<T>
... лучшим примером на данный момент является EventProcessorClient, который является самоуверенной настройкой поверх него.
Мы рекомендуем использовать семейство пакетов Azure.[[ AREA ]].[[ SERVICE ]]
для новой разработки или нетривиальных обновлений. Что касается мотивов и различий, то по мере того, как Azure росла и была принята более разнообразной группой разработчиков, мы сосредоточились на изучении шаблонов и методов, чтобы наилучшим образом повысить производительность разработчиков и понять пробелы, которые есть в клиентских библиотеках .NET. .
В экосистеме клиентской библиотеки Azure было несколько областей единообразных отзывов. Одним из наиболее важных является то, что клиентские библиотеки для различных служб Azure не имеют единого подхода к организации, именованию и структуре API. Кроме того, многие разработчики считают, что кривая обучения была сложной, а API-интерфейсы не предлагали хорошей, доступной и последовательной истории адаптации для тех, кто изучает Azure или изучает конкретную службу Azure.
Чтобы попытаться улучшить процесс разработки в службах Azure, был создан набор универсальных руководств по проектированию для всех языков, чтобы обеспечить согласованное взаимодействие с установленными шаблонами API для всех служб. Также был представлен набор специфических для .NET руководящих принципов, чтобы гарантировать, что клиенты .NET имеют естественное и идиоматическое ощущение, отражающее поведение библиотек базовых классов .NET. Дополнительный контекст, обоснование и подробности доступны в связанных руководствах.
Всем спасибо. Выполнив это упражнение по обеспечению использования правильных пакетов, я обнаружил проблему. Оказывается, это было в моем глупом модульном тесте, которого не ждали. Тест заканчивался и убивал задачу до того, как все сообщения были на концентраторе, и он постоянно заканчивался после той же самой суммы просто случайно ... Рад быть на новых пакетах, поэтому ваша помощь улучшила ситуацию для меня в целом.
Можете ли вы помочь мне понять, как управляется
m_keepRunning
? Я вижу, что он используется как часть управления циклом, но было бы полезно посмотреть, как приложение управляет его значением.