Как я могу использовать Mutiny для периодического опроса новых событий из ресурса данных, в то же время задерживая выпуск каждого события на динамическое количество времени, чтобы свойство даты события было ровно одну секунду назад?
Каждый элемент в ресурсе данных содержит свойство даты, которое представляет время входа, и цель состоит в том, чтобы генерировать каждое событие через одну секунду после его времени входа. Я рассматриваю возможность использования Uni<List> (запросы к базе данных являются "однократными" операциями), но я бы предпочел, чтобы Multi объединял потоки в родительском классе.
private LocalDateTime timeOfLastQuery;
public Multi<Event> getNewEvents() {
final var eventStream = Multi.createFrom().iterable(getNewEvents(timeOfLastQuery))
//todo delay item by millisToOneSecondOld
.onItem().transform(mapper::toEvent);
timeOfLastQuery = LocalDateTime.now();
return eventStream;
}
private Long millisToOneSecondOld(LocalDateTime timeOfEntry) {
final var aSecondAgo = LocalDateTime.now().minusSeconds(1);
final var duration = Duration.between(aSecondAgo, timeOfEntry);
return coerceAtLeast(duration.toMillis(),0);
}
public static long coerceAtLeast(long x, long minimum) {
return Math.max(x, minimum);
}
Вы можете реализовать свою задержку следующим образом:
.call(ignored -> Uni.createFrom().nullItem().onItem().delayIt().by(getDuration()))
Когда вы получаете элемент, он подписывается на созданный мной Uni и ждет, пока элемент будет выпущен, прежде чем распространять исходный элемент вниз по течению. Хитрость заключается в том, чтобы отложить выпуск предмета null
.
Я решил это с помощью .onItem().transformToUniAndMerge(this::buildDelayedEventUni), где buildDelayedEventUni устанавливает задержку на основе свойств события.
У вас могут быть разные задержки, если getDuration()
каждый раз возвращает другое значение. Он оценивается по каждому пункту.
Я вижу, что могу передать событие -> getDuration(event.time) для расчета задержки. Я все еще думаю, что решение с transformToUniAndMerge более правильно для моего варианта использования, потому что оно не заботится о порядке элементов, а call. Если нет снижения производительности при использовании преобразования + слияния по вызову?
Разве таким образом все элементы не будут иметь одинаковое значение задержки? Я хочу, чтобы значение было динамическим на основе свойства события. В этом конкретном случае я хочу отложить события, чтобы они были как минимум на 1 секунду старше с момента их записи в БД.