Я изо всех сил пытался сделать эту работу, я знаю, что это межъязыковое преобразование и все такое, и я установил Java jdk на свой компьютер (когда я пишу java -версию в cmd, я получаю правильную информацию и все такое), но когда я пытаюсь заставить простой конвейер работать:
import apache_beam as beam
from apache_beam.io.external.kafka import ReadFromKafka
from apache_beam.options.pipeline_options import PipelineOptions
import os
os.environ['GOOGLE_APPLICATION_CREDENTIALS']='credentialsOld.json'
def main():
print('======================================================')
beam_options = PipelineOptions(runner='DataflowRunner',temp_location=temp_location,staging_location=staging_location,project=project,experiments=['use_runner_v2'],streaming=True)
with beam.Pipeline(options=beam_options) as p:
msgs = p | 'ReadKafka' >> ReadFromKafka(consumer_config = {'bootstrap.servers':'xxxxx-xxxxx...','group_id':'testAB'},topics=['users'])
msgs | beam.FlatMap(print)
if __name__ == '__main__':
main()
Я получаю эту ошибку: ValueError: Неподдерживаемый сигнал: 2
Я попытался добавить параметр extension_service= 'beam:external:java:kafka:read:v1' в ReadFromKafka, но потом получаю:
статус = Код Статуса. НЕДОСТУПЕН
details = "Не удалось разрешить DNS для луч: внешний: java: kafka: чтение: v1: НЕИЗВЕСТНО: ошибка ОС"
Я работаю над средой venv python, если эта информация может быть полезна, и мой кластер kafka находится в слитном облаке.
Я также получаю эту ошибку времени выполнения: RuntimeError: java.lang.RuntimeException: не удалось получить зависимости луча: преобразование: org.apache.beam: kafka_read_without_metadata: v1 из спецификации urn: «луч: преобразование: org.apache.beam: kafka_read_without_metadata: v1»
Я использую загрузочный сервер, который мне предоставляет слитное облако. Я обновлю пост изображением, чтобы вы могли видеть, о других настройках аутентификации (я знаю, о каких настройках вы говорите). Я не знаю, где я должен писать их в моем код ... О сбое разрешения DNS, я не думаю, что это связано с IP-адресом сервера начальной загрузки, потому что он говорит, что связан с расширением_сервиса beam.external: java .....
В нем говорится, что это не удалось для beam:external:java:kafka:read:v1
, что вообще не является допустимым DNS-именем. Он также говорит, что ему не удалось получить зависимости, поэтому ваш код имеет открытый доступ к Интернету для загрузки зависимостей?
Я студент, поэтому я действительно новичок во всем этом, я думаю, что мой код имеет доступ к зависимостям загрузки, так как я установил некоторые пакеты python с pip, например apache beam, или вы ссылаетесь на другой вид доступа, если да, как я должен дать разрешения на мой код?
Я не использовал Beam с Python, но если вы можете использовать Pip, это должно быть хорошо. Что касается аутентификации Confluent, то в методах Beam Java у читалки Kafka есть метод withConsumerConfigs
, а у вас в коде есть что-то похожее
Кроме того, ошибка разрешения DNS с ошибкой появляется только тогда, когда я указываю extension_service, однако я думаю, что если я не укажу ее, преобразование ReadFromKafka сделает это автоматически, поскольку, когда я делаю это таким образом, я получаю ошибку, не удалось получить зависимости
Моя ошибка заключалась в том, что я пропустил шаг, на котором мне нужно запустить службу расширения, я сделал это с помощью этой команды
java -jar beam-sdks-java-io-expansion-service-2.37.0.jar 8088 --javaClassLookupAllowlistFile='*'
после загрузки beam-sdks-java-io-expansion-service-2.37.0.jar из https://mvnrepository.com/artifact/org.apache.beam/beam-sdks-java-io-expansion-service/2.36.0
а затем указать порт в extension_service='localhost:8088'
Затем у меня были две незначительные ошибки: одна заключалась в том, что я использовал JDK 18, и я думаю, что он несовместим с https://beam.apache.org/get-started/quickstart-java/, поэтому я переключился на JDK 17 и использовал python 3.8 вместо python 3.10.
DNS resolution failed
? Вы уверены, что указали правильные загрузочные серверы? Кроме того, вам не хватает настроек аутентификации, если вы используете Confluent Cloud.