В основном я пытаюсь получить все столбцы, когда использую выражение group by в своем запросе.
Создание потока из темы
CREATE STREAM events_stream \
( \
account VARCHAR, \
event_id VARCHAR, \
user_name VARCHAR, \
event_name VARCHAR, \
source VARCHAR, \
message VARCHAR, \
timestamp STRUCT<iMillis INTEGER>) \
WITH (KAFKA_TOPIC='console_failure', VALUE_FORMAT='JSON');
создание таблицы из вышеприведенного потока.
ksql> CREATE TABLE events_table AS \
SELECT source, count(*) \
FROM events_stream \
WINDOW TUMBLING (SIZE 60 SECONDS) \
WHERE account = '1111111111' \
GROUP BY source \
HAVING count(*) > 3;
Вывод этого сообщения 4 раза.
ip = "10.10.10.10"
data = {
"account": "1111111111",
"event_id": "4cdabe46-690d-494a-a37e-6e455781d8b4",
"user_name": "shakeel",
"event_name": "some_event",
"source": "127.0.0.1",
"message": "message related to event",
"timestamp": {
"iMillis": 1547543309000
}
}
producer.send('console_failure', key='event_json', value=dict(data)
Это работает, как и ожидалось! Но как получить другие поля (например: user_name, message и т. д.) для совпадающего результата?
ksql> select * from events_table;
1550495772262 | 10.10.10.10 : Window{start=1550495760000 end=-} | 10.10.10.10 | 4
ksql>
После использования я понимаю, что, возможно, мы не можем получить другие столбцы при использовании оператора group by.
ksql> CREATE TABLE events_table1 AS \
> SELECT source, event_id, \
> count(*) \
> FROM events_stream \
> WINDOW TUMBLING (SIZE 60 SECONDS) \
> WHERE account = '1111111111' \
> GROUP BY source \
> HAVING count(*) > 3;
Group by elements should match the SELECT expressions.
ksql>
Можем ли мы добиться этого с помощью смены ключа потока?
После прочтения это я попытался изменить ключ своего потока с полем event_id, но не уверен, как я могу использовать ключ раздела в своем выражении group by.
Ниже приведена ошибка, которую я получаю, когда пытаюсь использовать rekey.
ksql> CREATE STREAM events_stream_rekey AS SELECT * FROM events_stream PARTITION BY event_id;
Message
----------------------------
Stream created and running
----------------------------
ksql>
ksql> SELECT ROWKEY, EVENT_ID FROM events_stream_rekey;
4cdabe46-690d-494a-a37e-6e455781d8b4 | 4cdabe46-690d-494a-a37e-6e455781d8b4
ksql>
ksql> CREATE TABLE events_table2 AS \
> SELECT source, \
> count(*), \
> WITH (KAFKA_TOPIC='EVENTS_STREAM_REKEY', VALUE_FORMAT='JSON', KEY='event_id'),
> WINDOW TUMBLING (SIZE 60 SECONDS) \
> WHERE account = '1111111111' \
> GROUP BY source \
> HAVING count(*) > 3;
line 1:70: extraneous input 'WITH' expecting {'(', 'NOT', 'NO', 'NULL', 'TRUE', 'FALSE', 'INTEGER', 'DATE', 'TIME', 'TIMESTAMP', 'INTERVAL', 'YEAR', 'MONTH', 'DAY', 'HOUR', 'MINUTE', 'SECOND', 'ZONE', 'CASE', 'PARTITION', 'STRUCT', 'REPLACE', 'EXPLAIN', 'ANALYZE', 'FORMAT', 'TYPE', 'TEXT', 'CAST', 'SHOW', 'TABLES', 'SCHEMAS', 'COLUMNS', 'COLUMN', 'PARTITIONS', 'FUNCTIONS', 'FUNCTION', 'ARRAY', 'MAP', 'SET', 'RESET', 'SESSION', 'DATA', 'IF', '+', '-', '*', STRING, BINARY_LITERAL, INTEGER_VALUE, DECIMAL_VALUE, IDENTIFIER, DIGIT_IDENTIFIER, QUOTED_IDENTIFIER, BACKQUOTED_IDENTIFIER}
Сведения о версии KSQL: CLI v5.1.0, Server v5.1.0
-------------------------- ДЕЙСТВИЯ ПО ВОСПРОИЗВЕДЕНИЮ --------------------- -----
Сценарий продюсера: Этот сценарий будет генерировать 4 сообщения менее чем за 30 секунд окна.
import time
import uuid
from kafka import KafkaProducer
from json import dumps
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
value_serializer=lambda x:
dumps(x).encode('utf-8'))
for i in range(1, 5):
time.sleep(1)
data = {
"account": "1111111111",
"event_id": str(uuid.uuid4()),
"user_name": "user_{0}".format(i),
"event_name": "event_{0}".format(i),
"source": "10.0.9.1",
"message": "message related to event {0}".format(i),
"timestamp": {
"iMillis": int(round(time.time() * 1000))
}
}
time.sleep(2)
producer.send('testing_topic', value=data)
На потребление сообщений из testing_topic (используя обычный потребительский скрипт).
{'account': '1111111111', 'event_id': 'c186ba8a-2402-428a-a5d8-c5b8279e14af', 'user_name': 'user_1', 'event_name': 'event_1', 'source': '10.0.9.1', 'message': 'message related to event 1', 'timestamp': {'iMillis': 1551296878444}}
{'account': '1111111111', 'event_id': '4c45bff7-eb40-48a8-9972-301ad24af9ca', 'user_name': 'user_2', 'event_name': 'event_2', 'source': '10.0.9.1', 'message': 'message related to event 2', 'timestamp': {'iMillis': 1551296881465}}
{'account': '1111111111', 'event_id': '4ee14303-e6d1-4847-ae3d-22b49b3ce6eb', 'user_name': 'user_3', 'event_name': 'event_3', 'source': '10.0.9.1', 'message': 'message related to event 3', 'timestamp': {'iMillis': 1551296884469}}
{'account': '1111111111', 'event_id': '3c196ac5-9559-4269-bf51-95b78ce4ffcb', 'user_name': 'user_4', 'event_name': 'event_4', 'source': '10.0.9.1', 'message': 'message related to event 4', 'timestamp': {'iMillis': 1551296887472}}
Ожидаемый результат: Если сообщения содержат один и тот же адрес source в течение 30 секунд времени окна для одного и того же account, тогда я хочу получить следующее немедленное завершение.
сообщения (4-е сообщение в моем случае, как показано ниже). Можно ли этого добиться с помощью KSQL?
{'account': '1111111111', 'event_id': '3c196ac5-9559-4269-bf51-95b78ce4ffcb', 'user_name': 'user_4', 'event_name': 'event_4', 'source': '10.0.9.1', 'message': 'message related to event 4', 'timestamp': {'iMillis': 1551296887472}}

