Как использовать пользовательскую файловую систему fsspec с dask?

Я создал свою собственную файловую систему в библиотеке fsspec и пытаюсь прочитать фреймы данных dask из этого объекта файловой системы, чтобы открыть файл фрейма данных. Однако я получаю сообщение об ошибке, когда пытаюсь это сделать. Я предполагаю, что у рабочих нет хорошей копии файловой системы. Вот тестовый код

from fsspec.implementations.local import LocalFileSystem
from fsspec import AbstractFileSystem, register_implementation

class MyLocalFileSystem(AbstractFileSystem):
    def __init__(self,*args,**kwargs):
        super().__init__(*args,**kwargs)
        self.rawfs=LocalFileSystem(*args,**kwargs)

    def __wrap(method_name):
        return lambda self,*args,**kwargs: getattr(self.rawfs,method_name)(*args,**kwargs)
    
    _open = __wrap("_open")
    info = __wrap("info")
    ls = __wrap("ls")

    del __wrap
    
register_implementation("mfs",MyLocalFileSystem,clobber=True)

import dask.dataframe as dd
import pandas as pd

from tempfile import NamedTemporaryFile
from dask.distributed import Client

with NamedTemporaryFile(mode='wt') as f, Client(): #works if I remove `, Client() (and no other client is running)`
    f.write("A\n0")
    f.flush()
    print("pd, localfs",pd.read_csv(f.name).size)
    print("dd, localfs",dd.read_csv(f.name).size.compute())
    print("pd, myfs",pd.read_csv(f"mfs://{f.name}").size)
    # print("workaround",dd.from_pandas(pd.read_csv(f"mfs://{f.name}"),npartitions=1).size.compute())
    print("dd, myfs",dd.read_csv(f"mfs://{f.name}").size.compute()) #error
    

и я получаю ошибку

2024-06-08 14:48:33,328 - distributed.worker - WARNING - Compute Failed
Key:       ('size-chunk-232fa012bb421939d7011c3af11ac4a7-ea46e61534d2be2ea62e2fe234f0d607', 0)
Function:  execute_task
args:      ((subgraph_callable-92b603c9f28a44f7e623972919b6934a, [(<function read_block_from_file at 0x75336666f420>, <OpenFile '/tmp/tmpob6dxgol'>, 0, 3, b'\n'), None, True, True]))
kwargs:    {}
Exception: 'AttributeError("\'MyLocalFileSystem\' object has no attribute \'rawfs\'")'

---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
Cell In[15], line 33
     31 print("pd, myfs",pd.read_csv(f"mfs://{f.name}").size)
     32 # print("workaround",dd.from_pandas(pd.read_csv(f"mfs://{f.name}"),npartitions=1).size.compute())
---> 33 print("dd, myfs",dd.read_csv(f"mfs://{f.name}").size.compute()) #error

File /usr/lib/python3.12/site-packages/dask/base.py:375, in DaskMethodsMixin.compute(self, **kwargs)
    351 def compute(self, **kwargs):
    352     """Compute this dask collection
    353 
    354     This turns a lazy Dask collection into its in-memory equivalent.
   (...)
    373     dask.compute
    374     """
--> 375     (result,) = compute(self, traverse=False, **kwargs)
    376     return result

File /usr/lib/python3.12/site-packages/dask/base.py:661, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
    658     postcomputes.append(x.__dask_postcompute__())
    660 with shorten_traceback():
--> 661     results = schedule(dsk, keys, **kwargs)
    663 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])

File /usr/lib/python3.12/site-packages/dask/bytes/core.py:191, in read_block_from_file()
    190 def read_block_from_file(lazy_file, off, bs, delimiter):
--> 191     with copy.copy(lazy_file) as f:
    192         if off == 0 and bs is None:
    193             return f.read()

File /usr/lib/python3.12/site-packages/fsspec/core.py:103, in __enter__()
    100 def __enter__(self):
    101     mode = self.mode.replace("t", "").replace("b", "") + "b"
--> 103     f = self.fs.open(self.path, mode=mode)
    105     self.fobjects = [f]
    107     if self.compression is not None:

File /usr/lib/python3.12/site-packages/fsspec/spec.py:1293, in open()
   1291 else:
   1292     ac = kwargs.pop("autocommit", not self._intrans)
-> 1293     f = self._open(
   1294         path,
   1295         mode=mode,
   1296         block_size=block_size,
   1297         autocommit=ac,
   1298         cache_options=cache_options,
   1299         **kwargs,
   1300     )
   1301     if compression is not None:
   1302         from fsspec.compression import compr

Cell In[15], line 10, in <lambda>()
      9 def __wrap(method_name):
---> 10     return lambda self,*args,**kwargs: getattr(self.rawfs,method_name)(*args,**kwargs)

