Установка dask на gke с планировщиком clusterIP и переадресацией портов — время ожидания соединения tcp при создании клиента dask.distibuted в python

Я попытался развернуть dask на кластере gke, используя helm и румпель. У меня нет разрешений на создание внешних IP-адресов для моего кластера, поэтому я настроил планировщик Dask в качестве IP-адреса кластера вместо балансировщика нагрузки. Затем я использовал предоставленную kubectl переадресацию портов, чтобы перенаправить службу dask scheduler на мою локальную машину.

У меня есть код Python (простой пример dask), в котором я хочу подключить распределенного клиента к (переадресованному) tcp-порту планировщика dask и запустить вычисления в моем кластере gke. Однако я получаю тайм-аут на tcp-соединении.

Мне интересно, где я мог ошибиться в этом процессе или мне нужно больше разрешений в моей учетной записи gcp, чтобы заставить это работать. Любые указатели, которые вы можете предоставить, очень ценятся. Обратите внимание, что я смог открыть блокнот jupyter на переадресованном http-порту и смог запустить некоторые вычисления на 3 рабочих процессах по умолчанию, созданных dask на моем gke.

Я вставляю ниже код Python, который я запускал, ошибку, которую я вижу, текущий статус модулей, узлов, настройку служб для dask на gke и команды, которые я использовал для настройки dask на моем gcp-gke.

мой пример программы на Python (dask-example.py)

#!/usr/bin/env python3

from dask.distributed import Client
import dask.array as da

client = Client('tcp://127.0.0.1:8080')

array = da.ones((1000, 1000, 1000))

mn = array.mean().compute()  # Should print 1.0

print(mn)

Сообщение об ошибке (при запуске python3 dask-example.py):

