Можем ли мы выбрать определенную строку записей из слитной темы кафки?

На моей локальной платформе Confluent у меня есть 1 тема с вызовом «FOO_02», я вручную вставил в нее некоторые записи, поэтому я могу распечатать ее с начала с помощью следующей команды:

print 'FOO_02' from beginning;

Можем ли мы выбрать определенную строку записей из слитной темы кафки?

Можно сделать как-то так, я хочу вытащить только запись, где COL1=1? Что-то вроде того, что мы можем выполнить оператор select с условием where, чтобы получить данные из обычной базы данных, например db2.

Я попробовал следующую команду, но я считаю, что она получит только новые данные, потому что я получаю пустую запись для этой команды:

ksql> select * from FOO_02 WHERE COL1=1 EMIT CHANGES;
Построение конвейеров данных в реальном времени с Apache Kafka: Руководство по Python
Построение конвейеров данных в реальном времени с Apache Kafka: Руководство по Python
Apache Kafka - популярная платформа распределенной потоковой передачи данных, которую можно использовать для построения конвейеров данных в реальном...
0
0
25
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Я предполагаю, что вы уже сделали

CREATE STREAM FOO_02 WITH (KAFKA_TOPIC='FOO_02', FORMAT='AVRO');

потому что иначе ваш SELECT потерпел бы неудачу.

Итак, поскольку PRINT успешно показывает, что в тема есть данные, вы можете запросить поток, используя нужный предикат. Единственное, что вам нужно сделать, это указать ksqlDB обрабатывать все данные в теме, а не только новые записи (что и делает from beginning в операторе PRINT). Для этого запустите:

SET 'auto.offset.reset' = 'earliest';

и потом запустить SELECT.


Редактировать

Can we select the only latest records? For example, I have multiple data being push to topic for COL1=0, but I only want to grab the latest one because its the newest data and it is the only correct one. something like where rowtime = max?

То, что вы описываете, это TABLE: последнее значение для данного ключа.

CREATE TABLE FOO AS
  SELECT COL1, 
         LATEST_BY_OFFSET(COL2) AS COL2
    FROM FOO_02
   WHERE COL1=0
   GROUP BY COL1;

В результирующей таблице будет одна запись для COL1 с последним значением COL2 по мере получения новых сообщений.

Any way to bring the old data in the Stream into the table as well?

Чтобы обработать существующие данные, установите смещение на самое раннее перед запуском оператора CREATE.

SET 'auto.offset.reset' = 'earliest';

CREATE TABLE FOO AS
[…]

Можем ли мы выбрать только последние записи? Например, у меня есть несколько данных, которые отправляются в тему для COL1 = 0, но я хочу получить только самую последнюю, потому что это самые новые данные и они единственно правильные. что-то вроде где rowtime = max?

Panadol Chong 17.03.2022 13:31

я обновил свой ответ

Robin Moffatt 17.03.2022 14:46

привет @Robin, новое сообщение будет храниться только в новой созданной таблице? Есть ли способ перенести старые данные из Stream в таблицу?

Panadol Chong 18.03.2022 04:15

Я отредактировал свой ответ.

Robin Moffatt 18.03.2022 10:56

Другие вопросы по теме