Учитывая DataStreamReader, настроенный для подписки на несколько тем, подобных этой (см. здесь):
// Subscribe to multiple topics
spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1,topic2,topic3")
Когда я использую foreachBatch помимо этого, что будет содержать партия?
В моем случае использования я хотел бы иметь пакеты с сообщениями, поступающими только из одной темы. Можно ли это настроить?





Пакет будет содержать сообщения, поступающие из всех тем (вместо этого я бы сказал разделов), на которые подписан ваш потребитель.
@Beryllium Негласно потребитель подписывается на определенные разделы данной темы (тем). Если в группе потребителей есть только один потребитель, он подписывается на все разделы.
Цитирование официальной документации в Структурированная потоковая передача + Руководство по интеграции Kafka (брокер Kafka версии 0.10.0 или выше):
// Subscribe to multiple topics
... .option("subscribe", "topic1,topic2")
Приведенный выше код — это то, на что подписывается базовый потребитель Kafka (потокового запроса).
When I use foreachBatch on top of this, what will the batches contain?
- Each batch will only contain messages from one topic?
Это правильный ответ.
I'd like to have batches with messages coming from one topic only. Is it possible to configure this?
Это также задокументировано в Структурированная потоковая передача + Руководство по интеграции Kafka (брокер Kafka версии 0.10.0 или выше):
Each row in the source has the following schema:
...
topic
Другими словами, входной набор данных будет иметь столбец topic с названием темы, из которой взята данная строка (запись).
Чтобы иметь "пакеты с сообщениями, поступающими только из одной темы", вы просто filter или where с одной темой, например.
val messages: DataFrame = ...
assert(messages.isStreaming)
messages
.writeStream
.foreachBatch { case (df, batchId) =>
val topic1Only = df.where($"topic" === "topic1")
val topic2Only = df.where($"topic" === "topic2")
...
}
Что касается 2-й части моего вопроса: Под конфигурацией я имел в виду, чтобы избежать фильтрации, потому что пакеты должны обрабатываться полностью или не обрабатываться вообще. Однако, когда пакеты содержат только сообщения из одной темы, все в порядке. Спасибо за ответ.
@jacek Есть ли способ написать это программно вместо указания значения для каждой темы? я думал перебрать список тем, это кажется неэффективным
@collarblind Просто используйте поле topic, и эта строка будет «перенаправлена» в эту тему.
@JacekLaskowski, мой вопрос сбивает с толку. если у меня есть темы val topics=Seq("t1","t2") и у моего foreachBatch есть эта, topics.map(t => df.where($"topic" === "t1").write(). это то же самое, что и ваш код?
Почему вы foreachBatch пишете в Kafka, если у вас есть встроенный источник данных?
я хочу читать из нескольких тем и записывать каждую тему в их местоположение s3
Спасибо за Ваш ответ. Это основано на наблюдениях или где-то задокументировано? Вопрос о множественных темы (а не о перегородки).