AttributeError: 'MyLocalFileSystem' object has no attribute 'rawfs'

Это должно сработать? Как я могу заставить это работать?

Версии: python: 3.12.3 (основной, 23 апреля 2024 г., 09:16:07) [GCC 13.2.1 20240417] даск: 2024.4.1 фсспец: 2024.3.1

Больше попыток

После нескольких предложений от mdurant я попробовал кое-что новое и распечатал еще больше. Он предложил передать rawfs, что я попытался сделать с помощью следующего кода:

from fsspec.implementations.local import LocalFileSystem
from fsspec import AbstractFileSystem, register_implementation
import dask.dataframe as dd
import pandas as pd
from tempfile import NamedTemporaryFile
from dask.distributed import Client

def setup():
    class MyLocalFileSystem(AbstractFileSystem):
        def __init__(self,*args,rawfs=None,**kwargs):
            super().__init__(*args,**kwargs)
            print(rawfs or "no rawfs")
            self.rawfs=rawfs or LocalFileSystem(*args,**kwargs)
    
        def __wrap(method_name):
            def wrapped(self,*args,**kwargs):
                print("has rawfs?",hasattr(self,'rawfs'),self,method_name,args,kwargs)

                return getattr(self.rawfs,method_name)(*args,**kwargs)
            return wrapped
        
        _open = __wrap("_open")
        info = __wrap("info")
        ls = __wrap("ls")
    
        del __wrap
        
    register_implementation("mfs",MyLocalFileSystem,clobber=True)
    print('setup run')
setup()

with NamedTemporaryFile(mode='wt') as f, Client() as client:
    print(client)
    f.write("A\n0")
    f.flush()
    # client.run(setup)
    # print("dd, myfs",dd.read_csv(f"mfs://{f.name}").size.compute()) #error
    print("dd, myfs",dd.read_csv(f"mfs://{f.name}",storage_options=dict(rawfs=LocalFileSystem())).size.compute()) #error
    

И вот я получаю результат

setup run
<Client: 'tcp://127.0.0.1:39165' processes=7 threads=28, memory=31.11 GiB>
<fsspec.implementations.local.LocalFileSystem object at 0x7643560ddb20>
has rawfs? True <__main__.setup.<locals>.MyLocalFileSystem object at 0x764354683ad0> info ('/tmp/tmpgb0u08ud',) {}
has rawfs? True <__main__.setup.<locals>.MyLocalFileSystem object at 0x764354683ad0> info ('/tmp/tmpgb0u08ud',) {}
has rawfs? True <__main__.setup.<locals>.MyLocalFileSystem object at 0x764354683ad0> _open ('/tmp/tmpgb0u08ud',) {'mode': 'rb', 'block_size': None, 'autocommit': True, 'cache_options': None}
<fsspec.implementations.local.LocalFileSystem object at 0x76435451bb90>
<fsspec.implementations.local.LocalFileSystem object at 0x76435451bb90>
<fsspec.implementations.local.LocalFileSystem object at 0x779ee00c37a0>
has rawfs? False <__main__.MyLocalFileSystem object at 0x779ee00c3770> _open ('/tmp/tmpgb0u08ud',) {'mode': 'rb', 'block_size': None, 'autocommit': True, 'cache_options': None}

2024-06-10 21:08:21,826 - distributed.worker - WARNING - Compute Failed
Key:       ('size-chunk-2f0130c30a88c68f95b5bfe3e88628c5-217eb1639a5f8a0a111f4ea3e7068b71', 0)
Function:  execute_task
args:      ((subgraph_callable-718c799e1dd760f63b6c2cb4a7729284, [(<function read_block_from_file at 0x779ee01bf420>, <OpenFile '/tmp/tmpgb0u08ud'>, 0, 3, b'\n'), None, True, True]))
kwargs:    {}
Exception: 'AttributeError("\'MyLocalFileSystem\' object has no attribute \'rawfs\'")'

---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
Cell In[3], line 37
     34 f.flush()
     35 # client.run(setup)
     36 # print("dd, myfs",dd.read_csv(f"mfs://{f.name}").size.compute()) #error
---> 37 print("dd, myfs",dd.read_csv(f"mfs://{f.name}",storage_options=dict(rawfs=LocalFileSystem())).size.compute()) #error

File /usr/lib/python3.12/site-packages/dask/base.py:375, in DaskMethodsMixin.compute(self, **kwargs)
    351 def compute(self, **kwargs):
    352     """Compute this dask collection
    353 
    354     This turns a lazy Dask collection into its in-memory equivalent.
   (...)
    373     dask.compute
    374     """
--> 375     (result,) = compute(self, traverse=False, **kwargs)
    376     return result

