Пример присоединения к потоку с Apache Kafka?

Я искал пример использования Kafka Streams о том, как это сделать, т.е. присоединиться к таблице клиентов с таблицей адресов и передать данные в ES: -

Клиенты

+------+------------+----------------+-----------------------+
| id   | first_name | last_name      | email                 |
+------+------------+----------------+-----------------------+
| 1001 | Sally      | Thomas         | [email protected] |
| 1002 | George     | Bailey         | [email protected]    |
| 1003 | Edward     | Davidson       | [email protected]         |
| 1004 | Anne       | Kim            | [email protected]    |
+------+------------+----------------+-----------------------+

Адреса

+----+-------------+---------------------------+------------+--------------+-------+----------+
| id | customer_id | street                    | city       | state        | zip   | type     |
+----+-------------+---------------------------+------------+--------------+-------+----------+
| 10 |        1001 | 3183 Moore Avenue         | Euless     | Texas        | 76036 | SHIPPING |
| 11 |        1001 | 2389 Hidden Valley Road   | Harrisburg | Pennsylvania | 17116 | BILLING  |
| 12 |        1002 | 281 Riverside Drive       | Augusta    | Georgia      | 30901 | BILLING  |
| 13 |        1003 | 3787 Brownton Road        | Columbus   | Mississippi  | 39701 | SHIPPING |
| 14 |        1003 | 2458 Lost Creek Road      | Bethlehem  | Pennsylvania | 18018 | SHIPPING |
| 15 |        1003 | 4800 Simpson Square       | Hillsdale  | Oklahoma     | 73743 | BILLING  |
| 16 |        1004 | 1289 University Hill Road | Canehill   | Arkansas     | 72717 | LIVING   |
+----+-------------+---------------------------+------------+--------------+-------+----------+

Выходной индекс Elasticsearch

"hits": [
  {
    "_index": "customers_with_addresses",
    "_type": "_doc",
    "_id": "1",
    "_score": 1.3278645,
    "_source": {
      "first_name": "Sally",
      "last_name": "Thomas",
      "email": "[email protected]",
      "addresses": [{
        "street": "3183 Moore Avenue",
        "city": "Euless",
        "state": "Texas",
        "zip": "76036",
        "type": "SHIPPING"
      }, {
        "street": "2389 Hidden Valley Road",
        "city": "Harrisburg",
        "state": "Pennsylvania",
        "zip": "17116",
        "type": "BILLING"
      }],
    }
  }, ….

Табличные данные поступают из тем Debezium, правильно ли я думаю, что мне нужна какая-то Java посередине, чтобы присоединиться к потокам, вывести их в новую тему, которая затем погружает их в ES?

У кого-нибудь есть пример кода этого?

Спасибо.

Это должно быть на Java? Здесь также подойдет KSQL, я могу опубликовать пример, если вам интересно.

Robin Moffatt 22.05.2019 10:38

Привет, Робин. Меня бы тоже очень заинтересовал KSQL, я просто подумал, что мне нужно это сделать с помощью Java! :)

Gavin Gilmour 22.05.2019 11:00
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
2
2
1 280
3
Перейти к ответу Данный вопрос помечен как решенный

Ответы 3

Да, вы можете реализовать решение с помощью API потоков Kafka в java следующим образом.

  1. Используйте темы как поток.
  2. Объедините поток адресов в список, используя идентификатор клиента, и преобразуйте поток в таблицу.
  3. Присоединяйтесь к потоку клиентов с таблицей адресов

Ниже приведен пример (учитывая, что данные используются в формате json):

KStream<String,JsonNode> customers = builder.stream("customer", Consumed.with(stringSerde, jsonNodeSerde));
KStream<String,JsonNode> addresses = builder.stream("address", Consumed.with(stringSerde, jsonNodeSerde));

// Select the customer ID as key in order to join with address. 
KStream<String,JsonNode> customerRekeyed = customers.selectKey(value-> value.get("id").asText());

ObjectMapper mapper = new ObjectMapper();    
// Select Customer_id as key to aggregate the addresses and join with customer
KTable<String,JsonNode> addressTable = addresses
        .selectKey(value-> value.get("customer_id").asText())
        .groupByKey()
        .aggregate(() ->mapper::createObjectNode,  //initializer
                   (key,value,aggregate) -> aggregate.add(value),
                 Materialized.with(stringSerde, jsonNodeSerde)
         );  //adder

