Потребитель концентратора событий в Service Fabric

Я пытаюсь заставить сервисную структуру последовательно извлекать сообщения из концентратора событий Azure. Кажется, у меня все подключено, но я заметил, что мой потребитель просто перестает получать события.

У меня есть хаб с парой тысяч событий, которые я туда отправил. Концентратор сконфигурирован с 1 разделом, а моя служба структуры служб также имеет только 1 раздел для облегчения отладки.

Служба запускается, создает EventHubClient, оттуда использует его для создания PartitionReceiver. Получатель передается в «EventLoop», который входит в «бесконечность», когда вызывает приемник.ReceiveAsync. Код для EventLoop ниже.

Что я наблюдаю, так это то, что в первый раз в цикле я почти всегда получаю 1 сообщение. Во второй раз я получаю где-то между 103 и 200 сообщениями. После этого мне не приходят сообщения. Также кажется, что если я перезапускаю службу, я снова получаю те же сообщения, но это потому, что когда я перезапускаю службу, она запускается с самого начала потока.

Ожидается, что это будет продолжаться до тех пор, пока мои 2000 сообщений не будут израсходованы, а затем он будет ждать меня (иногда опрашивая).

Есть ли что-то конкретное, что мне нужно сделать с пакетом Azure.Messaging.EventHubs 5.3.0, чтобы он продолжал получать события?

//Here is how I am creating the EventHubClient:
var connectionString = "something secret";
var connectionStringBuilder = new EventHubsConnectionStringBuilder(connectionString)
{
   EntityPath = "NameOfMyEventHub"
};
try
{
   m_eventHubClient = EventHubClient.Create(connectionStringBuilder);
}

//Here is how I am getting the partition receiver
var receiver = m_eventHubClient.CreateReceiver("$Default", m_partitionId, EventPosition.FromStart());

//The event loop which the receiver is passed to
private async Task EventLoop(PartitionReceiver receiver)
  {
     m_started = true;
     while (m_keepRunning)
     {
        var events = await receiver.ReceiveAsync(m_options.BatchSize, TimeSpan.FromSeconds(5));
        if (events != null) //First 2/3 times events aren't null. After that, always null and I know there are more in the partition/
        {
           var eventsArray = events as EventData[] ?? events.ToArray();
           m_state.NumProcessedSinceLastSave += eventsArray.Count();

           foreach (var evt in eventsArray)
           {
              //Process the event
              await m_options.Processor.ProcessMessageAsync(evt, null);

              string lastOffset = evt.SystemProperties.Offset;

              if (m_state.NumProcessedSinceLastSave >= m_options.BatchSize)
              {
                 m_state.Offset = lastOffset;
                 m_state.NumProcessedSinceLastSave = 0;
                 await m_state.SaveAsync();
              }
           }
        }
     }

     m_started = false;
  }

** РЕДАКТИРОВАТЬ, был задан вопрос о количестве разделов. Концентратор событий имеет один раздел, и служба SF также имеет один раздел.

Я намереваюсь использовать состояние сервисной фабрики, чтобы отслеживать мое смещение в концентраторе, но сейчас это не проблема.

Слушатели разделов создаются для каждого раздела. Я получаю такие разделы:

public async Task StartAsync()
  {
     // slice the pie according to distribution
     // this partition can get one or more assigned Event Hub Partition ids
     string[] eventHubPartitionIds = (await m_eventHubClient.GetRuntimeInformationAsync()).PartitionIds;
     string[] resolvedEventHubPartitionIds = m_options.ResolveAssignedEventHubPartitions(eventHubPartitionIds);

     foreach (var resolvedPartition in resolvedEventHubPartitionIds)
     {
        var partitionReceiver = new EventHubListenerPartitionReceiver(m_eventHubClient, resolvedPartition, m_options);
        await partitionReceiver.StartAsync();
        m_partitionReceivers.Add(partitionReceiver);
     }
  }

