Я использую fsspec, который использует встроенные возможности paramiko, но не смог найти способ разбить ответ на страницы.
Есть ли способ иметь эту функциональность здесь?
Вариант использования таков, что в каждом каталоге есть 100000 файлов, и я полагаю, что перечислять их все отдельно в памяти - плохая идея.
Существует sftp.listdir_iter, но есть ли такая возможность в fsspec?






listdir_iter обеспечит более прямой способ разбиения на страницы, поскольку он возвращает итератор, позволяющий извлекать элементы один за другим.
import paramiko
from fsspec.implementations.sftp import SFTPFileSystem
class PaginatedSFTPFileSystem(SFTPFileSystem):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
def listdir_paginated(self, path, page_size=1000, start_page=0):
if not self.isdir(path):
raise ValueError("Path should be a directory")
sftp = self.ftp
# Use Paramiko's listdir_iter method
items_iter = sftp.listdir_iter(path)
# Skip the items before the starting page
for _ in range(start_page * page_size):
try:
next(items_iter)
except StopIteration:
break
items = []
for i, item in enumerate(items_iter):
if i >= page_size:
break
items.append(item)
return items
Но вы также можете рассмотреть listdir_attr, который загружает все элементы сразу, а затем нарезает список, чтобы получить нужную страницу: это было бы быстрее. Это означает, что вы можете попробовать реализовать разбиение на страницы, нарезав возвращаемый список объектов SFTPAttributes. Например:
import paramiko
from fsspec.implementations.sftp import SFTPFileSystem
class PaginatedSFTPFileSystem(SFTPFileSystem):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
def listdir_paginated(self, path, page_size=1000, start_page=0):
if not self.isdir(path):
raise ValueError("Path should be a directory")
sftp = self.ftp
# Use Paramiko's listdir_attr method
items = sftp.listdir_attr(path)
# Slice the items list to implement pagination
start_index = start_page * page_size
end_index = start_index + page_size
paginated_items = items[start_index:end_index]
# Extract filenames from the SFTPAttributes objects
filenames = [item.filename for item in paginated_items]
return filenames
Вы бы использовали его как:
fs = PaginatedSFTPFileSystem(host = "your_host", username = "your_username", password = "your_password")
paginated_response = fs.listdir_paginated("/path/to/your/directory", page_size=1000, start_page=0)
Этот подход немного более эффективен, чем тот, который использует listdir_iter, поскольку он позволяет избежать перебора элементов один за другим.
Тем не менее, он по-прежнему загружает все объекты SFTPAttributes в память перед нарезкой списка. Эти накладные расходы памяти могут не быть проблемой, если у вас нет очень большого количества файлов и ограниченных ресурсов памяти.
Чтобы использовать listdir_iter с fsspec , вы можете создать собственный PaginatedSFTPFileSystem класс, который наследуется от SFTPFileSystem.
Пользовательский класс получает доступ к базовому SFTP-клиенту paramiko через атрибут self.ftp, а затем по-прежнему будет напрямую использовать метод listdir_iter.
import paramiko
from fsspec.implementations.sftp import SFTPFileSystem
class PaginatedSFTPFileSystem(SFTPFileSystem):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
def listdir_paginated(self, path, page_size=1000, start_page=0):
if not self.isdir(path):
raise ValueError("Path should be a directory")
sftp = self.ftp # Access the paramiko SFTP client
# Use Paramiko's listdir_iter method
items_iter = sftp.listdir_iter(path)
# Implement pagination by controlling the iterator
# ... (rest of the implementation)
return items
Получив таким образом доступ к paramiko SFTP-клиенту, вы можете использовать listdir_iter для прямой реализации нумерации страниц, даже если она не является частью fsspec.
Используя sshfs (реализация fsspec для протокола SFTP с использованием asyncssh), я не вижу SSHFS.listdir-подобного метода.
Но sshfs также имеет множество других основных операций с файловой системой, таких как mkdir, touch and find.
Поэтому вы можете попробовать использовать метод find , унаследованный от класса AbstractFileSystem в fsspec , для разбиения на страницы:
import sshfs
class PaginatedSSHFS(sshfs.SSHFS):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
async def find_paginated(self, path, page_size=1000, start_page=0):
if not await self.isdir(path):
raise ValueError("Path should be a directory")
# Use SSHFS's find method
items = await super().find(path, detail=False)
# Implement pagination by slicing the items list
start_index = start_page * page_size
end_index = start_index + page_size
paginated_items = items[start_index:end_index]
return paginated_items
Вы можете использовать эту пользовательскую реализацию в своем проекте следующим образом:
import asyncio
async def main():
fs = PaginatedSSHFS(host = "your_host", username = "your_username", password = "your_password")
paginated_response = await fs.find_paginated("/path/to/your/directory", page_size=1000, start_page=0)
print(paginated_response)
asyncio.run(main())
В этой реализации используется метод find с параметром детализации, установленным на False, чтобы получить список путей к файлам.
Затем он реализует разбиение на страницы, разрезая список элементов.
Опять же, этот подход загружает все элементы в память перед разрезанием списка, что может быть неэффективным для очень больших каталогов.
Знаете ли вы, есть ли способ, как мы можем добавить sftp-соединение непосредственно к
PaginatedSFTPFileSystemвместо повторного использованияhost,username,password, поскольку у меня уже есть созданноеfsspec.filesystemобъектное sftp-соединение, которое выполняет несколько задач?
Я полагаю, вы можете передать существующий объект SFTPFileSystem в свой пользовательский класс PaginatedSFTPFileSystem и использовать его базовое соединение sftp.
Для этого вы можете изменить пользовательский класс, чтобы он принимал объект SFTPFileSystem во время инициализации и использовал его атрибут sftp для вывода списка элементов каталога.
from fsspec.implementations.sftp import SFTPFileSystem
class PaginatedSFTPFileSystem(SFTPFileSystem):
def __init__(self, sftp_fs, *args, **kwargs):
self.sftp = sftp_fs.ftp
super().__init__(*args, **kwargs)
def listdir_paginated(self, path, page_size=1000, start_page=0):
if not self.isdir(path):
raise ValueError("Path should be a directory")
# Use the sftp connection from the existing SFTPFileSystem object
items_iter = self.sftp.listdir_iter(path)
# Implement pagination by controlling the iterator
# ... (rest of the implementation)
return items
Теперь вы можете создать объект SFTPFileSystem и передать его PaginatedSFTPFileSystem:
sftp_fs = SFTPFileSystem(host = "your_host", username = "your_username", password = "your_password")
# Pass the existing SFTPFileSystem object to the custom PaginatedSFTPFileSystem
paginated_fs = PaginatedSFTPFileSystem(sftp_fs)
paginated_response = paginated_fs.listdir_paginated("/path/to/your/directory", page_size=1000, start_page=0)
print(paginated_response)
Этот пользовательский класс теперь будет использовать sftp-соединение из существующего объекта SFTPFileSystem, устраняя необходимость повторного предоставления host, username и password.
Corralien предлагает в комментариях использовать walk(path, maxdepth=None, topdown=True, **kwargs).
Вы можете использовать этот метод со своим собственным классом PaginatedSFTPFileSystem, поскольку он наследуется от SFTPFileSystem, который, в свою очередь, наследуется от AbstractFileSystem.
Это означает, что метод walk доступен для вашего пользовательского класса.
Однако это может быть не самым подходящим выбором для разбивки на страницы, поскольку он возвращает файлы и каталоги во вложенной структуре, что затрудняет прямое разбиение результатов на страницы.
Если вам нужна нумерация страниц только для каталогов верхнего уровня, вы можете изменить пользовательский класс PaginatedSFTPFileSystem, включив в него пользовательскую реализацию метода walk с поддержкой нумерации страниц для верхнего уровня.
from fsspec.implementations.sftp import SFTPFileSystem
class PaginatedSFTPFileSystem(SFTPFileSystem):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
def walk_paginated(self, path, page_size=1000, start_page=0, maxdepth=None, topdown=True, **kwargs):
if not self.isdir(path):
raise ValueError("Path should be a directory")
# Use the original walk method to generate the nested structure
walk_generator = super().walk(path, maxdepth=maxdepth, topdown=topdown, **kwargs)
# Implement pagination for the top level only
start_index = start_page * page_size
end_index = start_index + page_size
paginated_walk = []
for idx, (root, directories, files) in enumerate(walk_generator):
if start_index <= idx < end_index:
paginated_walk.append((root, directories, files))
if idx >= end_index:
break
return paginated_walk
Используется с:
fs = PaginatedSFTPFileSystem(host = "your_host", username = "your_username", password = "your_password")
paginated_response = fs.walk_paginated("/path/to/your/directory", page_size=1000, start_page=0)
for root, directories, files in paginated_response:
print(f"Root: {root}")
print(f"Directories: {directories}")
print(f"Files: {files}\n")
Опять же, это приведет к разбиению на страницы только каталогов и файлов верхнего уровня, а не тех, которые находятся в подкаталогах.
Если вам нужна нумерация страниц для файлов и каталогов на всех уровнях, рассмотрите возможность использования метода find или пользовательского метода listdir_paginated, как показано в предыдущих примерах.
Как отметил mdurant в комментариях:
fsspec кэширует экземпляры файловой системы, поэтому, если вы дважды вызываете SSHFileSystem (или производную) с одними и теми же аргументами, вы НЕ создаете новый сеанс ftp, а повторно используете существующий.
См. Кэширование экземпляра/листинга.
В зависимости от вашего варианта использования вам может потребоваться передать skip_instance_cache=True или use_listings_cache=False.
Учтите, что если вы используете те же аргументы для создания экземпляра PaginatedSFTPFileSystem, fsspec вернет кэшированный экземпляр SFTPFileSystem.
Если вы хотите принудительно создать новый сеанс FTP, вы можете сделать это, передав уникальный аргумент при создании экземпляра PaginatedSFTPFileSystem.
Например, вы можете добавить фиктивный аргумент, который принимает уникальное значение каждый раз, когда вы хотите создать новый сеанс FTP:
# Create a new PaginatedSFTPFileSystem instance with a new FTP session
fs1 = PaginatedSFTPFileSystem(host = "your_host", username = "your_username", password = "your_password", dummy_arg1 = "unique_value_1")
# Create another new PaginatedSFTPFileSystem instance with a new FTP session
fs2 = PaginatedSFTPFileSystem(host = "your_host", username = "your_username", password = "your_password", dummy_arg2 = "unique_value_2")
В этом примере fs1 и fs2 будут иметь отдельные сеансы FTP, несмотря на то, что у них один и тот же хост, имя пользователя и пароль, потому что уникальные фиктивные аргументы заставляют fsspec создавать новые экземпляры вместо повторного использования кэшированного.
``` items_iter = sftp.listdir_iter(path) ``` я вижу, что в fsspec нет объявления для listdir_iter.
Я думаю, что это предлагает подкласс реализации fsspec. Это было бы хорошо, или это могло бы быть апстримом (я бы согласился), но вы также должны заглянуть на github.com/fsspec/sshfs
@LearnerJS Я думал, что, поскольку fsspec использует paramiko под капотом для SFTP, вы можете получить доступ к базовому клиенту paramiko SFTP, чтобы использовать метод listdir_iter.
@VonC Это хороший подход, спасибо. Знаете ли вы, есть ли способ, как мы можем добавить sftp-соединение непосредственно в PaginatedSFTPFileSystem вместо того, чтобы снова иметь хост, имя пользователя и пароль, поскольку у меня уже есть sftp-соединение объекта fsspec.filesystem, которое выполняет несколько задач?
@LearnerJS Я добавил подход SSHFS.
Нет, на самом деле это было бы снова неэффективно, поскольку в дело вступает память, по крайней мере, для моего варианта использования, но спасибо за подробное объяснение.
@LearnerJS Я согласен. Я просто повторил комментарий mdurant
@LearnerJS Я добавил подход для передачи существующего SFTPFileSystem объекта в ваш пользовательский PaginatedSFTPFileSystem.
Принято, вознаграждение будет вознаграждено через 14 часов.
Краткое примечание: fsspec кэширует экземпляры файловой системы, поэтому, если вы дважды вызываете SSHFileSystem (или производную) с одними и теми же аргументами, вы НЕ создаете новый сеанс ftp, а повторно используете существующий.
@mdurant Хороший вопрос, спасибо за отзыв. Я включил ваш комментарий в ответ для большей наглядности.
В качестве альтернативы вы можете использовать прогулку от
fsspec.