// Join Customer Stream with Address Table
KStream<String,JsonNode> customerAddressStream = customerRekeyed.leftJoin(addressTable,
               (left,right) -> {
                      ObjectNode finalNode = mapper.createObjectNode();
                      ArrayList addressList = new ArrayList<JsonNode>();
                      // Considering the address is arrayNode
                      ((ArrayNode)right).elements().forEachRemaining(addressList ::add);
                      left.putArray("addresses").allAll(addressList);              
                      return left;
               },Joined.keySerde(stringSerde).withValueSerde(jsonNodeSerde));

Подробную информацию обо всех типах соединений можно найти здесь:

https://docs.confluent.io/current/streams/developer-guide/dsl-api.html#joining

Спасибо Нишу! Это проясняет ситуацию для меня... тогда, по-видимому, я просто перенаправлю customerAddressStream в другую тему для погружения в ES через соединение Kafka?

Gavin Gilmour 22.05.2019 11:02

Да, именно. :)'

Nishu Tayal 22.05.2019 12:40
Ответ принят как подходящий

В зависимости от того, насколько строги ваши требования к вложению нескольких адресов в один клиентский узел, вы можете сделать это в KSQL (который построен на основе Kafka Streams).

Заполните некоторые тестовые данные в Kafka (что в вашем случае делается уже через Debezium):

$ curl -s "https://api.mockaroo.com/api/ffa9ff20?count=10&key=ff7856d0" | kafkacat -b localhost:9092 -t addresses -P

$ curl -s "https://api.mockaroo.com/api/9b868890?count=4&key=ff7856d0" | kafkacat -b localhost:9092 -t customers -P

Запустите KSQL и для начала просто проверьте данные:

ksql> PRINT 'addresses' FROM BEGINNING ;
Format:JSON
{"ROWTIME":1558519823351,"ROWKEY":"null","id":1,"customer_id":1004,"street":"8 Moulton Center","city":"Bronx","state":"New York","zip":"10474","type":"BILLING"}
{"ROWTIME":1558519823351,"ROWKEY":"null","id":2,"customer_id":1001,"street":"5 Hollow Ridge Alley","city":"Washington","state":"District of Columbia","zip":"20016","type":"LIVING"}
{"ROWTIME":1558519823351,"ROWKEY":"null","id":3,"customer_id":1000,"street":"58 Maryland Point","city":"Greensboro","state":"North Carolina","zip":"27404","type":"LIVING"}
{"ROWTIME":1558519823351,"ROWKEY":"null","id":4,"customer_id":1002,"street":"55795 Derek Avenue","city":"Temple","state":"Texas","zip":"76505","type":"LIVING"}
{"ROWTIME":1558519823351,"ROWKEY":"null","id":5,"customer_id":1002,"street":"164 Continental Plaza","city":"Modesto","state":"California","zip":"95354","type":"SHIPPING"}
{"ROWTIME":1558519823351,"ROWKEY":"null","id":6,"customer_id":1004,"street":"6 Miller Road","city":"Louisville","state":"Kentucky","zip":"40205","type":"BILLING"}
{"ROWTIME":1558519823351,"ROWKEY":"null","id":7,"customer_id":1003,"street":"97 Shasta Place","city":"Pittsburgh","state":"Pennsylvania","zip":"15286","type":"BILLING"}
{"ROWTIME":1558519823351,"ROWKEY":"null","id":8,"customer_id":1000,"street":"36 Warbler Circle","city":"Memphis","state":"Tennessee","zip":"38109","type":"SHIPPING"}
{"ROWTIME":1558519823351,"ROWKEY":"null","id":9,"customer_id":1001,"street":"890 Eagan Circle","city":"Saint Paul","state":"Minnesota","zip":"55103","type":"SHIPPING"}
{"ROWTIME":1558519823354,"ROWKEY":"null","id":10,"customer_id":1000,"street":"8 Judy Terrace","city":"Washington","state":"District of Columbia","zip":"20456","type":"SHIPPING"}
^C
Topic printing ceased