Когда вызывается partitionListener.StartAsync, он фактически создает PartitionListener, как это (на самом деле это немного больше, чем это, но взятая ветвь такова:

m_eventHubClient.CreateReceiver(m_options.EventHubConsumerGroupName, m_partitionId, EventPosition.FromStart());

Спасибо за любые советы. Воля

Можете ли вы помочь мне понять, как управляется m_keepRunning? Я вижу, что он используется как часть управления циклом, но было бы полезно посмотреть, как приложение управляет его значением.

Jesse Squire 19.12.2020 19:01

Для CloseAsync устанавливается значение false, когда сервисная структура завершает работу службы.

Will Comeaux 21.12.2020 01:30

Я не вижу ничего очевидного в коде, которым поделились. Интересно, есть ли ненаблюдаемое исключение, которое вызывает сбой задачи. Цикл не имеет обработки ошибок в версии, которой поделились. Какова стратегия обработки исключений в ProcessMessageAsync?

Jesse Squire 21.12.2020 16:08

>> Есть ли что-то конкретное, что мне нужно сделать с пакетом Azure.Messaging.EventHubs 5.3.0, чтобы он продолжал получать события? Я не уверен, что вы там спрашиваете. Фрагменты, которыми вы поделились, используют устаревший пакет Microsoft.Azure.EventHubs, а не текущее поколение Azure.Messaging.EventHubs.

Jesse Squire 21.12.2020 16:08

@JesseSquire, это интересное наблюдение. Упомянутый пакет — Azure.Messaging.EventHubs (5.3.0-beta.4), но вы действительно правы, когда я перехожу к определению EventHubClient, он находится в // C:\Users\User\.nuget\packages\ microsoft.azure.eventhubs\4.3.‌​1\lib\netstandard2.0‌​\Microsoft.Azure.Eve‌​ntHubs.dll, хотя этот пакет даже не упоминается в моем проекте. Несомненно, когда-то это было. Позвольте мне разгадать это, и, возможно, проблема разрешится сама собой.

Will Comeaux 21.12.2020 16:58

Можете ли вы использовать Service Bus Explorer и сравнить результаты? Интересно, у некоторых сообщений уже истек срок хранения и они не доставлены из-за этого.

Serkant Karaca 21.12.2020 17:48
Как установить LAMP Stack - Security 5/5 на виртуальную машину Azure Linux VM
Как установить LAMP Stack - Security 5/5 на виртуальную машину Azure Linux VM
В предыдущей статье мы завершили установку базы данных, для тех, кто не знает.
Как установить LAMP Stack 1/2 на Azure Linux VM
Как установить LAMP Stack 1/2 на Azure Linux VM
В дополнение к нашему предыдущему сообщению о намерении Azure прекратить поддержку Azure Database для MySQL в качестве единого сервера после 16...
0
6
438
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Сколько разделов у вас есть? Я не вижу в вашем коде, как вы убедитесь, что читаете все разделы в группе потребителей по умолчанию.

Есть какая-то конкретная причина, по которой вы используете PartitionReceiver вместо EventProcessorHost?

Мне кажется, что SF идеально подходит для использования хоста обработчика событий. Я вижу, что уже есть интегрированное решение SF, которое использует сервисы с отслеживанием состояния для контрольных точек.

Чтобы ответить на вопрос о разделе, концентратор событий имеет 1 раздел, а служба SF также имеет только один - я думал, что это также может быть виновником, поэтому я исключил это. Я добавлю некоторые из этих деталей в свой основной пост.

Will Comeaux 21.12.2020 15:48

Упомянутый пакет интеграции &linked SF использует службу с отслеживанием состояния вместо учетной записи хранения.

Peter Bons 21.12.2020 15:53

Просто отметим, что по приведенной выше ссылке рекомендуется использовать библиотеку текущего поколения, Azure.Messaging.EventHubs. Ссылка EventProcessorHost в ответе указывает на правильную библиотеку, но имя типа в этом пакете будет EventProcessorClient. В этом сценарии наследование от EventProcesor<T> будет лучшим подходом, позволяющим использовать состояние Service Fabric в качестве хранилища контрольных точек.

Jesse Squire 21.12.2020 16:17

Спасибо всем, я изучаю эти рекомендации. Должно разобраться.

Will Comeaux 21.12.2020 17:17

@JesseSquire Я вижу, вы участвуете в проекте Azure.Messaging.EventHubs. В документе говорится: «Для производственного использования мы рекомендуем использовать клиент обработчика событий». Это ссылка на проект для Azure.Messaging.EventHubs.Processor. Есть ли у кого-нибудь пример использования EventProcessor<T> в Service Fabric? Дополнительное примечание: почему существует так много библиотек для этого материала и как общественность должна знать, какую использовать, не прибегая к переполнению стека?

Will Comeaux 21.12.2020 22:42

К сожалению, у нас пока нет хорошего образца для настройки EventProcessor<T>... лучшим примером на данный момент является EventProcessorClient, который является самоуверенной настройкой поверх него.

Jesse Squire 21.12.2020 22:50

Мы рекомендуем использовать семейство пакетов Azure.[[ AREA ]].[[ SERVICE ]] для новой разработки или нетривиальных обновлений. Что касается мотивов и различий, то по мере того, как Azure росла и была принята более разнообразной группой разработчиков, мы сосредоточились на изучении шаблонов и методов, чтобы наилучшим образом повысить производительность разработчиков и понять пробелы, которые есть в клиентских библиотеках .NET. .

Jesse Squire 21.12.2020 22:52

В экосистеме клиентской библиотеки Azure было несколько областей единообразных отзывов. Одним из наиболее важных является то, что клиентские библиотеки для различных служб Azure не имеют единого подхода к организации, именованию и структуре API. Кроме того, многие разработчики считают, что кривая обучения была сложной, а API-интерфейсы не предлагали хорошей, доступной и последовательной истории адаптации для тех, кто изучает Azure или изучает конкретную службу Azure.

Jesse Squire 21.12.2020 22:52

Чтобы попытаться улучшить процесс разработки в службах Azure, был создан набор универсальных руководств по проектированию для всех языков, чтобы обеспечить согласованное взаимодействие с установленными шаблонами API для всех служб. Также был представлен набор специфических для .NET руководящих принципов, чтобы гарантировать, что клиенты .NET имеют естественное и идиоматическое ощущение, отражающее поведение библиотек базовых классов .NET. Дополнительный контекст, обоснование и подробности доступны в связанных руководствах.

Jesse Squire 21.12.2020 22:52

Всем спасибо. Выполнив это упражнение по обеспечению использования правильных пакетов, я обнаружил проблему. Оказывается, это было в моем глупом модульном тесте, которого не ждали. Тест заканчивался и убивал задачу до того, как все сообщения были на концентраторе, и он постоянно заканчивался после той же самой суммы просто случайно ... Рад быть на новых пакетах, поэтому ваша помощь улучшила ситуацию для меня в целом.

Will Comeaux 23.12.2020 15:38

Другие вопросы по теме

Похожие вопросы

Как передать мой код с локального компьютера в Azure devops в другой ветке, отличной от основной, с помощью GIT?
Команда AzCopy Sync не работает с ключом доступа, но передает ключ SAS, когда контейнер является частным
Crontab с командой flask в службе приложений Azure с Linux ASP
Внедрить переменные окружения ADO в сценарий внутри того же конвейера сборки ADO
Ошибка синхронизации azcopy с сообщением «409 Общий доступ не разрешен для этой учетной записи хранения». на личном контейнере
Как заменить NULL пробелом в фабрике данных Azure для всех типов данных?
Как использовать теги концентратора уведомлений Azure для push-сообщений в приложениях UWP
Azure DevOps получает доступ к двум хранилищам ключей с повторяющимися секретными именами
Не удается аннотировать контроллер входящего трафика Kong с помощью частного балансировщика нагрузки AKS
Не удается импортировать БД с помощью портала Azure