Я создал свою собственную файловую систему в библиотеке fsspec и пытаюсь прочитать фреймы данных dask из этого объекта файловой системы, чтобы открыть файл фрейма данных. Однако я получаю сообщение об ошибке, когда пытаюсь это сделать. Я предполагаю, что у рабочих нет хорошей копии файловой системы. Вот тестовый код
from fsspec.implementations.local import LocalFileSystem
from fsspec import AbstractFileSystem, register_implementation
class MyLocalFileSystem(AbstractFileSystem):
def __init__(self,*args,**kwargs):
super().__init__(*args,**kwargs)
self.rawfs=LocalFileSystem(*args,**kwargs)
def __wrap(method_name):
return lambda self,*args,**kwargs: getattr(self.rawfs,method_name)(*args,**kwargs)
_open = __wrap("_open")
info = __wrap("info")
ls = __wrap("ls")
del __wrap
register_implementation("mfs",MyLocalFileSystem,clobber=True)
import dask.dataframe as dd
import pandas as pd
from tempfile import NamedTemporaryFile
from dask.distributed import Client
with NamedTemporaryFile(mode='wt') as f, Client(): #works if I remove `, Client() (and no other client is running)`
f.write("A\n0")
f.flush()
print("pd, localfs",pd.read_csv(f.name).size)
print("dd, localfs",dd.read_csv(f.name).size.compute())
print("pd, myfs",pd.read_csv(f"mfs://{f.name}").size)
# print("workaround",dd.from_pandas(pd.read_csv(f"mfs://{f.name}"),npartitions=1).size.compute())
print("dd, myfs",dd.read_csv(f"mfs://{f.name}").size.compute()) #error
и я получаю ошибку
2024-06-08 14:48:33,328 - distributed.worker - WARNING - Compute Failed
Key: ('size-chunk-232fa012bb421939d7011c3af11ac4a7-ea46e61534d2be2ea62e2fe234f0d607', 0)
Function: execute_task
args: ((subgraph_callable-92b603c9f28a44f7e623972919b6934a, [(<function read_block_from_file at 0x75336666f420>, <OpenFile '/tmp/tmpob6dxgol'>, 0, 3, b'\n'), None, True, True]))
kwargs: {}
Exception: 'AttributeError("\'MyLocalFileSystem\' object has no attribute \'rawfs\'")'
---------------------------------------------------------------------------
AttributeError Traceback (most recent call last)
Cell In[15], line 33
31 print("pd, myfs",pd.read_csv(f"mfs://{f.name}").size)
32 # print("workaround",dd.from_pandas(pd.read_csv(f"mfs://{f.name}"),npartitions=1).size.compute())
---> 33 print("dd, myfs",dd.read_csv(f"mfs://{f.name}").size.compute()) #error
File /usr/lib/python3.12/site-packages/dask/base.py:375, in DaskMethodsMixin.compute(self, **kwargs)
351 def compute(self, **kwargs):
352 """Compute this dask collection
353
354 This turns a lazy Dask collection into its in-memory equivalent.
(...)
373 dask.compute
374 """
--> 375 (result,) = compute(self, traverse=False, **kwargs)
376 return result
File /usr/lib/python3.12/site-packages/dask/base.py:661, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
658 postcomputes.append(x.__dask_postcompute__())
660 with shorten_traceback():
--> 661 results = schedule(dsk, keys, **kwargs)
663 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
File /usr/lib/python3.12/site-packages/dask/bytes/core.py:191, in read_block_from_file()
190 def read_block_from_file(lazy_file, off, bs, delimiter):
--> 191 with copy.copy(lazy_file) as f:
192 if off == 0 and bs is None:
193 return f.read()
File /usr/lib/python3.12/site-packages/fsspec/core.py:103, in __enter__()
100 def __enter__(self):
101 mode = self.mode.replace("t", "").replace("b", "") + "b"
--> 103 f = self.fs.open(self.path, mode=mode)
105 self.fobjects = [f]
107 if self.compression is not None:
File /usr/lib/python3.12/site-packages/fsspec/spec.py:1293, in open()
1291 else:
1292 ac = kwargs.pop("autocommit", not self._intrans)
-> 1293 f = self._open(
1294 path,
1295 mode=mode,
1296 block_size=block_size,
1297 autocommit=ac,
1298 cache_options=cache_options,
1299 **kwargs,
1300 )
1301 if compression is not None:
1302 from fsspec.compression import compr
Cell In[15], line 10, in <lambda>()
9 def __wrap(method_name):
---> 10 return lambda self,*args,**kwargs: getattr(self.rawfs,method_name)(*args,**kwargs)
AttributeError: 'MyLocalFileSystem' object has no attribute 'rawfs'
Это должно сработать? Как я могу заставить это работать?
Версии: python: 3.12.3 (основной, 23 апреля 2024 г., 09:16:07) [GCC 13.2.1 20240417] даск: 2024.4.1 фсспец: 2024.3.1
После нескольких предложений от mdurant я попробовал кое-что новое и распечатал еще больше. Он предложил передать rawfs, что я попытался сделать с помощью следующего кода:
from fsspec.implementations.local import LocalFileSystem
from fsspec import AbstractFileSystem, register_implementation
import dask.dataframe as dd
import pandas as pd
from tempfile import NamedTemporaryFile
from dask.distributed import Client
def setup():
class MyLocalFileSystem(AbstractFileSystem):
def __init__(self,*args,rawfs=None,**kwargs):
super().__init__(*args,**kwargs)
print(rawfs or "no rawfs")
self.rawfs=rawfs or LocalFileSystem(*args,**kwargs)
def __wrap(method_name):
def wrapped(self,*args,**kwargs):
print("has rawfs?",hasattr(self,'rawfs'),self,method_name,args,kwargs)
return getattr(self.rawfs,method_name)(*args,**kwargs)
return wrapped
_open = __wrap("_open")
info = __wrap("info")
ls = __wrap("ls")
del __wrap
register_implementation("mfs",MyLocalFileSystem,clobber=True)
print('setup run')
setup()
with NamedTemporaryFile(mode='wt') as f, Client() as client:
print(client)
f.write("A\n0")
f.flush()
# client.run(setup)
# print("dd, myfs",dd.read_csv(f"mfs://{f.name}").size.compute()) #error
print("dd, myfs",dd.read_csv(f"mfs://{f.name}",storage_options=dict(rawfs=LocalFileSystem())).size.compute()) #error
И вот я получаю результат
setup run
<Client: 'tcp://127.0.0.1:39165' processes=7 threads=28, memory=31.11 GiB>
<fsspec.implementations.local.LocalFileSystem object at 0x7643560ddb20>
has rawfs? True <__main__.setup.<locals>.MyLocalFileSystem object at 0x764354683ad0> info ('/tmp/tmpgb0u08ud',) {}
has rawfs? True <__main__.setup.<locals>.MyLocalFileSystem object at 0x764354683ad0> info ('/tmp/tmpgb0u08ud',) {}
has rawfs? True <__main__.setup.<locals>.MyLocalFileSystem object at 0x764354683ad0> _open ('/tmp/tmpgb0u08ud',) {'mode': 'rb', 'block_size': None, 'autocommit': True, 'cache_options': None}
<fsspec.implementations.local.LocalFileSystem object at 0x76435451bb90>
<fsspec.implementations.local.LocalFileSystem object at 0x76435451bb90>
<fsspec.implementations.local.LocalFileSystem object at 0x779ee00c37a0>
has rawfs? False <__main__.MyLocalFileSystem object at 0x779ee00c3770> _open ('/tmp/tmpgb0u08ud',) {'mode': 'rb', 'block_size': None, 'autocommit': True, 'cache_options': None}
2024-06-10 21:08:21,826 - distributed.worker - WARNING - Compute Failed
Key: ('size-chunk-2f0130c30a88c68f95b5bfe3e88628c5-217eb1639a5f8a0a111f4ea3e7068b71', 0)
Function: execute_task
args: ((subgraph_callable-718c799e1dd760f63b6c2cb4a7729284, [(<function read_block_from_file at 0x779ee01bf420>, <OpenFile '/tmp/tmpgb0u08ud'>, 0, 3, b'\n'), None, True, True]))
kwargs: {}
Exception: 'AttributeError("\'MyLocalFileSystem\' object has no attribute \'rawfs\'")'
---------------------------------------------------------------------------
AttributeError Traceback (most recent call last)
Cell In[3], line 37
34 f.flush()
35 # client.run(setup)
36 # print("dd, myfs",dd.read_csv(f"mfs://{f.name}").size.compute()) #error
---> 37 print("dd, myfs",dd.read_csv(f"mfs://{f.name}",storage_options=dict(rawfs=LocalFileSystem())).size.compute()) #error
File /usr/lib/python3.12/site-packages/dask/base.py:375, in DaskMethodsMixin.compute(self, **kwargs)
351 def compute(self, **kwargs):
352 """Compute this dask collection
353
354 This turns a lazy Dask collection into its in-memory equivalent.
(...)
373 dask.compute
374 """
--> 375 (result,) = compute(self, traverse=False, **kwargs)
376 return result
File /usr/lib/python3.12/site-packages/dask/base.py:661, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
658 postcomputes.append(x.__dask_postcompute__())
660 with shorten_traceback():
--> 661 results = schedule(dsk, keys, **kwargs)
663 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
File /usr/lib/python3.12/site-packages/dask/bytes/core.py:191, in read_block_from_file()
190 def read_block_from_file(lazy_file, off, bs, delimiter):
--> 191 with copy.copy(lazy_file) as f:
192 if off == 0 and bs is None:
193 return f.read()
File /usr/lib/python3.12/site-packages/fsspec/core.py:103, in __enter__()
100 def __enter__(self):
101 mode = self.mode.replace("t", "").replace("b", "") + "b"
--> 103 f = self.fs.open(self.path, mode=mode)
105 self.fobjects = [f]
107 if self.compression is not None:
File /usr/lib/python3.12/site-packages/fsspec/spec.py:1293, in open()
1291 else:
1292 ac = kwargs.pop("autocommit", not self._intrans)
-> 1293 f = self._open(
1294 path,
1295 mode=mode,
1296 block_size=block_size,
1297 autocommit=ac,
1298 cache_options=cache_options,
1299 **kwargs,
1300 )
1301 if compression is not None:
1302 from fsspec.compression import compr
Cell In[3], line 18, in wrapped()
16 def wrapped(self,*args,**kwargs):
17 print("has rawfs?",hasattr(self,'rawfs'),self,method_name,args,kwargs)
---> 18 return getattr(self.rawfs,method_name)(*args,**kwargs)
AttributeError: 'MyLocalFileSystem' object has no attribute 'rawfs'
и затем я попробовал
client.run(setup)
print("dd, myfs",dd.read_csv(f"mfs://{f.name}").size.compute())
вместо
print("dd, myfs",dd.read_csv(f"mfs://{f.name}",storage_options=dict(rawfs=LocalFileSystem())).size.compute())
и я получаю
setup run
<Client: 'tcp://127.0.0.1:39955' processes=7 threads=28, memory=31.11 GiB>
no rawfs
has rawfs? True <__main__.setup.<locals>.MyLocalFileSystem object at 0x76435449a3c0> info ('/tmp/tmpjqcw7153',) {}
has rawfs? True <__main__.setup.<locals>.MyLocalFileSystem object at 0x76435449a3c0> info ('/tmp/tmpjqcw7153',) {}
has rawfs? True <__main__.setup.<locals>.MyLocalFileSystem object at 0x76435449a3c0> _open ('/tmp/tmpjqcw7153',) {'mode': 'rb', 'block_size': None, 'autocommit': True, 'cache_options': None}
no rawfs
no rawfs
setup run
no rawfs
has rawfs? False <__main__.MyLocalFileSystem object at 0x7381fb4535f0> _open ('/tmp/tmpjqcw7153',) {'mode': 'rb', 'block_size': None, 'autocommit': True, 'cache_options': None}
setup run
setup run
setup run
setup run
setup run
setup run
2024-06-10 21:09:01,153 - distributed.worker - WARNING - Compute Failed
Key: ('size-chunk-d64b9d4bac0b21c55395551178057d16-dcfe0b01af30226d1945c64ee5b65773', 0)
Function: execute_task
args: ((subgraph_callable-c34921dd377ac1fd9d899f4136f2f0c6, [(<function read_block_from_file at 0x7381fb545d00>, <OpenFile '/tmp/tmpjqcw7153'>, 0, 3, b'\n'), None, True, True]))
kwargs: {}
Exception: 'AttributeError("\'MyLocalFileSystem\' object has no attribute \'rawfs\'")'
---------------------------------------------------------------------------
AttributeError Traceback (most recent call last)
Cell In[4], line 36
34 f.flush()
35 client.run(setup)
---> 36 print("dd, myfs",dd.read_csv(f"mfs://{f.name}").size.compute()) #error
37 # print("dd, myfs",dd.read_csv(f"mfs://{f.name}",storage_options=dict(rawfs=LocalFileSystem())).size.compute()) #error
File /usr/lib/python3.12/site-packages/dask/base.py:375, in DaskMethodsMixin.compute(self, **kwargs)
351 def compute(self, **kwargs):
352 """Compute this dask collection
353
354 This turns a lazy Dask collection into its in-memory equivalent.
(...)
373 dask.compute
374 """
--> 375 (result,) = compute(self, traverse=False, **kwargs)
376 return result
File /usr/lib/python3.12/site-packages/dask/base.py:661, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
658 postcomputes.append(x.__dask_postcompute__())
660 with shorten_traceback():
--> 661 results = schedule(dsk, keys, **kwargs)
663 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
File /usr/lib/python3.12/site-packages/dask/bytes/core.py:191, in read_block_from_file()
190 def read_block_from_file(lazy_file, off, bs, delimiter):
--> 191 with copy.copy(lazy_file) as f:
192 if off == 0 and bs is None:
193 return f.read()
File /usr/lib/python3.12/site-packages/fsspec/core.py:103, in __enter__()
100 def __enter__(self):
101 mode = self.mode.replace("t", "").replace("b", "") + "b"
--> 103 f = self.fs.open(self.path, mode=mode)
105 self.fobjects = [f]
107 if self.compression is not None:
File /usr/lib/python3.12/site-packages/fsspec/spec.py:1293, in open()
1291 else:
1292 ac = kwargs.pop("autocommit", not self._intrans)
-> 1293 f = self._open(
1294 path,
1295 mode=mode,
1296 block_size=block_size,
1297 autocommit=ac,
1298 cache_options=cache_options,
1299 **kwargs,
1300 )
1301 if compression is not None:
1302 from fsspec.compression import compr
Cell In[4], line 18, in wrapped()
16 def wrapped(self,*args,**kwargs):
17 print("has rawfs?",hasattr(self,'rawfs'),self,method_name,args,kwargs)
---> 18 return getattr(self.rawfs,method_name)(*args,**kwargs)
AttributeError: 'MyLocalFileSystem' object has no attribute 'rawfs'
Вероятно, я не сделал того, что хотел от меня mdurant, и кажется, что моя файловая система не получает набор свойств после третьего экземпляра. Я делаю это неправильно?
Хорошо, на основании комментария от mdurant я попробую поместить код в файл и отправить его работникам. Вот код, который я пробовал:
import dask.dataframe as dd, importlib, re
from tempfile import NamedTemporaryFile
from dask.distributed import Client
startup_code = """
from fsspec.implementations.local import LocalFileSystem
from fsspec import AbstractFileSystem, register_implementation
class MyLocalFileSystem(AbstractFileSystem):
def __init__(self,*args,**kwargs):
super().__init__(*args,**kwargs)
self.rawfs=LocalFileSystem(*args,**kwargs)
def __wrap(method_name):
return lambda self,*args,**kwargs: getattr(self.rawfs,method_name)(*args,**kwargs)
_open = __wrap("_open")
info = __wrap("info")
ls = __wrap("ls")
del __wrap
register_implementation("mfs",MyLocalFileSystem,clobber=True)
"""
with NamedTemporaryFile(mode='wt',suffix = ".py") as startup_code_file, Client() as client, NamedTemporaryFile(mode='wt') as f:
startup_code_file.write(startup_code)
startup_code_file.flush()
client.upload_file(startup_code_file.name)
importlib.import_module(re.search(r'/([^/]*)\.py',startup_code_file.name).group(1))
f.write("A\n0")
f.flush()
print("dd, myfs",dd.read_csv(f"mfs://{f.name}").size.compute())
и это работает. Я получаю результат dd, myfs 1.





