На моей локальной платформе Confluent у меня есть 1 тема с вызовом «FOO_02», я вручную вставил в нее некоторые записи, поэтому я могу распечатать ее с начала с помощью следующей команды:
print 'FOO_02' from beginning;
Можно сделать как-то так, я хочу вытащить только запись, где COL1=1? Что-то вроде того, что мы можем выполнить оператор select с условием where, чтобы получить данные из обычной базы данных, например db2
.
Я попробовал следующую команду, но я считаю, что она получит только новые данные, потому что я получаю пустую запись для этой команды:
ksql> select * from FOO_02 WHERE COL1=1 EMIT CHANGES;
Я предполагаю, что вы уже сделали
CREATE STREAM FOO_02 WITH (KAFKA_TOPIC='FOO_02', FORMAT='AVRO');
потому что иначе ваш SELECT
потерпел бы неудачу.
Итак, поскольку PRINT
успешно показывает, что в тема есть данные, вы можете запросить поток, используя нужный предикат. Единственное, что вам нужно сделать, это указать ksqlDB обрабатывать все данные в теме, а не только новые записи (что и делает from beginning
в операторе PRINT
). Для этого запустите:
SET 'auto.offset.reset' = 'earliest';
и потом запустить SELECT
.
Редактировать
Can we select the only latest records? For example, I have multiple data being push to topic for COL1=0, but I only want to grab the latest one because its the newest data and it is the only correct one. something like where rowtime = max?
То, что вы описываете, это TABLE
: последнее значение для данного ключа.
CREATE TABLE FOO AS
SELECT COL1,
LATEST_BY_OFFSET(COL2) AS COL2
FROM FOO_02
WHERE COL1=0
GROUP BY COL1;
В результирующей таблице будет одна запись для COL1
с последним значением COL2
по мере получения новых сообщений.
Any way to bring the old data in the Stream into the table as well?
Чтобы обработать существующие данные, установите смещение на самое раннее перед запуском оператора CREATE
.
SET 'auto.offset.reset' = 'earliest';
CREATE TABLE FOO AS
[…]
я обновил свой ответ
привет @Robin, новое сообщение будет храниться только в новой созданной таблице? Есть ли способ перенести старые данные из Stream в таблицу?
Я отредактировал свой ответ.
Можем ли мы выбрать только последние записи? Например, у меня есть несколько данных, которые отправляются в тему для COL1 = 0, но я хочу получить только самую последнюю, потому что это самые новые данные и они единственно правильные. что-то вроде где rowtime = max?