Ошибка: нет имени модуля custom class при передаче объекта client в конструкторе настраиваемого класса в dask

Я пытался написать собственные классы для Preprocessing, а также алгоритмы Feature selection и Machine Learning.

Я взломал этот (preprocessing only) с помощью @delayed. Но когда я прочитал из учебные пособия, то же самое можно сделать с помощью Client. Это вызвало две проблемы.

Running as a script. Not as a Jupyter notebook

Первая проблема:

# Haven't run any scheduler or worker manually
client = Client() # Nothing passed as an argument
# Local Cluster is not working;
Error:... 
       if __name__=='__main__': 
            freeze_support()
      ...

Я пробовал то же самое в Jupyter Notebook, без запуска какого-либо планировщика или рабочих на разных терминалах. Это сработало!!

Теперь я запустил 3 терминала с 1 планировщиком и 2 рабочими и изменил его на Client('IP') в скрипте. Ошибка устранена, любая причина такого поведения.

Вторая проблема:

Ошибка, указанная в заголовке вопроса. Передал client = Client('IP') в качестве аргумента конструктору и использовал объекты self.client.submit в кластере. Но не удалось с сообщением об ошибке

Error: No module name 'diya_info'

Вот код:

main.py

import dask.dataframe as dd
from diya_info import Diya_Info
import time
# from dask import delayed
from dask.distributed import Client

df = dd.read_csv(
    '/Users/asifali/workspace/playground/flask/yellow_tripdata_2015- 01.csv')

# df = delayed(df.fillna(0.3))
# df = df.compute()

client = Client('192.168.0.129:8786')

X = df.drop('payment_type', axis=1).copy()
y = df['payment_type']


Instance = Diya_Info(X, y, client)
s = time.ctime(int(time.time()))
print(s)


Instance = Instance.fit(X, y)


e = time.ctime(int(time.time()))
print(e)
# print((e-s) % 60, ' secs')

diya_info.py

from sklearn.base import TransformerMixin, BaseEstimator
from dask.multiprocessing import get
from dask import delayed, compute


class Diya_Info(BaseEstimator, TransformerMixin):
    def __init__(self, X, y, client):
        assert X is not None, 'X can\'t be None'
        assert type(X).__name__ == 'DataFrame', 'X not of type DataFrame'
        assert y is not None, 'y can\'t be None'
        assert type(y).__name__ == 'Series', 'y not of type Series'

        self.client = client

    def fit(self, X, y):
        self.X = X
        self.y = y
        # X_status = self.has_null(self.X)
        # y_status = self.has_null(self.y)
        # X_len = self.get_len(self.X)
        # y_len = self.get_len(self.y)
        X_status = self.client.submit(self.has_null, self.X)
        y_status = self.client.submit(self.has_null, self.y)
        X_len = self.client.submit(self.get_len, self.X)
        y_len = self.client.submit(self.get_len, self.y)
        # X_null, y_null, X_length, y_length
        X_null, y_null, X_length, y_length = self.client.gather(
        [X_status, y_status, X_len, y_len])

        assert X_null == False, 'X contains some columns with null/NaN values'
        assert y_null == False, 'y contains some columns with null/NaN values'
        assert X_length == y_length, 'Shape mismatch, X and y are of different length'
        return self

    def transform(self, X):
        return X

    @staticmethod
    # @delayed
    def has_null(df):
        return df.isnull().values.any()

    @staticmethod
    # @delayed
    def get_len(df):
        return len(df)

Вот полная трассировка стека:

Sat Aug 11 13:29:08 2018
distributed.utils - ERROR - No module named 'diya_info'
Traceback (most recent call last):
  File "/anaconda3/lib/python3.6/site-packages/distributed/utils.py", line 238, in f
    result[0] = yield make_coro()
  File "/anaconda3/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
    value = future.result()
  File "/anaconda3/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 4, in raise_exc_info
  File "/anaconda3/lib/python3.6/site-packages/tornado/gen.py", line 1063, in run
    yielded = self.gen.throw(*exc_info)
  File "/anaconda3/lib/python3.6/site-packages/distributed/client.py", line 1315, in _gather
    traceback)
  File "/anaconda3/lib/python3.6/site-packages/six.py", line 692, in reraise
    raise value.with_traceback(tb)
  File "/anaconda3/lib/python3.6/site-packages/distributed/protocol/pickle.py", line 59, in loads
    return pickle.loads(x)
ModuleNotFoundError: No module named 'diya_info'
Traceback (most recent call last):
  File "notebook/main.py", line 24, in <module>
    Instance = Instance.fit(X, y)
  File "/Users/asifali/workspace/pythonProjects/ML-engine-DataX/pre-processing/notebook/diya_info.py", line 28, in fit
    X_status, y_status, X_len, y_len)
  File "/anaconda3/lib/python3.6/site-packages/distributed/client.py", line 2170, in compute
    result = self.gather(futures)
  File "/anaconda3/lib/python3.6/site-packages/distributed/client.py", line 1437, in gather
    asynchronous=asynchronous)
  File "/anaconda3/lib/python3.6/site-packages/distributed/client.py", line 592, in sync
    return sync(self.loop, func, *args, **kwargs)
  File "/anaconda3/lib/python3.6/site-packages/distributed/utils.py", line 254, in sync
    six.reraise(*error[0])
  File "/anaconda3/lib/python3.6/site-packages/six.py", line 693, in reraise
    raise value
  File "/anaconda3/lib/python3.6/site-packages/distributed/utils.py", line 238, in f
    result[0] = yield make_coro()
  File "/anaconda3/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
    value = future.result()
  File "/anaconda3/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 4, in raise_exc_info
  File "/anaconda3/lib/python3.6/site-packages/tornado/gen.py", line 1063, in run
    yielded = self.gen.throw(*exc_info)
  File "/anaconda3/lib/python3.6/site-packages/distributed/client.py", line 1315, in _gather
    traceback)
  File "/anaconda3/lib/python3.6/site-packages/six.py", line 692, in reraise
    raise value.with_traceback(tb)
  File "/anaconda3/lib/python3.6/site-packages/distributed/protocol/pickle.py", line 59, in loads
    return pickle.loads(x)
ModuleNotFoundError: No module named 'diya_info'

Если я раскомментирую @delayed и еще несколько комментариев, он сработает. Но как заставить его работать, передав client в качестве аргумента. Идея состоит в том, чтобы использовать один и тот же клиент для всех библиотек, которые я пытаюсь написать.

ОБНОВЛЕНИЕ 1: Я исправил second problem, удалив декораторы @staticmethod и поместив функции в fit closure. Но что не так с @staticmethod, ведь эти декораторы предназначены для не относящихся к себе вещей, верно?

Вот diya_info.py:

...
def fit(self, X, y):
   self.X = X
   self.y = y

   # function removed from @staticmethod
   def has_null(df): return df.isnull().values.any()
   # function removed from @staticmethod
   def get_len(df): return len(df)

   X_status = self.client.submit(has_null, self.X)
   y_status = self.client.submit(has_null, self.y)
...

Есть ли способ сделать это с помощью @staticmethod. Мне не нравится то, как я решил эту проблему. Все еще ничего не знаю о Problem 1

0
0
545
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий
ModuleNotFoundError: No module named 'diya_info'

Это означает, что пока ваш клиент имеет доступ к этому модулю, ваши воркеры - нет. Простой способ решить эту проблему - загрузить ваш скрипт своим воркерам.

client.upload_file('diya_info.py')

Но в целом вы должны убедиться, что ваши сотрудники и клиенты имеют одинаковую программную среду.

Другие вопросы по теме