Как правильно создать поток вывода (changelog) на основе таблицы в KSQL?

Шаг 1: Создайте таблицу

В настоящее время у меня есть таблица в KSQL, созданная

CREATE TABLE cdc_window_table
    WITH (KAFKA_TOPIC='cdc_stream',
          VALUE_FORMAT='JSON') AS
SELECT after->application_id AS application_id,
       COUNT(*) AS application_id_count
FROM cdc_stream
WINDOW TUMBLING (SIZE 20 SECONDS)
GROUP BY after->application_id
EMIT CHANGES;

В этот момент он создал новую таблицу. Я могу просмотреть его по

SELECT *
FROM cdc_window_table
EMIT CHANGES;

который возвращает данные, такие как

+---------------+---------------+---------------+---------------------+
|APPLICATION_ID |WINDOWSTART    |WINDOWEND      |APPLICATION_ID_COUNT |
+---------------+---------------+---------------+---------------------+
|a1             |1648767460000  |1648767480000  |1                    |
|a1             |1648767460000  |1648767480000  |2                    |
|a1             |1648767460000  |1648767480000  |3                    |
|a1             |1648767480000  |1648767500000  |1                    |
|a1             |1648767740000  |1648767760000  |1                    |

Шаг 2: Создать выходной поток (журнал изменений) — НЕУДАЧНО

Я пытаюсь создать выходной поток (журнал изменений) на основе этой таблицы, как это изображение:

(Источник изображения: https://www.confluent.io/blog/kafka-streams-tables-part-1-event-streaming/)

Как правильно создать поток вывода (changelog) на основе таблицы в KSQL?

Прочитав это, я попробовал эти 4 метода:

CREATE STREAM cdc_window_table_changelog_stream (application_id STRING,
                                                 application_id_count INT) 
   WITH (KAFKA_TOPIC='cdc_window_table',
         VALUE_FORMAT='JSON');

CREATE STREAM cdc_window_table_changelog_stream (application_id STRING KEY,
                                                 application_id_count BIGINT) 
   WITH (KAFKA_TOPIC='cdc_window_table',
         VALUE_FORMAT='JSON',
         WINDOW_TYPE='TUMBLING',
         WINDOW_SIZE='20 SECONDS');

CREATE STREAM cdc_window_table_changelog_stream (application_id STRING KEY,
                                                 application_id_count BIGINT) 
   WITH (KAFKA_TOPIC='cdc_window_table',
         VALUE_FORMAT='JSON',
         WINDOW_TYPE='SESSION');

CREATE STREAM cdc_window_table_changelog_stream (ROWKEY STRING KEY,
                                                 application_id_count BIGINT) 
   WITH (KAFKA_TOPIC='cdc_window_table',
         VALUE_FORMAT='JSON',
         WINDOW_TYPE='SESSION');

Когда я просматриваю

SELECT *
FROM cdc_window_table_changelog_stream
EMIT CHANGES;

Он показывает только заголовок таблицы без каких-либо данных журнала изменений:

+------------------+-----------------------+
|APPLICATION_ID    |APPLICATION_ID_COUNT   |
+------------------+-----------------------+

Каким будет правильный способ создания выходного потока (журнала изменений) на основе таблицы?

Построение конвейеров данных в реальном времени с Apache Kafka: Руководство по Python
Построение конвейеров данных в реальном времени с Apache Kafka: Руководство по Python
Apache Kafka - популярная платформа распределенной потоковой передачи данных, которую можно использовать для построения конвейеров данных в реальном...
0
0
45
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

На шаге 2 вместо темы cdc_window_table я должен использовать что-то вроде _confluent-ksql-xxx-ksqlquery_CTAS_CDC_WINDOW_TABLE_271-Aggregate-GroupBy-repartition.

Тема журнала изменений этой таблицы автоматически создается KSQL при создании предыдущей таблицы.

Вы можете найти это длинное имя темы журнала изменений, используя

show all topics;

(Обратите внимание на all выше. Без него тема журнала изменений не будет отображаться.)

Рабочий KSQL будет

CREATE STREAM cdc_window_table_changelog_stream (application_id STRING KEY,
                                                 application_id_count BIGINT)
  WITH (KAFKA_TOPIC='_confluent-ksql-xxx-ksqlquery_CTAS_CDC_WINDOW_TABLE_271-Aggregate-GroupBy-repartition',
        VALUE_FORMAT='JSON');

(Обратите внимание на KEY позади application_id STRING выше. Без KEYapplication_id будет отображаться как null в потоке.)

Когда я просматриваю

SELECT *
FROM cdc_window_table_changelog_stream
EMIT CHANGES;

в этот момент я вижу

+------------------+-----------------------+
|APPLICATION_ID    |APPLICATION_ID_COUNT   |
+------------------+-----------------------+
|a1                |null                   |
|a1                |null                   |
|a1                |null                   |

Я не уверен, почему application_id_count — это null, но для меня application_id — это все, что меня волнует в моем случае использования. Если я найду решение или кто-нибудь знает, я обновлю этот ответ.

Другие вопросы по теме