Само сообщение на самом деле говорит вам о проблеме :)
Group by elements should match the SELECT expressions.
Итак, у вас есть source в обаSELECT и GROUP BY:
ksql> SELECT source, count(*) \
> FROM events_stream \
> WINDOW TUMBLING (SIZE 60 SECONDS) \
> WHERE account = '1111111111' \
> GROUP BY source \
> HAVING count(*) > 3;
127.0.0.1 | 4
^CQuery terminated
Чтобы добавить другие столбцы, убедитесь, что вы также добавили их в SELECT:
ksql> SELECT source, event_id, count(*) \
> FROM events_stream \
> WINDOW TUMBLING (SIZE 60 SECONDS) \
> WHERE account = '1111111111' \
> GROUP BY source, event_id \
> HAVING count(*) > 3;
127.0.0.1 | 4cdabe46-690d-494a-a37e-6e455781d8b4 | 4
Изменить, чтобы ответить на ваш обновленный вопрос:
Я не думаю, что это можно [легко] сделать на SQL (или KSQL). Возможно, вы сможете добиться чего-то подобного, включив метку времени в агрегатную операцию, например:
CREATE TABLE source_alert AS \
SELECT source, COUNT(*), MAX(timestamp) \
FROM event_stream WINDOW TUMBLING (SIZE 60 SECONDS) \
GROUP BY `source` \
HAVING COUNT(*)>1
а затем возьмите полученную таблицу и присоединитесь к потоку событий:
SELECT * \
FROM event_stream e \
INNER JOIN \
source_alert a ON e.source=a.source \
WHERE e.timestamp=a.timestamp
Я не пробовал это, но в принципе это может привести вас туда, куда вы хотите.
Если вы не включаете event_id в свой GROUP BY, то не имеет логического смысла использовать его в качестве ключа сообщения. Вы не можете агрегировать, но по-прежнему включать более низкую гранулярность данных. Возможно, вам нужно обновить свой вопрос, чтобы более четко проиллюстрировать, что вы пытаетесь сделать.
Спасибо Робин за ответ, я просто ищу такой запрос "выберите * из переворачивания окна таблицы (размер 30 секунд), где учетная запись = '1111111111' группа по источнику, имеющая количество (*) > 3;". Дайте мне знать, если это все еще неясно, я буду рад предоставить вам более подробную информацию об этом.
Да, обновите свой вопрос, указав пример ваших входных данных и ожидаемый результат, который вы хотите получить от своего запроса.
Благодарю вас за быстрый ответ, я обновил свой вопрос, указав несколько шагов для воспроизведения и ожидаемого результата. Пожалуйста, помогите мне, если это может быть достигнуто с помощью KSQL?
В дополнение к ответу Робина эта ошибка:
line 1:70: extraneous input 'WITH' expecting {'(', 'NOT', 'NO', 'NULL', 'TRUE', 'FALSE', 'INTEGER', 'DATE', 'TIME', 'TIMESTAMP', 'INTERVAL', 'YEAR', 'MONTH', 'DAY', 'HOUR', 'MINUTE', 'SECOND', 'ZONE', 'CASE', 'PARTITION', 'STRUCT', 'REPLACE', 'EXPLAIN', 'ANALYZE', 'FORMAT', 'TYPE', 'TEXT', 'CAST', 'SHOW', 'TABLES', 'SCHEMAS', 'COLUMNS', 'COLUMN', 'PARTITIONS', 'FUNCTIONS', 'FUNCTION', 'ARRAY', 'MAP', 'SET', 'RESET', 'SESSION', 'DATA', 'IF', '+', '-', '*', STRING, BINARY_LITERAL, INTEGER_VALUE, DECIMAL_VALUE, IDENTIFIER, DIGIT_IDENTIFIER, QUOTED_IDENTIFIER, BACKQUOTED_IDENTIFIER}
Имеется в виду тот факт, что ваше предложение WITH находится не в том месте. Правильный узор такой:
CREATE TABLE <table name> WITH(...) AS SELECT ...
Что сделало бы ваше заявление:
ksql> CREATE TABLE events_table2
> WITH (KAFKA_TOPIC='EVENTS_STREAM_REKEY', VALUE_FORMAT='JSON', KEY='event_id'),
> AS
> SELECT source, count(*),
> WINDOW TUMBLING (SIZE 60 SECONDS)
> WHERE account = '1111111111'
> GROUP BY source
> HAVING count(*) > 3;
Да, я пробовал это, но я получаю сообщение об ошибке, показанное ниже. Приведенный выше запрос работает для вас? пожалуйста, дайте мне знать, если для выполнения этого запроса необходимы какие-либо синтаксические изменения. `ksql> CREATE TABLE events_table3 > WITH (KAFKA_TOPIC='EVENTS_STREAM_REKEY', VALUE_FORMAT='JSON', KEY='event_id'), > AS > SELECT source, count(), > WINDOW TUMBLING (РАЗМЕР 60 СЕКУНД) > WHERE account = '1111111111' > GROUP BY source > HAVING count() > 3; строка 2:86: несоответствующий ввод ',' ожидается ';' Вызвано: org.antlr.v4.runtime.InputMismatchException ksql> `
> Вы пишете Робин, но в моем случае
event_idбудет отличаться для каждого сообщения, поэтому я бы не стал группировать по event_id. Мне нужен 'event_id' в качестве ключа, чтобы я мог использовать его для присоединения к другой таблице. > Можете ли вы поделиться каким-либо другим лучшим способом получить все столбцы 4-го сообщения, которое будет результатом вышеуказанного запросаgroup by.