Как следует из названия. В настоящее время я использую бэкэнд GetEventStore для своих мероприятий, и он отлично работает. Меня смущают подписки для поддержания моего магазина Read в актуальном состоянии.
В настоящее время я создаю догоняющую подписку, используя
EventStoreConnection.SubscribeToAllFrom(Position.Start, _subscriptionSettings, OnEventMaterialized, OnLiveProcessingStarted, OnSubscriptionDropped);
метод. Это хорошо работает, и у меня есть способ кэшировать все мои конкретные наблюдатели событий, и они хорошо строят мою модель чтения.
Однако что, если я хочу воспроизвести события определенного агрегата? В нынешнем виде мне нужно снова начать подписку с самого начала и позволить всему этому пройти. На сегодняшний день я сделал это так, чтобы сохранить текущую позицию потока каждого наблюдателя, сравнить ее с подпиской и, если больше, пропустить.
Не лучше ли, чтобы у каждого наблюдателя была своя подписка, чтобы очистить можно было только одного? Откуда вы знаете идентификаторы потоков в этом случае, вам нужно сохранять каждый отдельный идентификатор потока, который вы создаете, чтобы повторно подписаться на более позднюю дату?
Некоторые примеры кода или немного чтения были бы фантастическими. Я чувствую, что упускаю смысл с этой последней частью головоломки ES.





Чтобы ответить на ваш общий вопрос: да, каждый наблюдатель должен сохранять свою позицию в очереди.
How do you know the Stream IDs in that case, do you need to persist every single stream id you create in order to re-subscribe at some later date?
Имена ваших потоков должны быть воспроизводимыми. Я использую формат «события-{AggregateType}-{AggregateId}». Включаю проекции для ГЭС и так поток Также существует "$events-{AggregateType}". Я могу использовать это как поток для одного из моих наблюдателей, если это необходимо.
Хранение конфигураций наблюдателя в БД позволяет легко добавлять новые:
Name: All-Projections
StreamName: $events
Position: 1200 <-- This is updated by the observer as it reads
BufferSize: 100 <-- Number of records to read at a time
Name: Customers-Rebuild
StreamName: $events-CustomerAggregate
Position: 0 <-- This consumer has not run yet
BufferSize: 100
Затем вы можете запустить свой базовый исполняемый файл потребителя и передать имя наблюдателя в качестве параметра.
Привет @ДжеймсВудли. С GetEventStore, если вы запускаете прогнозы, он создаст для вас другие метапотоки. Например, используя проекции категорий, вы можете получить все события для всех клиентов. Это позволяет вам не заботиться о полном списке идентификаторов. eventstore.org/docs/projections/system-projections/…
@JamesWoodley Не беспокойтесь. Я боролся с теми же вопросами и рад поделиться своими выводами.
Спасибо за подтверждение моих мыслей - однако, если вы не сохраните каждый агрегат, вы не будете знать, к каким потокам подключаться, если вы «перестроили». Отсюда берется прогноз «Клиенты-восстановление»? Это дает вам все ваши идентификаторы?