Что содержат foreachBatches в потоковом запросе из нескольких тем Kafka?

Учитывая DataStreamReader, настроенный для подписки на несколько тем, подобных этой (см. здесь):

// Subscribe to multiple topics
spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1,topic2,topic3")

Когда я использую foreachBatch помимо этого, что будет содержать партия?

  • Каждый пакет будет содержать сообщения только из одной темы?
  • Или пакет может содержать сообщения, приходящие из разных тем?

В моем случае использования я хотел бы иметь пакеты с сообщениями, поступающими только из одной темы. Можно ли это настроить?

Стоит ли изучать PHP в 2026-2027 годах?
Стоит ли изучать PHP в 2026-2027 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
4
0
295
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Пакет будет содержать сообщения, поступающие из всех тем (вместо этого я бы сказал разделов), на которые подписан ваш потребитель.

Спасибо за Ваш ответ. Это основано на наблюдениях или где-то задокументировано? Вопрос о множественных темы (а не о перегородки).

Beryllium 11.07.2019 15:19

@Beryllium Негласно потребитель подписывается на определенные разделы данной темы (тем). Если в группе потребителей есть только один потребитель, он подписывается на все разделы.

Giorgos Myrianthous 11.07.2019 15:21
Ответ принят как подходящий

Цитирование официальной документации в Структурированная потоковая передача + Руководство по интеграции Kafka (брокер Kafka версии 0.10.0 или выше):

// Subscribe to multiple topics

...
.option("subscribe", "topic1,topic2")

Приведенный выше код — это то, на что подписывается базовый потребитель Kafka (потокового запроса).

When I use foreachBatch on top of this, what will the batches contain?

  • Each batch will only contain messages from one topic?

Это правильный ответ.

I'd like to have batches with messages coming from one topic only. Is it possible to configure this?

Это также задокументировано в Структурированная потоковая передача + Руководство по интеграции Kafka (брокер Kafka версии 0.10.0 или выше):

Each row in the source has the following schema:

...

topic

Другими словами, входной набор данных будет иметь столбец topic с названием темы, из которой взята данная строка (запись).

Чтобы иметь "пакеты с сообщениями, поступающими только из одной темы", вы просто filter или where с одной темой, например.

val messages: DataFrame = ...
assert(messages.isStreaming)

messages
  .writeStream
  .foreachBatch { case (df, batchId) =>
    val topic1Only = df.where($"topic" === "topic1")
    val topic2Only = df.where($"topic" === "topic2")
    ...
  }

Что касается 2-й части моего вопроса: Под конфигурацией я имел в виду, чтобы избежать фильтрации, потому что пакеты должны обрабатываться полностью или не обрабатываться вообще. Однако, когда пакеты содержат только сообщения из одной темы, все в порядке. Спасибо за ответ.

Beryllium 16.07.2019 13:30

@jacek Есть ли способ написать это программно вместо указания значения для каждой темы? я думал перебрать список тем, это кажется неэффективным

collarblind 31.03.2021 20:51

@collarblind Просто используйте поле topic, и эта строка будет «перенаправлена» в эту тему.

Jacek Laskowski 02.04.2021 00:40

@JacekLaskowski, мой вопрос сбивает с толку. если у меня есть темы val topics=Seq("t1","t2") и у моего foreachBatch есть эта, topics.map(t => df.where($"topic" === "t1").write(). это то же самое, что и ваш код?

collarblind 02.04.2021 02:16

Почему вы foreachBatch пишете в Kafka, если у вас есть встроенный источник данных?

Jacek Laskowski 03.04.2021 12:28

я хочу читать из нескольких тем и записывать каждую тему в их местоположение s3

collarblind 05.04.2021 15:52

Другие вопросы по теме