Traceback (most recent call last):
  File "/home/userenv/lib/python3.8/site-packages/distributed/comm/core.py", line 286, in connect
    comm = await asyncio.wait_for(
  File "/usr/lib/python3.8/asyncio/tasks.py", line 498, in wait_for
    raise exceptions.TimeoutError()
asyncio.exceptions.TimeoutError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "trial.py", line 17, in <module>
    client = Client('tcp://127.0.0.1:8080', timeout=10)
  File "/home/userenv/lib/python3.8/site-packages/distributed/client.py", line 743, in __init__
    self.start(timeout=timeout)
  File "/home/userenv/lib/python3.8/site-packages/distributed/client.py", line 948, in start
    sync(self.loop, self._start, **kwargs)
  File "/home/userenv/lib/python3.8/site-packages/distributed/utils.py", line 340, in sync
    raise exc.with_traceback(tb)
  File "/home/userenv/lib/python3.8/site-packages/distributed/utils.py", line 324, in f
    result[0] = yield future
  File "/home/userenv/lib/python3.8/site-packages/tornado/gen.py", line 762, in run
    value = future.result()
  File "/home/userenv/lib/python3.8/site-packages/distributed/client.py", line 1038, in _start
    await self._ensure_connected(timeout=timeout)
  File "/home/userenv/lib/python3.8/site-packages/distributed/client.py", line 1095, in _ensure_connected
    comm = await connect(
  File "/home/userenv/lib/python3.8/site-packages/distributed/comm/core.py", line 308, in connect
    raise IOError(
OSError: Timed out trying to connect to tcp://127.0.0.1:8080 after 10 s

статус dask на моем gke

kubectl get po #показывает это

NAME                                       READY   STATUS              RESTARTS   AGE
my-dask-jupyter-565c5c5659-w4s76           1/1     Running             0          27h
my-dask-scheduler-6bf8bc8bbf-xgj2q         1/1     Running             0          27h
my-dask-worker-68b5b695bd-l2b6m            1/1     Running             0          27h
my-dask-worker-68b5b695bd-xnssz            1/1     Running             0          27h
my-dask-worker-68b5b695bd-z68wt            1/1     Running             0          27h

kubectl get no # показывает это

NAME                                          STATUS   ROLES    AGE   VERSION
gke-dask-cluster-default-pool-d3f451b1-gp47   Ready    <none>   27h   v1.17.14-gke.1200
gke-dask-cluster-default-pool-d3f451b1-rk8z   Ready    <none>   27h   v1.17.14-gke.1200

kubectl get svC# показывает это

NAME                TYPE        CLUSTER-IP     EXTERNAL-IP   PORT(S)           AGE
kubernetes          ClusterIP   10.83.240.1    <none>        443/TCP           27h
my-dask-jupyter     ClusterIP   10.83.244.84   <none>        80/TCP            27h
my-dask-scheduler   ClusterIP   10.83.244.59   <none>        8786/TCP,80/TCP   27h

Команды, которые я использовал для настройки dask (спасибо этому сообщению https://libinruan.github.io/2019/05/24/Set-up-Kubernetes-clusters-for-Python-ML/)

export PROJECTID='mygcp'
export EMAIL = "[email protected]"
export ZONE='us-central1-c'
export REGION='us-central1' 
export MIN_WORKER_NODES=0
export MAX_WORKER_NODES=100
export CLUSTER_NAME='dask-cluster'
export WORKER_MACHINE_TYPE='n1-standard-2'
export MACHINE_TYPE='n1-standard-2'
NUM_NODES=2
 
gcloud config set project $PROJECTID

gcloud services enable container.googleapis.com

gcloud container clusters create $CLUSTER_NAME --machine-type $MACHINE_TYPE --num-nodes $NUM_NODES --zone $ZONE --cluster-version latest

gcloud container clusters get-credentials $DASK_KUBE_CLUSTER_NAME --zone=$DASK_KUBE_CLUSTER_ZONE --project $DASK_GCLOUD_PROJECT

kubectl config set-cluster $DASK_KUBE_CLUSTER_NAME

kubectl create clusterrolebinding cluster-admin-binding --clusterrole=cluster-admin --user=$EMAIL

kubectl create serviceaccount tiller --namespace=kube-system

kubectl create clusterrolebinding tiller --clusterrole=cluster-admin --serviceaccount=kube-system:tiller
helm init --service-account tiller --wait

kubectl --namespace=kube-system patch deployment tiller-deploy --type=json \
--patch='[{"op": "add", "path": "/spec/template/spec/containers/0/command", "value": ["/tiller", "--listen=localhost:44134"]}]'

gcloud container clusters get-credentials $CLUSTER_NAME --zone=$ZONE --project $PROJECTID

kubectl config set-cluster $DASK_KUBE_CLUSTER_NAME

helm install -n my-dask stable/dask -f dask-worker-spec.yml --set scheduler.serviceType=ClusterIP --set jupyter.serviceType=ClusterIP

Здравствуйте, я использовал настройку, как вы опубликовали, и наткнулся на ту же проблему. Я считаю, что это руководство устарело, а диаграмма Helm уже устарела. Рассматривали ли вы возможность использования более новой диаграммы Helm и Helm3? Я использовал эту документацию с Helm3, и мне удалось подключиться (он сообщил мне о несоответствиях в версии модуля, но подключился). К сожалению, на этом мои знания Python/Dask заканчиваются.

Dawid Kruk 14.12.2020 21:22
Стоит ли изучать PHP в 2023-2024 годах?
Стоит ли изучать PHP в 2023-2024 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
2
1
406
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Мне удалось воспроизвести такое же поведение, как описано в вопросе.

Я рекомендую вам проверить более новую версию dask, которая доступна на его домашней странице, поскольку та, которая использовалась в вопросе, устарела.

TL;DR

Используйте те же версии модулей Python (dask, distributed и т. д.) программного обеспечения, которое использует dask-scheduler!


Воспроизведение установки, использованной в вопросе

  • Создайте кластер GKE:
    • $ gcloud beta container clusters create "gke-dask" --zone "europe-west3-c" --no-enable-basic-auth --cluster-version "1.17.13-gke.2600" --release-channel "regular" --machine-type "e2-standard-4"

e2-standard-4 = машина с 4 ядрами и 16 ГБ ОЗУ

Чтобы определить проблему, я провел несколько тестов:

  • $ kubectl port-forwarddask-scheduler на мою машину и использовать пример кода Python, который был в вопросе. Я установил зависимости, необходимые этому коду, но в разных версиях ($ pip install dask[complete]. Ошибка кода с тем же сообщением, что и в вопросе.
  • $ kubectl run -it --rm ubuntu -- /bin/bash - ошибка из вопроса указывала на то, что это может быть проблема, связанная с сетью. Я использовал ubuntuPod с той же установкой Python, чтобы устранить потенциальные проблемы с сетевым подключением (port-forward). Ошибка кода с тем же сообщением, что и в вопросе.
  • $ kubectl exec -it DASK-SCHEDULER-POD-NAME -- /bin/bash — если это проблема с сетевым подключением, он должен работать на Pod, который должен обрабатывать этот код. Код успешно выполнен и возвращен 1.0.

Я пытался использовать ubuntuPod с Python, и его модули более близки по версии к версии в dask-scheduler. Это вызвало ошибку, отличную от той, что указана в вопросе. Это указывает на то, что проблема связана не с сетью, а с используемым программным обеспечением (его версиями). Я не мог точно воспроизвести настройку, используемую в dask-scheduler, поэтому я использовал образ, который использует dask-scheduler, и создал дополнительный Pod, чтобы проверить, будет ли он работать через сеть GKE. Это сработало!

Пожалуйста, рассмотрите приведенный ниже пример как обходной путь!

Шаги для запуска с вашей машины:

  • $ kubectl port-forward service/my-dask-scheduler --address 0.0.0.0 8786:8786 & - переадресация dask сервиса на вашу машину
  • $ docker run --rm -ti daskdev/dask:1.1.5 /bin/bash - запустите контейнер Docker на своем компьютере и exec в него bash. Это изображение совпадает с вашим dask-scheduler, чтобы сохранить версии Python и его модулей.
  • Используйте следующий код Python внутри контейнера:
#!/usr/bin/env python3
from dask.distributed 
import Client
import dask.array as da
client = Client('tcp://IP_ADDRESS:8786')
array = da.ones((1000, 1000, 1000))
mn = array.mean().compute() # Should print 1.0
print(mn)

Пожалуйста, укажите в «IP_ADDRESS» IP-адрес вашего компьютера, доступный из вашего контейнера Docker!

Вы должны получить следующий результат:

(base) root@81fb5004ea4c:/# python3 script.py 
1.0

Вы можете проверить Python venv для виртуальных сред.

Дополнительная ссылка:

Большое спасибо за изучение вопроса. Не могли бы вы сообщить мне, как я могу получить IP_ADDRESS машины, доступной изнутри докера?

dasknovice 31.12.2020 03:42

Обновление: мне удалось заставить все работать без необходимости в докере, используя virtualenv с dask == 2.30.0 и Distributed == 2.30.0. Мне также пришлось использовать helm install -n my-dask dask/dask... вместо helm install -n my-dask stable/dask. Кроме того, спасибо за «бету» в бета-версии gcloud. Это помогло мне перейти на версии dask >=2 . без бета-версии я застрял с dask == 1.1.5, который, я думаю, нуждается в обходном пути докера, упомянутом в вашем решении.

dasknovice 31.12.2020 04:43

@dasknovice gcloud beta не должен иметь никакого значения в этом отношении. Пожалуйста, ознакомьтесь с дополнительной информацией об этом здесь . Я не совсем уверен, что вы подразумеваете под IP-адресом машины, доступной из докера, но я думаю, что это может помочь.

Dawid Kruk 05.01.2021 19:13

Другие вопросы по теме