Flink JobExecutionException: akka.client.timeout

Я использую Flink v.1.4.0.

Я пытаюсь запустить задание, используя DataSet API через IntelliJ. Обратите внимание, что если я запускаю то же задание через Flink UI, задание выполняется нормально. Чтобы запустить задание, мне нужно сначала указать через переменные среды объем данных, которые будут обработаны. Когда сумма относительно небольшая, работа выполняется нормально. Но по мере того, как он становится больше, я начинаю получать следующую ошибку:

ERROR StatusLogger Log4j2 could not find a logging implementation. Please add log4j-core to the classpath. Using SimpleLogger to log to the console...
31107 [main] ERROR com.company.someLib.SomeClass - Error executing pipeline
org.apache.flink.runtime.client.JobExecutionException: Couldn't retrieve the JobExecutionResult from the JobManager.
at org.apache.flink.runtime.client.JobClient.awaitJobResult(JobClient.java:300)
at org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:387)
at org.apache.flink.runtime.minicluster.FlinkMiniCluster.submitJobAndWait(FlinkMiniCluster.scala:565)
at org.apache.flink.runtime.minicluster.FlinkMiniCluster.submitJobAndWait(FlinkMiniCluster.scala:539)
at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:193)
at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:91)
at com.ubs.digital.comms.graph.emailanalyser.EmailAnalyserPipeline.lambda$runPipeline$1(EmailAnalyserPipeline.java:120)
at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1374)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)
at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
at com.ubs.digital.comms.graph.emailanalyser.EmailAnalyserPipeline.runPipeline(EmailAnalyserPipeline.java:87)
at com.ubs.digital.comms.graph.emailanalyser.EmailAnalyserPipeline.main(EmailAnalyserPipeline.java:65)
Caused by: org.apache.flink.runtime.client.JobClientActorSubmissionTimeoutException: Job submission to the JobManager timed out. You may increase 'akka.client.timeout' in case the JobManager needs more time to configure and confirm the job submission.

Я вижу, что совет такой:

You may increase 'akka.client.timeout' in case the JobManager needs more time to configure and confirm the job submission.

но я подозреваю, что проблема гораздо глубже. Но для этого мне нужно сначала настроить akka.client.timeout. Как мне сделать это в IntelliJ? и как долго должен быть тайм-аут?

Кроме того, что на самом деле вызывает это? Мне нужно увеличить память кучи или что-то в этом роде? Спасибо.

Попробуйте обновиться до флинка 1.4.1 и проверьте mail-archives.apache.org/mod_mbox/flink-user/201705.mbox/…

kinkajou 19.03.2018 02:10

Обновление до 1.4.1 сейчас немного сложно, так как нужно будет изменить многие зависимости. Я смотрю, как «... увеличить сетевой стек akka (= удаленное взаимодействие)», но нет такого поля, которое я бы увеличил. Это akka.framesize?

Christos Hadjinikolis 20.03.2018 11:44

изменение с 1.4.0 на 1.4.2 намного менее безболезненно, чем если бы вы делали обновление с 1.3.2 до 1.4.0. Просто по моему опыту. Что касается значения тайм-аута, есть ли у вас образец кода, которым вы можете поделиться? Я знаю, что для API потока данных (если вы используете Kafka Connector в качестве источника) вы можете настроить тайм-аут в свойствах, которые вы передаете FlinkKafkaConsumer. Если вы дадите некоторое представление о том, как вы настраиваете свой источник, я мог бы посоветовать лучше.

Jicaar 09.04.2018 18:11

ОК. Мне все равно придется это сделать в какой-то момент; определенно, когда выйдет основной выпуск (1.5.0), но пока я придерживаюсь 1.4.0. Тем не менее, спасибо за вклад. Я уверен, что с обновлением многие из этих проблем будут решены.

Christos Hadjinikolis 18.04.2018 11:51
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
2
4
1 331
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Вы можете установить это свойство через файл конфигурации flink. См. https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/config.html#distributed-coordination-via-akka

Итак, в flink-conf.yaml вы должны добавить, например:

akka.client.timeout: 10min

Но похоже, что данные обрабатываются не в том месте. Возможно, вы загружаете данные в конструктор, а не в функцию map или run?

Спасибо за ответ, но я это уже знал. Я пытаюсь выяснить, можно ли это сделать через IntelliJ.

Christos Hadjinikolis 09.04.2018 15:53
Ответ принят как подходящий

Я смог разобраться, и это тоже оказалось не так уж и сложно. Все, что мне нужно было сделать, это перейти на Run > Edit Configurations и на вкладке Configucation в поле Program arguments добавить следующее:

-Dakka.client.timeout:600s
-Dakka.ask.timeout:600s

Однако я должен отметить, что это не решило полностью мою проблему.

Работают ли настройки, пока мы передаем это в качестве аргументов программы?

Pritam Sadhukhan 13.09.2019 10:30

Другие вопросы по теме