Я использую Flink v.1.4.0.
Я пытаюсь запустить задание, используя DataSet API через IntelliJ. Обратите внимание, что если я запускаю то же задание через Flink UI, задание выполняется нормально. Чтобы запустить задание, мне нужно сначала указать через переменные среды объем данных, которые будут обработаны. Когда сумма относительно небольшая, работа выполняется нормально. Но по мере того, как он становится больше, я начинаю получать следующую ошибку:
ERROR StatusLogger Log4j2 could not find a logging implementation. Please add log4j-core to the classpath. Using SimpleLogger to log to the console...
31107 [main] ERROR com.company.someLib.SomeClass - Error executing pipeline
org.apache.flink.runtime.client.JobExecutionException: Couldn't retrieve the JobExecutionResult from the JobManager.
at org.apache.flink.runtime.client.JobClient.awaitJobResult(JobClient.java:300)
at org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:387)
at org.apache.flink.runtime.minicluster.FlinkMiniCluster.submitJobAndWait(FlinkMiniCluster.scala:565)
at org.apache.flink.runtime.minicluster.FlinkMiniCluster.submitJobAndWait(FlinkMiniCluster.scala:539)
at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:193)
at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:91)
at com.ubs.digital.comms.graph.emailanalyser.EmailAnalyserPipeline.lambda$runPipeline$1(EmailAnalyserPipeline.java:120)
at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1374)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)
at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
at com.ubs.digital.comms.graph.emailanalyser.EmailAnalyserPipeline.runPipeline(EmailAnalyserPipeline.java:87)
at com.ubs.digital.comms.graph.emailanalyser.EmailAnalyserPipeline.main(EmailAnalyserPipeline.java:65)
Caused by: org.apache.flink.runtime.client.JobClientActorSubmissionTimeoutException: Job submission to the JobManager timed out. You may increase 'akka.client.timeout' in case the JobManager needs more time to configure and confirm the job submission.
Я вижу, что совет такой:
You may increase 'akka.client.timeout' in case the JobManager needs more time to configure and confirm the job submission.
но я подозреваю, что проблема гораздо глубже. Но для этого мне нужно сначала настроить akka.client.timeout. Как мне сделать это в IntelliJ? и как долго должен быть тайм-аут?
Кроме того, что на самом деле вызывает это? Мне нужно увеличить память кучи или что-то в этом роде? Спасибо.
Обновление до 1.4.1 сейчас немного сложно, так как нужно будет изменить многие зависимости. Я смотрю, как «... увеличить сетевой стек akka (= удаленное взаимодействие)», но нет такого поля, которое я бы увеличил. Это akka.framesize?
изменение с 1.4.0 на 1.4.2 намного менее безболезненно, чем если бы вы делали обновление с 1.3.2 до 1.4.0. Просто по моему опыту. Что касается значения тайм-аута, есть ли у вас образец кода, которым вы можете поделиться? Я знаю, что для API потока данных (если вы используете Kafka Connector в качестве источника) вы можете настроить тайм-аут в свойствах, которые вы передаете FlinkKafkaConsumer. Если вы дадите некоторое представление о том, как вы настраиваете свой источник, я мог бы посоветовать лучше.
ОК. Мне все равно придется это сделать в какой-то момент; определенно, когда выйдет основной выпуск (1.5.0), но пока я придерживаюсь 1.4.0. Тем не менее, спасибо за вклад. Я уверен, что с обновлением многие из этих проблем будут решены.




Вы можете установить это свойство через файл конфигурации flink. См. https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/config.html#distributed-coordination-via-akka
Итак, в flink-conf.yaml вы должны добавить, например:
akka.client.timeout: 10min
Но похоже, что данные обрабатываются не в том месте. Возможно, вы загружаете данные в конструктор, а не в функцию map или run?
Спасибо за ответ, но я это уже знал. Я пытаюсь выяснить, можно ли это сделать через IntelliJ.
Я смог разобраться, и это тоже оказалось не так уж и сложно. Все, что мне нужно было сделать, это перейти на Run > Edit Configurations и на вкладке Configucation в поле Program arguments добавить следующее:
-Dakka.client.timeout:600s
-Dakka.ask.timeout:600s
Однако я должен отметить, что это не решило полностью мою проблему.
Работают ли настройки, пока мы передаем это в качестве аргументов программы?
Попробуйте обновиться до флинка 1.4.1 и проверьте mail-archives.apache.org/mod_mbox/flink-user/201705.mbox/…