File /usr/lib/python3.12/site-packages/dask/base.py:661, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
    658     postcomputes.append(x.__dask_postcompute__())
    660 with shorten_traceback():
--> 661     results = schedule(dsk, keys, **kwargs)
    663 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])

File /usr/lib/python3.12/site-packages/dask/bytes/core.py:191, in read_block_from_file()
    190 def read_block_from_file(lazy_file, off, bs, delimiter):
--> 191     with copy.copy(lazy_file) as f:
    192         if off == 0 and bs is None:
    193             return f.read()

File /usr/lib/python3.12/site-packages/fsspec/core.py:103, in __enter__()
    100 def __enter__(self):
    101     mode = self.mode.replace("t", "").replace("b", "") + "b"
--> 103     f = self.fs.open(self.path, mode=mode)
    105     self.fobjects = [f]
    107     if self.compression is not None:

File /usr/lib/python3.12/site-packages/fsspec/spec.py:1293, in open()
   1291 else:
   1292     ac = kwargs.pop("autocommit", not self._intrans)
-> 1293     f = self._open(
   1294         path,
   1295         mode=mode,
   1296         block_size=block_size,
   1297         autocommit=ac,
   1298         cache_options=cache_options,
   1299         **kwargs,
   1300     )
   1301     if compression is not None:
   1302         from fsspec.compression import compr

Cell In[3], line 18, in wrapped()
     16 def wrapped(self,*args,**kwargs):
     17     print("has rawfs?",hasattr(self,'rawfs'),self,method_name,args,kwargs)
---> 18     return getattr(self.rawfs,method_name)(*args,**kwargs)

AttributeError: 'MyLocalFileSystem' object has no attribute 'rawfs'

и затем я попробовал

client.run(setup)
print("dd, myfs",dd.read_csv(f"mfs://{f.name}").size.compute())

вместо

print("dd, myfs",dd.read_csv(f"mfs://{f.name}",storage_options=dict(rawfs=LocalFileSystem())).size.compute())

и я получаю

setup run
<Client: 'tcp://127.0.0.1:39955' processes=7 threads=28, memory=31.11 GiB>
no rawfs
has rawfs? True <__main__.setup.<locals>.MyLocalFileSystem object at 0x76435449a3c0> info ('/tmp/tmpjqcw7153',) {}
has rawfs? True <__main__.setup.<locals>.MyLocalFileSystem object at 0x76435449a3c0> info ('/tmp/tmpjqcw7153',) {}
has rawfs? True <__main__.setup.<locals>.MyLocalFileSystem object at 0x76435449a3c0> _open ('/tmp/tmpjqcw7153',) {'mode': 'rb', 'block_size': None, 'autocommit': True, 'cache_options': None}
no rawfs
no rawfs
setup run
no rawfs
has rawfs? False <__main__.MyLocalFileSystem object at 0x7381fb4535f0> _open ('/tmp/tmpjqcw7153',) {'mode': 'rb', 'block_size': None, 'autocommit': True, 'cache_options': None}
setup run
setup run
setup run
setup run
setup run
setup run

2024-06-10 21:09:01,153 - distributed.worker - WARNING - Compute Failed
Key:       ('size-chunk-d64b9d4bac0b21c55395551178057d16-dcfe0b01af30226d1945c64ee5b65773', 0)
Function:  execute_task
args:      ((subgraph_callable-c34921dd377ac1fd9d899f4136f2f0c6, [(<function read_block_from_file at 0x7381fb545d00>, <OpenFile '/tmp/tmpjqcw7153'>, 0, 3, b'\n'), None, True, True]))
kwargs:    {}
Exception: 'AttributeError("\'MyLocalFileSystem\' object has no attribute \'rawfs\'")'

---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
Cell In[4], line 36
     34 f.flush()
     35 client.run(setup)
---> 36 print("dd, myfs",dd.read_csv(f"mfs://{f.name}").size.compute()) #error
     37 # print("dd, myfs",dd.read_csv(f"mfs://{f.name}",storage_options=dict(rawfs=LocalFileSystem())).size.compute()) #error

File /usr/lib/python3.12/site-packages/dask/base.py:375, in DaskMethodsMixin.compute(self, **kwargs)
    351 def compute(self, **kwargs):
    352     """Compute this dask collection
    353 
    354     This turns a lazy Dask collection into its in-memory equivalent.
   (...)
    373     dask.compute
    374     """
--> 375     (result,) = compute(self, traverse=False, **kwargs)
    376     return result

File /usr/lib/python3.12/site-packages/dask/base.py:661, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
    658     postcomputes.append(x.__dask_postcompute__())
    660 with shorten_traceback():
--> 661     results = schedule(dsk, keys, **kwargs)
    663 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])