ksql>
ksql> PRINT 'customers' FROM BEGINNING;
Format:JSON
{"ROWTIME":1558519852363,"ROWKEY":"null","id":1001,"first_name":"Jolee","last_name":"Handasyde","email":"[email protected]"}
{"ROWTIME":1558519852363,"ROWKEY":"null","id":1002,"first_name":"Rebeca","last_name":"Kerrod","email":"[email protected]"}
{"ROWTIME":1558519852363,"ROWKEY":"null","id":1003,"first_name":"Bobette","last_name":"Brumble","email":"[email protected]"}
{"ROWTIME":1558519852368,"ROWKEY":"null","id":1004,"first_name":"Royal","last_name":"De Biaggi","email":"[email protected]"}

Теперь мы объявляем STREAM (тема Kafka + схема) для данных, чтобы мы могли манипулировать ими дальше:

ksql> CREATE STREAM addresses_RAW (ID INT, CUSTOMER_ID INT, STREET VARCHAR, CITY VARCHAR, STATE VARCHAR, ZIP VARCHAR, TYPE VARCHAR) WITH (KAFKA_TOPIC='addresses', VALUE_FORMAT='JSON');

 Message
----------------
 Stream created
----------------

ksql> CREATE STREAM customers_RAW (ID INT, FIRST_NAME VARCHAR, LAST_NAME VARCHAR, EMAIL VARCHAR) WITH (KAFKA_TOPIC='customers', VALUE_FORMAT='JSON');

 Message
----------------
 Stream created
----------------

Мы собираемся смоделировать customers как TABLE, и для этого сообщения Kafka должны иметь правильные ключи (и момент, когда они имеют нулевые ключи, как видно из "ROWKEY":"null" в выводе PRINT выше). Вы можете настроить Debezium для установки ключа сообщения, поэтому этот шаг может не понадобиться вам в KSQL:

ksql> CREATE STREAM CUSTOMERS_KEYED WITH (PARTITIONS=1) AS SELECT * FROM CUSTOMERS_RAW PARTITION BY ID;

 Message
----------------------------
 Stream created and running
----------------------------

Теперь мы объявляем TABLE (состояние для данного ключа, созданного из темы + схемы Kafka):

ksql> CREATE TABLE CUSTOMER (ID INT, FIRST_NAME VARCHAR, LAST_NAME VARCHAR, EMAIL VARCHAR) WITH (KAFKA_TOPIC='CUSTOMERS_KEYED', VALUE_FORMAT='JSON', KEY='ID');

 Message
---------------
 Table created
---------------

Теперь мы можем объединить данные:


ksql> CREATE STREAM customers_with_addresses AS 
      SELECT CUSTOMER_ID, 
             FIRST_NAME + ' ' + LAST_NAME AS FULL_NAME, 
             FIRST_NAME, 
             LAST_NAME, 
             TYPE AS ADDRESS_TYPE, 
             STREET, 
             CITY, 
             STATE, 
             ZIP 
        FROM ADDRESSES_RAW A 
             INNER JOIN CUSTOMER C 
             ON A.CUSTOMER_ID = C.ID;

 Message
----------------------------
 Stream created and running
----------------------------

Это создает новый KSQL STREAM, который, в свою очередь, заполняет новую тему Kafka.

ksql> SHOW STREAMS;

 Stream Name                              | Kafka Topic                          | Format
------------------------------------------------------------------------------------------
 CUSTOMERS_KEYED                          | CUSTOMERS_KEYED                      | JSON
 ADDRESSES_RAW                            | addresses                            | JSON
 CUSTOMERS_RAW                            | customers                            | JSON
 CUSTOMERS_WITH_ADDRESSES                 | CUSTOMERS_WITH_ADDRESSES             | JSON

Поток имеет схему:

ksql> DESCRIBE CUSTOMERS_WITH_ADDRESSES;

Name                 : CUSTOMERS_WITH_ADDRESSES
 Field        | Type
------------------------------------------
 ROWTIME      | BIGINT           (system)
 ROWKEY       | VARCHAR(STRING)  (system)
 CUSTOMER_ID  | INTEGER          (key)
 FULL_NAME    | VARCHAR(STRING)
 FIRST_NAME   | VARCHAR(STRING)
 ADDRESS_TYPE | VARCHAR(STRING)
 LAST_NAME    | VARCHAR(STRING)
 STREET       | VARCHAR(STRING)
 CITY         | VARCHAR(STRING)
 STATE        | VARCHAR(STRING)
 ZIP          | VARCHAR(STRING)
