Использование Google Storage из Flink REPL

Я пытаюсь прочитать CSV-файл из облачного хранилища Google в Flink REPL. Поскольку я не очень разбираюсь во Flink, я предпочитаю работать в REPL, чтобы решать одну ошибку за раз, а не помещать свой код в JAR, а затем не знать, с чего начать со всеми ошибками.

В этом примере я буду использовать общедоступные данные Landsat в хранилище Google.

Я создал кластер dataproc и добавил сценарий bash, предоставленный облаком Google, для установки flink во время создания кластера. Скрипт можно найти здесь.

Поскольку я использую кластер dataproc, мне нужно только добавить банку gcs-connector в путь к классам. Итак, я запускаю Flink REPL следующим образом:

/usr/lib/flink/bin/start-scala-shell.sh yarn -a /usr/lib/hadoop/lib/gcs-connector-hadoop2-1.9.10.jar

Затем я импортирую облачное хранилище Google, используя эту строку кода в REPL:

 import com.google.cloud.hadoop.fs.gcs

Наконец, я пытаюсь прочитать общедоступный файл как текстовый и получаю следующую ошибку:

 val landsaturl = "gs://gcp-public-data-landsat/LC08/01/001/002/LC08_L1GT_001002_20160817_20170322_01_T2/LC08_L1GT_001002_20160817_20170322_01_T2_ANG.txt"
landsat.first(1).print()
2018-12-17 19:10:44,210 INFO  org.apache.flink.api.java.ExecutionEnvironment                - The job has 0 registered types and 0 default Kryo serializers
2018-12-17 19:10:44,210 WARN  org.apache.flink.configuration.Configuration                  - Config uses deprecated configuration key 'jobmanager.rpc.address' instead of proper key 'rest.address'
2018-12-17 19:10:44,210 INFO  org.apache.flink.runtime.rest.RestClient                      - Rest client endpoint started.
2018-12-17 19:10:46,091 INFO  org.apache.flink.client.program.rest.RestClusterClient        - Submitting job 7aefc693e7911bd9ef80c3ebcf6a8343 (detached: false).
2018-12-17 19:10:46,091 INFO  org.apache.flink.client.program.rest.RestClusterClient        - Requesting blob server port.
2018-12-17 19:11:46,146 INFO  org.apache.flink.runtime.rest.RestClient                      - Shutting down rest endpoint.
2018-12-17 19:11:46,148 INFO  org.apache.flink.runtime.rest.RestClient                      - Rest endpoint shutdown complete.
org.apache.flink.client.program.ProgramInvocationException: Could not retrieve the execution result.
  at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:258)
  at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:464)
  at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:452)
  at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
  at org.apache.flink.client.RemoteExecutor.executePlanWithJars(RemoteExecutor.java:216)
  at org.apache.flink.client.RemoteExecutor.executePlan(RemoteExecutor.java:193)
  at org.apache.flink.api.java.RemoteEnvironment.execute(RemoteEnvironment.java:173)
  at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:816)
  at org.apache.flink.api.java.DataSet.collect(DataSet.java:413)
  at org.apache.flink.api.java.DataSet.print(DataSet.java:1652)
  at org.apache.flink.api.scala.DataSet.print(DataSet.scala:1726)
  ... 30 elided
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
  at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$5(RestClusterClient.java:357)
  at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
  at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
  at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
  at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
  at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$5(FutureUtils.java:214)
  at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
  at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
  at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
  at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
  at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$1(RestClient.java:195)
  at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:680)
  at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:603)
  at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:563)
  at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:424)
  at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:268)
  at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:284)
  at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
  at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
  at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
  at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
  at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
  at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
  at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.CompletionException: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not complete the operation. Number of retries has been exhausted.
  at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
  at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
  at java.util.concurrent.CompletableFuture.biApply(CompletableFuture.java:1088)
  at java.util.concurrent.CompletableFuture$BiApply.tryFire(CompletableFuture.java:1070)
  ... 21 more
Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not complete the operation. Number of retries has been exhausted.
  ... 19 more
Caused by: java.util.concurrent.CompletionException: java.net.ConnectException: Connection refused: cluster-****
  at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
  at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
  at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:943)
  at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
  ... 16 more
Caused by: java.net.ConnectException: Connection refused: cluster-****
  at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
  at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
  at org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:224)
  at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:281)
  ... 7 more

Я пропустил шаг или это то, что невозможно сделать в REPL и можно сделать только с помощью толстой банки и указания вещей, связанных с облачным хранилищем Google, в pom.xml?

Похоже, вам нужно использовать Flink Разъем HDFS, потому что коннектор GCS реализует интерфейс HDFS.

Igor Dvorzhak 17.12.2018 21:43
Стоит ли изучать PHP в 2023-2024 годах?
Стоит ли изучать PHP в 2023-2024 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
2
1
315
1

Ответы 1

Вам нужно использовать Разъем Flink HDFS, потому что соединитель GCS реализует интерфейс HDFS.

Другие вопросы по теме