Это необычно... Я бы подумал, что ваш метод __init__() действительно вызывается у рабочих при десериализации объекта OpenFile.
Однако более типичный подход, чем установка атрибута и надежда, что он появится на удаленном объекте, заключается в включении объекта в качестве аргумента:
class MyLocalFileSystem(AbstractFileSystem):
def __init__(self,*args, fs=None, **kwargs):
super().__init__(*args, **kwargs)
self.rawfs = fs
Я думаю, что это с большей вероятностью сработает
Кроме того, вы можете рассмотреть возможность вызова register_implementation у рабочих (через client.run()), но я не думаю, что это необходимо для этого рабочего процесса.
В общем, лучше всего хранить определения классов в файлах, а не в динамических файлах; а затем вы сможете распространить эти файлы среди работников либо с помощью общей файловой системы, либо с помощью client.upload_file.
Я пытался реализовать ваши предложения, но мне это не удалось. Смотрите редактирование моего вопроса.
ок, отлично, подход к загрузке файлов работает. Не стесняйтесь включать эту информацию в свой ответ.
Возможно, вы добьетесь большего успеха, если определите свой собственный класс в файле, отличном от основного сценария, как класс верхнего уровня, а не как динамический внутри функции, и убедитесь, что рабочие имеют доступ к этому файлу (например, client.upload_file).