------------------------------------------
For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;

Мы можем запросить поток:

ksql> SELECT * FROM CUSTOMERS_WITH_ADDRESSES WHERE CUSTOMER_ID=1002;
1558519823351 | 1002 | 1002 | Rebeca Kerrod | Rebeca | LIVING | Kerrod | 55795 Derek Avenue | Temple | Texas | 76505
1558519823351 | 1002 | 1002 | Rebeca Kerrod | Rebeca | SHIPPING | Kerrod | 164 Continental Plaza | Modesto | California | 95354

Мы также можем передать его в Elasticsearch с помощью Kafka Connect:

curl -i -X POST -H "Accept:application/json" \
    -H  "Content-Type:application/json" http://localhost:8083/connectors/ \
    -d '{
      "name": "sink-elastic-customers_with_addresses-00",
      "config": {
        "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "topics": "CUSTOMERS_WITH_ADDRESSES",
        "connection.url": "http://elasticsearch:9200",
        "type.name": "type.name=kafkaconnect",
        "key.ignore": "true",
        "schema.ignore": "true",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false"
      }
    }'

Результат:

$ curl -s http://localhost:9200/customers_with_addresses/_search | jq '.hits.hits[0]'
{
  "_index": "customers_with_addresses",
  "_type": "type.name=kafkaconnect",
  "_id": "CUSTOMERS_WITH_ADDRESSES+0+2",
  "_score": 1,
  "_source": {
    "ZIP": "76505",
    "CITY": "Temple",
    "ADDRESS_TYPE": "LIVING",
    "CUSTOMER_ID": 1002,
    "FULL_NAME": "Rebeca Kerrod",
    "STATE": "Texas",
    "STREET": "55795 Derek Avenue",
    "LAST_NAME": "Kerrod",
    "FIRST_NAME": "Rebeca"
  }
}

Что касается «Вы можете настроить Debezium для установки ключа сообщения»: на самом деле это всегда так. Ключ сообщения будет представлять столбец (столбцы) первичного ключа захваченной таблицы; или, если его нет, в зависимости от соединителя, первый уникальный ключ таблицы.

Gunnar 23.05.2019 21:02

@Robin, как бы это выглядело с KSQL, если бы мы хотели создать один документ, содержащий одного клиента и все его адреса (т. е. встроенный массив). Я думаю, что это предпочтительное представление при отправке таких данных в ES. Это то, что мы пробовали с сообщением в блоге, на которое я ссылался, и здесь все становится сложнее.

Gunnar 23.05.2019 21:34

@Gunnar да, это невозможно в KSQL… пока :) См. github.com/confluentinc/ksql/issues/2147

Robin Moffatt 24.05.2019 10:10

Я правильно понял, чтобы объединить два потока, мы должны объединить потоки по общему ключу? Какой идентификатор одного потока является задержкой?

user14514318 15.11.2020 11:46

Недавно мы создали демонстрацию и Сообщение блога именно для этого варианта использования (потоковая передача агрегатов в Elasticsearch) в блоге Debezium.

Одна проблема, о которой следует помнить, заключается в том, что это решение (основанное на Kafka Streams, но я считаю, что то же самое для KSQL) склонно к раскрытию результатов промежуточного соединения. Например. Предположим, вы вставляете клиента и 10 адресов в одну транзакцию. Подход с потоковым соединением может сначала создать совокупность клиентов и их первых пяти адресов, а вскоре после этого — полную совокупность со всеми 10 адресами. Это может быть или не быть желательным для вашего конкретного варианта использования. Я также помню, что обработка удалений не тривиальна (например, если вы удалите один из 10 адресов, вам придется снова создать совокупность с оставшимися 9 адресами, которые, возможно, остались нетронутыми).

Альтернативой для рассмотрения может быть шаблон исходящих сообщений, где вы, по сути, создаете явное событие с предварительно вычисленным агрегированием из самого приложения. т.е. для этого требуется небольшая помощь приложения, но тогда он избегает тонкостей создания этого результата объединения постфактум.

Другие вопросы по теме