Я искал пример использования Kafka Streams о том, как это сделать, т.е. присоединиться к таблице клиентов с таблицей адресов и передать данные в ES: -
+------+------------+----------------+-----------------------+
| id | first_name | last_name | email |
+------+------------+----------------+-----------------------+
| 1001 | Sally | Thomas | [email protected] |
| 1002 | George | Bailey | [email protected] |
| 1003 | Edward | Davidson | [email protected] |
| 1004 | Anne | Kim | [email protected] |
+------+------------+----------------+-----------------------+
+----+-------------+---------------------------+------------+--------------+-------+----------+
| id | customer_id | street | city | state | zip | type |
+----+-------------+---------------------------+------------+--------------+-------+----------+
| 10 | 1001 | 3183 Moore Avenue | Euless | Texas | 76036 | SHIPPING |
| 11 | 1001 | 2389 Hidden Valley Road | Harrisburg | Pennsylvania | 17116 | BILLING |
| 12 | 1002 | 281 Riverside Drive | Augusta | Georgia | 30901 | BILLING |
| 13 | 1003 | 3787 Brownton Road | Columbus | Mississippi | 39701 | SHIPPING |
| 14 | 1003 | 2458 Lost Creek Road | Bethlehem | Pennsylvania | 18018 | SHIPPING |
| 15 | 1003 | 4800 Simpson Square | Hillsdale | Oklahoma | 73743 | BILLING |
| 16 | 1004 | 1289 University Hill Road | Canehill | Arkansas | 72717 | LIVING |
+----+-------------+---------------------------+------------+--------------+-------+----------+
"hits": [
{
"_index": "customers_with_addresses",
"_type": "_doc",
"_id": "1",
"_score": 1.3278645,
"_source": {
"first_name": "Sally",
"last_name": "Thomas",
"email": "[email protected]",
"addresses": [{
"street": "3183 Moore Avenue",
"city": "Euless",
"state": "Texas",
"zip": "76036",
"type": "SHIPPING"
}, {
"street": "2389 Hidden Valley Road",
"city": "Harrisburg",
"state": "Pennsylvania",
"zip": "17116",
"type": "BILLING"
}],
}
}, ….
Табличные данные поступают из тем Debezium, правильно ли я думаю, что мне нужна какая-то Java посередине, чтобы присоединиться к потокам, вывести их в новую тему, которая затем погружает их в ES?
У кого-нибудь есть пример кода этого?
Спасибо.
Привет, Робин. Меня бы тоже очень заинтересовал KSQL, я просто подумал, что мне нужно это сделать с помощью Java! :)
Да, вы можете реализовать решение с помощью API потоков Kafka в java следующим образом.
Ниже приведен пример (учитывая, что данные используются в формате json):
KStream<String,JsonNode> customers = builder.stream("customer", Consumed.with(stringSerde, jsonNodeSerde));
KStream<String,JsonNode> addresses = builder.stream("address", Consumed.with(stringSerde, jsonNodeSerde));
// Select the customer ID as key in order to join with address.
KStream<String,JsonNode> customerRekeyed = customers.selectKey(value-> value.get("id").asText());
ObjectMapper mapper = new ObjectMapper();
// Select Customer_id as key to aggregate the addresses and join with customer
KTable<String,JsonNode> addressTable = addresses
.selectKey(value-> value.get("customer_id").asText())
.groupByKey()
.aggregate(() ->mapper::createObjectNode, //initializer
(key,value,aggregate) -> aggregate.add(value),
Materialized.with(stringSerde, jsonNodeSerde)
); //adder
// Join Customer Stream with Address Table
KStream<String,JsonNode> customerAddressStream = customerRekeyed.leftJoin(addressTable,
(left,right) -> {
ObjectNode finalNode = mapper.createObjectNode();
ArrayList addressList = new ArrayList<JsonNode>();
// Considering the address is arrayNode
((ArrayNode)right).elements().forEachRemaining(addressList ::add);
left.putArray("addresses").allAll(addressList);
return left;
},Joined.keySerde(stringSerde).withValueSerde(jsonNodeSerde));
Подробную информацию обо всех типах соединений можно найти здесь:
https://docs.confluent.io/current/streams/developer-guide/dsl-api.html#joining
Спасибо Нишу! Это проясняет ситуацию для меня... тогда, по-видимому, я просто перенаправлю customerAddressStream в другую тему для погружения в ES через соединение Kafka?
Да, именно. :)'
В зависимости от того, насколько строги ваши требования к вложению нескольких адресов в один клиентский узел, вы можете сделать это в KSQL (который построен на основе Kafka Streams).
Заполните некоторые тестовые данные в Kafka (что в вашем случае делается уже через Debezium):
$ curl -s "https://api.mockaroo.com/api/ffa9ff20?count=10&key=ff7856d0" | kafkacat -b localhost:9092 -t addresses -P
$ curl -s "https://api.mockaroo.com/api/9b868890?count=4&key=ff7856d0" | kafkacat -b localhost:9092 -t customers -P
Запустите KSQL и для начала просто проверьте данные:
ksql> PRINT 'addresses' FROM BEGINNING ;
Format:JSON
{"ROWTIME":1558519823351,"ROWKEY":"null","id":1,"customer_id":1004,"street":"8 Moulton Center","city":"Bronx","state":"New York","zip":"10474","type":"BILLING"}
{"ROWTIME":1558519823351,"ROWKEY":"null","id":2,"customer_id":1001,"street":"5 Hollow Ridge Alley","city":"Washington","state":"District of Columbia","zip":"20016","type":"LIVING"}
{"ROWTIME":1558519823351,"ROWKEY":"null","id":3,"customer_id":1000,"street":"58 Maryland Point","city":"Greensboro","state":"North Carolina","zip":"27404","type":"LIVING"}
{"ROWTIME":1558519823351,"ROWKEY":"null","id":4,"customer_id":1002,"street":"55795 Derek Avenue","city":"Temple","state":"Texas","zip":"76505","type":"LIVING"}
{"ROWTIME":1558519823351,"ROWKEY":"null","id":5,"customer_id":1002,"street":"164 Continental Plaza","city":"Modesto","state":"California","zip":"95354","type":"SHIPPING"}
{"ROWTIME":1558519823351,"ROWKEY":"null","id":6,"customer_id":1004,"street":"6 Miller Road","city":"Louisville","state":"Kentucky","zip":"40205","type":"BILLING"}
{"ROWTIME":1558519823351,"ROWKEY":"null","id":7,"customer_id":1003,"street":"97 Shasta Place","city":"Pittsburgh","state":"Pennsylvania","zip":"15286","type":"BILLING"}
{"ROWTIME":1558519823351,"ROWKEY":"null","id":8,"customer_id":1000,"street":"36 Warbler Circle","city":"Memphis","state":"Tennessee","zip":"38109","type":"SHIPPING"}
{"ROWTIME":1558519823351,"ROWKEY":"null","id":9,"customer_id":1001,"street":"890 Eagan Circle","city":"Saint Paul","state":"Minnesota","zip":"55103","type":"SHIPPING"}
{"ROWTIME":1558519823354,"ROWKEY":"null","id":10,"customer_id":1000,"street":"8 Judy Terrace","city":"Washington","state":"District of Columbia","zip":"20456","type":"SHIPPING"}
^C
Topic printing ceased
ksql>
ksql> PRINT 'customers' FROM BEGINNING;
Format:JSON
{"ROWTIME":1558519852363,"ROWKEY":"null","id":1001,"first_name":"Jolee","last_name":"Handasyde","email":"[email protected]"}
{"ROWTIME":1558519852363,"ROWKEY":"null","id":1002,"first_name":"Rebeca","last_name":"Kerrod","email":"[email protected]"}
{"ROWTIME":1558519852363,"ROWKEY":"null","id":1003,"first_name":"Bobette","last_name":"Brumble","email":"[email protected]"}
{"ROWTIME":1558519852368,"ROWKEY":"null","id":1004,"first_name":"Royal","last_name":"De Biaggi","email":"[email protected]"}
Теперь мы объявляем STREAM
(тема Kafka + схема) для данных, чтобы мы могли манипулировать ими дальше:
ksql> CREATE STREAM addresses_RAW (ID INT, CUSTOMER_ID INT, STREET VARCHAR, CITY VARCHAR, STATE VARCHAR, ZIP VARCHAR, TYPE VARCHAR) WITH (KAFKA_TOPIC='addresses', VALUE_FORMAT='JSON');
Message
----------------
Stream created
----------------
ksql> CREATE STREAM customers_RAW (ID INT, FIRST_NAME VARCHAR, LAST_NAME VARCHAR, EMAIL VARCHAR) WITH (KAFKA_TOPIC='customers', VALUE_FORMAT='JSON');
Message
----------------
Stream created
----------------
Мы собираемся смоделировать customers
как TABLE
, и для этого сообщения Kafka должны иметь правильные ключи (и момент, когда они имеют нулевые ключи, как видно из "ROWKEY":"null"
в выводе PRINT
выше). Вы можете настроить Debezium для установки ключа сообщения, поэтому этот шаг может не понадобиться вам в KSQL:
ksql> CREATE STREAM CUSTOMERS_KEYED WITH (PARTITIONS=1) AS SELECT * FROM CUSTOMERS_RAW PARTITION BY ID;
Message
----------------------------
Stream created and running
----------------------------
Теперь мы объявляем TABLE
(состояние для данного ключа, созданного из темы + схемы Kafka):
ksql> CREATE TABLE CUSTOMER (ID INT, FIRST_NAME VARCHAR, LAST_NAME VARCHAR, EMAIL VARCHAR) WITH (KAFKA_TOPIC='CUSTOMERS_KEYED', VALUE_FORMAT='JSON', KEY='ID');
Message
---------------
Table created
---------------
Теперь мы можем объединить данные:
ksql> CREATE STREAM customers_with_addresses AS
SELECT CUSTOMER_ID,
FIRST_NAME + ' ' + LAST_NAME AS FULL_NAME,
FIRST_NAME,
LAST_NAME,
TYPE AS ADDRESS_TYPE,
STREET,
CITY,
STATE,
ZIP
FROM ADDRESSES_RAW A
INNER JOIN CUSTOMER C
ON A.CUSTOMER_ID = C.ID;
Message
----------------------------
Stream created and running
----------------------------
Это создает новый KSQL STREAM, который, в свою очередь, заполняет новую тему Kafka.
ksql> SHOW STREAMS;
Stream Name | Kafka Topic | Format
------------------------------------------------------------------------------------------
CUSTOMERS_KEYED | CUSTOMERS_KEYED | JSON
ADDRESSES_RAW | addresses | JSON
CUSTOMERS_RAW | customers | JSON
CUSTOMERS_WITH_ADDRESSES | CUSTOMERS_WITH_ADDRESSES | JSON
Поток имеет схему:
ksql> DESCRIBE CUSTOMERS_WITH_ADDRESSES;
Name : CUSTOMERS_WITH_ADDRESSES
Field | Type
------------------------------------------
ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
CUSTOMER_ID | INTEGER (key)
FULL_NAME | VARCHAR(STRING)
FIRST_NAME | VARCHAR(STRING)
ADDRESS_TYPE | VARCHAR(STRING)
LAST_NAME | VARCHAR(STRING)
STREET | VARCHAR(STRING)
CITY | VARCHAR(STRING)
STATE | VARCHAR(STRING)
ZIP | VARCHAR(STRING)
------------------------------------------
For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;
Мы можем запросить поток:
ksql> SELECT * FROM CUSTOMERS_WITH_ADDRESSES WHERE CUSTOMER_ID=1002;
1558519823351 | 1002 | 1002 | Rebeca Kerrod | Rebeca | LIVING | Kerrod | 55795 Derek Avenue | Temple | Texas | 76505
1558519823351 | 1002 | 1002 | Rebeca Kerrod | Rebeca | SHIPPING | Kerrod | 164 Continental Plaza | Modesto | California | 95354
Мы также можем передать его в Elasticsearch с помощью Kafka Connect:
curl -i -X POST -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/ \
-d '{
"name": "sink-elastic-customers_with_addresses-00",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"topics": "CUSTOMERS_WITH_ADDRESSES",
"connection.url": "http://elasticsearch:9200",
"type.name": "type.name=kafkaconnect",
"key.ignore": "true",
"schema.ignore": "true",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false"
}
}'
Результат:
$ curl -s http://localhost:9200/customers_with_addresses/_search | jq '.hits.hits[0]'
{
"_index": "customers_with_addresses",
"_type": "type.name=kafkaconnect",
"_id": "CUSTOMERS_WITH_ADDRESSES+0+2",
"_score": 1,
"_source": {
"ZIP": "76505",
"CITY": "Temple",
"ADDRESS_TYPE": "LIVING",
"CUSTOMER_ID": 1002,
"FULL_NAME": "Rebeca Kerrod",
"STATE": "Texas",
"STREET": "55795 Derek Avenue",
"LAST_NAME": "Kerrod",
"FIRST_NAME": "Rebeca"
}
}
Что касается «Вы можете настроить Debezium для установки ключа сообщения»: на самом деле это всегда так. Ключ сообщения будет представлять столбец (столбцы) первичного ключа захваченной таблицы; или, если его нет, в зависимости от соединителя, первый уникальный ключ таблицы.
@Robin, как бы это выглядело с KSQL, если бы мы хотели создать один документ, содержащий одного клиента и все его адреса (т. е. встроенный массив). Я думаю, что это предпочтительное представление при отправке таких данных в ES. Это то, что мы пробовали с сообщением в блоге, на которое я ссылался, и здесь все становится сложнее.
@Gunnar да, это невозможно в KSQL… пока :) См. github.com/confluentinc/ksql/issues/2147
Я правильно понял, чтобы объединить два потока, мы должны объединить потоки по общему ключу? Какой идентификатор одного потока является задержкой?
Недавно мы создали демонстрацию и Сообщение блога именно для этого варианта использования (потоковая передача агрегатов в Elasticsearch) в блоге Debezium.
Одна проблема, о которой следует помнить, заключается в том, что это решение (основанное на Kafka Streams, но я считаю, что то же самое для KSQL) склонно к раскрытию результатов промежуточного соединения. Например. Предположим, вы вставляете клиента и 10 адресов в одну транзакцию. Подход с потоковым соединением может сначала создать совокупность клиентов и их первых пяти адресов, а вскоре после этого — полную совокупность со всеми 10 адресами. Это может быть или не быть желательным для вашего конкретного варианта использования. Я также помню, что обработка удалений не тривиальна (например, если вы удалите один из 10 адресов, вам придется снова создать совокупность с оставшимися 9 адресами, которые, возможно, остались нетронутыми).
Альтернативой для рассмотрения может быть шаблон исходящих сообщений, где вы, по сути, создаете явное событие с предварительно вычисленным агрегированием из самого приложения. т.е. для этого требуется небольшая помощь приложения, но тогда он избегает тонкостей создания этого результата объединения постфактум.
Это должно быть на Java? Здесь также подойдет KSQL, я могу опубликовать пример, если вам интересно.