У меня есть тема по одному брокеру с постоянно поступающими данными о кликах пользователей. Я хочу иметь возможность рассчитать среднее количество кликов за последние X минут на пользователя почти в реальном времени (например, за 1 секунду).
Я пробовал делать это с потоками Kafka, но проблема в том, что окно листания не может рассчитывать почти в реальном времени и обновлять среднее значение каждую секунду для всех значений ЗА ПОСЛЕДНИЕ X минут. Окно прыжков могло бы подойти, но для прыжка = 1 секунда и окна размером 5 минут было бы создано 300 окон, и я думаю, это слишком сильно смотрится со стороны производительности.
Есть ли способ сделать это без стороннего потокового движка, такого как Spark, но с «простыми» API Kafka? (Потоковая передача Kafka не является обязательной).
Большое спасибо!
Если вам важна производительность, почему у вас только один брокер? KSQL или Kafka Streams - это ответ, который вы ищете
@ MatthiasJ.Sax Хорошо, я это прекрасно знаю. Какие-нибудь рекомендации? Как управлять и сохранять состояние? Как получить доступ к значениям потока за последние X часов и т. д.?
@ cricket_007 Единый брокер - это не производственная установка, но это не имеет значения, я сказал это для простоты. Не могли бы вы подсказать, как это сделать с помощью KSQL или Kafka Streams?
Думаю, вам нужно прочитать документы и задать более конкретный вопрос в SO :)
Проблема в том, что я прочитал все, но ничего не нашел о своей ситуации.




Как уже говорили комментаторы - используйте Кафка Ручьи или KSQL. KSQL работает поверх Kafka Streams, поэтому моделирование данных и концепции, такие как управление окнами и агрегирование, одинаковы.
В KSQL:
ksql> CREATE TABLE USER_CLICKS_PER_MINUTE AS \
SELECT USER_ID, COUNT(*) AS CLICK_COUNT, \
COUNT(*)/5 AS CLICKS_PER_MINUTE \
FROM RATINGS WINDOW HOPPING (SIZE 5 MINUTES, ADVANCE BY 1 SECOND) \
GROUP BY USER_ID;
Message
---------------------------
Table created and running
---------------------------
Запросить агрегат с отслеживанием состояния:
ksql> SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss'), USER_ID, \
CLICK_COUNT, CLICKS_PER_MINUTE \
FROM USER_CLICKS_PER_MINUTE \
WHERE USER_ID=18;
2018-05-09 11:44:33 | 18 | 7 | 1
2018-05-09 11:44:34 | 18 | 7 | 1
2018-05-09 11:44:35 | 18 | 7 | 1
2018-05-09 11:44:36 | 18 | 9 | 1
2018-05-09 11:44:37 | 18 | 9 | 1
2018-05-09 11:44:38 | 18 | 10 | 2
2018-05-09 11:44:39 | 18 | 10 | 2
2018-05-09 11:44:40 | 18 | 10 | 2
2018-05-09 11:44:41 | 18 | 12 | 2
2018-05-09 11:44:42 | 18 | 12 | 2
2018-05-09 11:44:43 | 18 | 12 | 2
2018-05-09 11:44:44 | 18 | 12 | 2
2018-05-09 11:44:45 | 18 | 12 | 2
2018-05-09 11:44:46 | 18 | 12 | 2
2018-05-09 11:44:47 | 18 | 12 | 2
2018-05-09 11:44:48 | 18 | 12 | 2
2018-05-09 11:44:49 | 18 | 12 | 2
2018-05-09 11:44:50 | 18 | 12 | 2
2018-05-09 11:44:51 | 18 | 13 | 2
2018-05-09 11:44:52 | 18 | 13 | 2
2018-05-09 11:44:53 | 18 | 13 | 2
2018-05-09 11:44:54 | 18 | 13 | 2
2018-05-09 11:44:55 | 18 | 13 | 2
2018-05-09 11:44:56 | 18 | 13 | 2
2018-05-09 11:44:57 | 18 | 13 | 2
2018-05-09 11:44:58 | 18 | 13 | 2
2018-05-09 11:44:59 | 18 | 13 | 2
2018-05-09 11:45:00 | 18 | 13 | 2
2018-05-09 11:45:01 | 18 | 13 | 2
Имейте в виду, что Kafka Streams и KSQL (который построен на Kafka Streams) будут повторно генерировать агрегаты для заданного временного окна по мере поступления новых событий. В зависимости от ваших требований фактическое окно скачкообразного изменения, продвигающееся каждую 1 секунду, может быть не тем, что вам нужно. Простое всплывающее окно, которое обновляется в реальном времени, по-прежнему будет давать вам результаты агрегатов с отслеживанием состояния в реальном времени.
Больше информации о KSQL здесь:
Спасибо, Робин! Что ж, единственная актуальная информация, которая меня интересует, - это текущее или последнее окно, потому что оно фактически сообщает результаты агрегирования почти в реальном времени (с этого момента минус 5 минут). Окно большого пальца объединяет события, и после того, как окно закрывается, я получаю «окончательный» 5-минутный результат. Меня действительно интересует, чтобы результат обновлялся каждую секунду. Итак, ваше окно прыжка может быть тем, что мне нужно, но мне интересно, создает ли создание 300 окон проблемы с производительностью или это нормально?
Может, у меня тоже неправильно перепрыгивают окна? Таким образом, суть не в том, чтобы открыть окно и затем подождать, пока появятся события в течение 5 минут и в конце объединить их, а в том, чтобы смотреть с этого момента минус 5 минут и агрегировать их, и делать это каждую секунду. Окно большого пальца, когда я его получаю, открывает новое окно через 5 минут и запускает новую агрегацию, поэтому оно игнорирует события, которые произошли в 4:59, но в 5:01 я хочу агрегировать события с 0:01 до 5:01, поэтому я был думая о переходе окна, чтобы «избавиться» от событий старше 5 минут и делать это каждую секунду.
Вы неправильно понимаете окна. Пятиминутное вращающееся окно все равно будет оцениваться как по мере приближения событий, а агрегированные значения повторно генерируются Kafka Streams или KSQL. Таким образом, вы получаете представление о текущем состоянии ваших агрегатов в реальном времени - это всего лишь период, за который вы хотите их оценить. Но независимо от окна размер результаты выдаются в реальное время. Посмотрим, поможет ли это: docs.confluent.io/current/streams/developer-guide/…
Кувырок сбрасывается при переходе к следующему временному окну. Окно всегда начинается с 0.
Вы всегда можете создать собственное решение с помощью Kafka Streams Processor API.