File /usr/lib/python3.12/site-packages/dask/bytes/core.py:191, in read_block_from_file()
    190 def read_block_from_file(lazy_file, off, bs, delimiter):
--> 191     with copy.copy(lazy_file) as f:
    192         if off == 0 and bs is None:
    193             return f.read()

File /usr/lib/python3.12/site-packages/fsspec/core.py:103, in __enter__()
    100 def __enter__(self):
    101     mode = self.mode.replace("t", "").replace("b", "") + "b"
--> 103     f = self.fs.open(self.path, mode=mode)
    105     self.fobjects = [f]
    107     if self.compression is not None:

File /usr/lib/python3.12/site-packages/fsspec/spec.py:1293, in open()
   1291 else:
   1292     ac = kwargs.pop("autocommit", not self._intrans)
-> 1293     f = self._open(
   1294         path,
   1295         mode=mode,
   1296         block_size=block_size,
   1297         autocommit=ac,
   1298         cache_options=cache_options,
   1299         **kwargs,
   1300     )
   1301     if compression is not None:
   1302         from fsspec.compression import compr

Cell In[4], line 18, in wrapped()
     16 def wrapped(self,*args,**kwargs):
     17     print("has rawfs?",hasattr(self,'rawfs'),self,method_name,args,kwargs)
---> 18     return getattr(self.rawfs,method_name)(*args,**kwargs)

AttributeError: 'MyLocalFileSystem' object has no attribute 'rawfs'

Вероятно, я не сделал того, что хотел от меня mdurant, и кажется, что моя файловая система не получает набор свойств после третьего экземпляра. Я делаю это неправильно?

Третья попытка

Хорошо, на основании комментария от mdurant я попробую поместить код в файл и отправить его работникам. Вот код, который я пробовал:

import dask.dataframe as dd, importlib, re
from tempfile import NamedTemporaryFile
from dask.distributed import Client

startup_code = """
from fsspec.implementations.local import LocalFileSystem
from fsspec import AbstractFileSystem, register_implementation

class MyLocalFileSystem(AbstractFileSystem):
    def __init__(self,*args,**kwargs):
        super().__init__(*args,**kwargs)
        self.rawfs=LocalFileSystem(*args,**kwargs)
        
    def __wrap(method_name):
        return lambda self,*args,**kwargs: getattr(self.rawfs,method_name)(*args,**kwargs)

    _open = __wrap("_open")
    info = __wrap("info")
    ls = __wrap("ls")

    del __wrap
    
register_implementation("mfs",MyLocalFileSystem,clobber=True)
"""

with NamedTemporaryFile(mode='wt',suffix = ".py") as startup_code_file, Client() as client, NamedTemporaryFile(mode='wt') as f:
    startup_code_file.write(startup_code)
    startup_code_file.flush()
    client.upload_file(startup_code_file.name)
    importlib.import_module(re.search(r'/([^/]*)\.py',startup_code_file.name).group(1))
    f.write("A\n0")
    f.flush()
    print("dd, myfs",dd.read_csv(f"mfs://{f.name}").size.compute())

и это работает. Я получаю результат dd, myfs 1.

Возможно, вы добьетесь большего успеха, если определите свой собственный класс в файле, отличном от основного сценария, как класс верхнего уровня, а не как динамический внутри функции, и убедитесь, что рабочие имеют доступ к этому файлу (например, client.upload_file).

mdurant 11.06.2024 16:12
Стоит ли изучать PHP в 2026-2027 годах?
Стоит ли изучать PHP в 2026-2027 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
0
1
57
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Это необычно... Я бы подумал, что ваш метод __init__() действительно вызывается у рабочих при десериализации объекта OpenFile. Однако более типичный подход, чем установка атрибута и надежда, что он появится на удаленном объекте, заключается в включении объекта в качестве аргумента:

    class MyLocalFileSystem(AbstractFileSystem):
        def __init__(self,*args, fs=None, **kwargs):
            super().__init__(*args, **kwargs)
            self.rawfs = fs

Я думаю, что это с большей вероятностью сработает

Кроме того, вы можете рассмотреть возможность вызова register_implementation у рабочих (через client.run()), но я не думаю, что это необходимо для этого рабочего процесса.

В общем, лучше всего хранить определения классов в файлах, а не в динамических файлах; а затем вы сможете распространить эти файлы среди работников либо с помощью общей файловой системы, либо с помощью client.upload_file.

Я пытался реализовать ваши предложения, но мне это не удалось. Смотрите редактирование моего вопроса.

Brian Moths 11.06.2024 04:11

ок, отлично, подход к загрузке файлов работает. Не стесняйтесь включать эту информацию в свой ответ.

Brian Moths 12.06.2024 03:26

Другие вопросы по теме