В настоящее время у меня есть таблица в KSQL, созданная
CREATE TABLE cdc_window_table
WITH (KAFKA_TOPIC='cdc_stream',
VALUE_FORMAT='JSON') AS
SELECT after->application_id AS application_id,
COUNT(*) AS application_id_count
FROM cdc_stream
WINDOW TUMBLING (SIZE 20 SECONDS)
GROUP BY after->application_id
EMIT CHANGES;
В этот момент он создал новую таблицу. Я могу просмотреть его по
SELECT *
FROM cdc_window_table
EMIT CHANGES;
который возвращает данные, такие как
+---------------+---------------+---------------+---------------------+
|APPLICATION_ID |WINDOWSTART |WINDOWEND |APPLICATION_ID_COUNT |
+---------------+---------------+---------------+---------------------+
|a1 |1648767460000 |1648767480000 |1 |
|a1 |1648767460000 |1648767480000 |2 |
|a1 |1648767460000 |1648767480000 |3 |
|a1 |1648767480000 |1648767500000 |1 |
|a1 |1648767740000 |1648767760000 |1 |
Я пытаюсь создать выходной поток (журнал изменений) на основе этой таблицы, как это изображение:
(Источник изображения: https://www.confluent.io/blog/kafka-streams-tables-part-1-event-streaming/)
Прочитав это, я попробовал эти 4 метода:
CREATE STREAM cdc_window_table_changelog_stream (application_id STRING,
application_id_count INT)
WITH (KAFKA_TOPIC='cdc_window_table',
VALUE_FORMAT='JSON');
CREATE STREAM cdc_window_table_changelog_stream (application_id STRING KEY,
application_id_count BIGINT)
WITH (KAFKA_TOPIC='cdc_window_table',
VALUE_FORMAT='JSON',
WINDOW_TYPE='TUMBLING',
WINDOW_SIZE='20 SECONDS');
CREATE STREAM cdc_window_table_changelog_stream (application_id STRING KEY,
application_id_count BIGINT)
WITH (KAFKA_TOPIC='cdc_window_table',
VALUE_FORMAT='JSON',
WINDOW_TYPE='SESSION');
CREATE STREAM cdc_window_table_changelog_stream (ROWKEY STRING KEY,
application_id_count BIGINT)
WITH (KAFKA_TOPIC='cdc_window_table',
VALUE_FORMAT='JSON',
WINDOW_TYPE='SESSION');
Когда я просматриваю
SELECT *
FROM cdc_window_table_changelog_stream
EMIT CHANGES;
Он показывает только заголовок таблицы без каких-либо данных журнала изменений:
+------------------+-----------------------+
|APPLICATION_ID |APPLICATION_ID_COUNT |
+------------------+-----------------------+
Каким будет правильный способ создания выходного потока (журнала изменений) на основе таблицы?
На шаге 2 вместо темы cdc_window_table
я должен использовать что-то вроде _confluent-ksql-xxx-ksqlquery_CTAS_CDC_WINDOW_TABLE_271-Aggregate-GroupBy-repartition
.
Тема журнала изменений этой таблицы автоматически создается KSQL при создании предыдущей таблицы.
Вы можете найти это длинное имя темы журнала изменений, используя
show all topics;
(Обратите внимание на all
выше. Без него тема журнала изменений не будет отображаться.)
Рабочий KSQL будет
CREATE STREAM cdc_window_table_changelog_stream (application_id STRING KEY,
application_id_count BIGINT)
WITH (KAFKA_TOPIC='_confluent-ksql-xxx-ksqlquery_CTAS_CDC_WINDOW_TABLE_271-Aggregate-GroupBy-repartition',
VALUE_FORMAT='JSON');
(Обратите внимание на KEY
позади application_id STRING
выше. Без KEY
application_id
будет отображаться как null
в потоке.)
Когда я просматриваю
SELECT *
FROM cdc_window_table_changelog_stream
EMIT CHANGES;
в этот момент я вижу
+------------------+-----------------------+
|APPLICATION_ID |APPLICATION_ID_COUNT |
+------------------+-----------------------+
|a1 |null |
|a1 |null |
|a1 |null |
Я не уверен, почему application_id_count
— это null
, но для меня application_id
— это все, что меня волнует в моем случае использования. Если я найду решение или кто-нибудь знает, я обновлю этот ответ.