Fsspec - есть ли способ получить ответ с разбивкой на страницы от sftp?

Я использую fsspec, который использует встроенные возможности paramiko, но не смог найти способ разбить ответ на страницы.

Есть ли способ иметь эту функциональность здесь?

Вариант использования таков, что в каждом каталоге есть 100000 файлов, и я полагаю, что перечислять их все отдельно в памяти - плохая идея.

Существует sftp.listdir_iter, но есть ли такая возможность в fsspec?

В качестве альтернативы вы можете использовать прогулку от fsspec.

Corralien 14.05.2023 20:43
Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
1
1
97
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

listdir_iter обеспечит более прямой способ разбиения на страницы, поскольку он возвращает итератор, позволяющий извлекать элементы один за другим.

import paramiko
from fsspec.implementations.sftp import SFTPFileSystem

class PaginatedSFTPFileSystem(SFTPFileSystem):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def listdir_paginated(self, path, page_size=1000, start_page=0):
        if not self.isdir(path):
            raise ValueError("Path should be a directory")

        sftp = self.ftp

        # Use Paramiko's listdir_iter method
        items_iter = sftp.listdir_iter(path)

        # Skip the items before the starting page
        for _ in range(start_page * page_size):
            try:
                next(items_iter)
            except StopIteration:
                break

        items = []
        for i, item in enumerate(items_iter):
            if i >= page_size:
                break
            items.append(item)

        return items

Но вы также можете рассмотреть listdir_attr, который загружает все элементы сразу, а затем нарезает список, чтобы получить нужную страницу: это было бы быстрее. Это означает, что вы можете попробовать реализовать разбиение на страницы, нарезав возвращаемый список объектов SFTPAttributes. Например:

import paramiko
from fsspec.implementations.sftp import SFTPFileSystem

class PaginatedSFTPFileSystem(SFTPFileSystem):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def listdir_paginated(self, path, page_size=1000, start_page=0):
        if not self.isdir(path):
            raise ValueError("Path should be a directory")

        sftp = self.ftp

        # Use Paramiko's listdir_attr method
        items = sftp.listdir_attr(path)

        # Slice the items list to implement pagination
        start_index = start_page * page_size
        end_index = start_index + page_size
        paginated_items = items[start_index:end_index]

        # Extract filenames from the SFTPAttributes objects
        filenames = [item.filename for item in paginated_items]

        return filenames

Вы бы использовали его как:

fs = PaginatedSFTPFileSystem(host = "your_host", username = "your_username", password = "your_password")
paginated_response = fs.listdir_paginated("/path/to/your/directory", page_size=1000, start_page=0)

Этот подход немного более эффективен, чем тот, который использует listdir_iter, поскольку он позволяет избежать перебора элементов один за другим.
Тем не менее, он по-прежнему загружает все объекты SFTPAttributes в память перед нарезкой списка. Эти накладные расходы памяти могут не быть проблемой, если у вас нет очень большого количества файлов и ограниченных ресурсов памяти.


Чтобы использовать listdir_iter с fsspec , вы можете создать собственный PaginatedSFTPFileSystem класс, который наследуется от SFTPFileSystem.
Пользовательский класс получает доступ к базовому SFTP-клиенту paramiko через атрибут self.ftp, а затем по-прежнему будет напрямую использовать метод listdir_iter.

import paramiko
from fsspec.implementations.sftp import SFTPFileSystem

class PaginatedSFTPFileSystem(SFTPFileSystem):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def listdir_paginated(self, path, page_size=1000, start_page=0):
        if not self.isdir(path):
            raise ValueError("Path should be a directory")

        sftp = self.ftp  # Access the paramiko SFTP client

        # Use Paramiko's listdir_iter method
        items_iter = sftp.listdir_iter(path)

        # Implement pagination by controlling the iterator
        # ... (rest of the implementation)

        return items

Получив таким образом доступ к paramiko SFTP-клиенту, вы можете использовать listdir_iter для прямой реализации нумерации страниц, даже если она не является частью fsspec.


Используя sshfs (реализация fsspec для протокола SFTP с использованием asyncssh), я не вижу SSHFS.listdir-подобного метода.

Но sshfs также имеет множество других основных операций с файловой системой, таких как mkdir, touch and find.

Поэтому вы можете попробовать использовать метод find , унаследованный от класса AbstractFileSystem в fsspec , для разбиения на страницы:

import sshfs

class PaginatedSSHFS(sshfs.SSHFS):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    async def find_paginated(self, path, page_size=1000, start_page=0):
        if not await self.isdir(path):
            raise ValueError("Path should be a directory")

        # Use SSHFS's find method
        items = await super().find(path, detail=False)

        # Implement pagination by slicing the items list
        start_index = start_page * page_size
        end_index = start_index + page_size
        paginated_items = items[start_index:end_index]

        return paginated_items

Вы можете использовать эту пользовательскую реализацию в своем проекте следующим образом:

import asyncio

async def main():
    fs = PaginatedSSHFS(host = "your_host", username = "your_username", password = "your_password")
    paginated_response = await fs.find_paginated("/path/to/your/directory", page_size=1000, start_page=0)
    print(paginated_response)

asyncio.run(main())

В этой реализации используется метод find с параметром детализации, установленным на False, чтобы получить список путей к файлам.
Затем он реализует разбиение на страницы, разрезая список элементов.

Опять же, этот подход загружает все элементы в память перед разрезанием списка, что может быть неэффективным для очень больших каталогов.


Знаете ли вы, есть ли способ, как мы можем добавить sftp-соединение непосредственно к PaginatedSFTPFileSystem вместо повторного использования host, username, password, поскольку у меня уже есть созданное fsspec.filesystem объектное sftp-соединение, которое выполняет несколько задач?

Я полагаю, вы можете передать существующий объект SFTPFileSystem в свой пользовательский класс PaginatedSFTPFileSystem и использовать его базовое соединение sftp.

Для этого вы можете изменить пользовательский класс, чтобы он принимал объект SFTPFileSystem во время инициализации и использовал его атрибут sftp для вывода списка элементов каталога.

from fsspec.implementations.sftp import SFTPFileSystem

class PaginatedSFTPFileSystem(SFTPFileSystem):
    def __init__(self, sftp_fs, *args, **kwargs):
        self.sftp = sftp_fs.ftp
        super().__init__(*args, **kwargs)

    def listdir_paginated(self, path, page_size=1000, start_page=0):
        if not self.isdir(path):
            raise ValueError("Path should be a directory")

        # Use the sftp connection from the existing SFTPFileSystem object
        items_iter = self.sftp.listdir_iter(path)

        # Implement pagination by controlling the iterator
        # ... (rest of the implementation)

        return items

Теперь вы можете создать объект SFTPFileSystem и передать его PaginatedSFTPFileSystem:

sftp_fs = SFTPFileSystem(host = "your_host", username = "your_username", password = "your_password")

# Pass the existing SFTPFileSystem object to the custom PaginatedSFTPFileSystem
paginated_fs = PaginatedSFTPFileSystem(sftp_fs)

paginated_response = paginated_fs.listdir_paginated("/path/to/your/directory", page_size=1000, start_page=0)
print(paginated_response)

Этот пользовательский класс теперь будет использовать sftp-соединение из существующего объекта SFTPFileSystem, устраняя необходимость повторного предоставления host, username и password.


Corralien предлагает в комментариях использовать walk(path, maxdepth=None, topdown=True, **kwargs).

Вы можете использовать этот метод со своим собственным классом PaginatedSFTPFileSystem, поскольку он наследуется от SFTPFileSystem, который, в свою очередь, наследуется от AbstractFileSystem.
Это означает, что метод walk доступен для вашего пользовательского класса.

Однако это может быть не самым подходящим выбором для разбивки на страницы, поскольку он возвращает файлы и каталоги во вложенной структуре, что затрудняет прямое разбиение результатов на страницы.

Если вам нужна нумерация страниц только для каталогов верхнего уровня, вы можете изменить пользовательский класс PaginatedSFTPFileSystem, включив в него пользовательскую реализацию метода walk с поддержкой нумерации страниц для верхнего уровня.

from fsspec.implementations.sftp import SFTPFileSystem

class PaginatedSFTPFileSystem(SFTPFileSystem):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def walk_paginated(self, path, page_size=1000, start_page=0, maxdepth=None, topdown=True, **kwargs):
        if not self.isdir(path):
            raise ValueError("Path should be a directory")

        # Use the original walk method to generate the nested structure
        walk_generator = super().walk(path, maxdepth=maxdepth, topdown=topdown, **kwargs)

        # Implement pagination for the top level only
        start_index = start_page * page_size
        end_index = start_index + page_size

        paginated_walk = []
        for idx, (root, directories, files) in enumerate(walk_generator):
            if start_index <= idx < end_index:
                paginated_walk.append((root, directories, files))
            if idx >= end_index:
                break

        return paginated_walk

Используется с:

fs = PaginatedSFTPFileSystem(host = "your_host", username = "your_username", password = "your_password")

paginated_response = fs.walk_paginated("/path/to/your/directory", page_size=1000, start_page=0)
for root, directories, files in paginated_response:
    print(f"Root: {root}")
    print(f"Directories: {directories}")
    print(f"Files: {files}\n")

Опять же, это приведет к разбиению на страницы только каталогов и файлов верхнего уровня, а не тех, которые находятся в подкаталогах. Если вам нужна нумерация страниц для файлов и каталогов на всех уровнях, рассмотрите возможность использования метода find или пользовательского метода listdir_paginated, как показано в предыдущих примерах.


Как отметил mdurant в комментариях:

fsspec кэширует экземпляры файловой системы, поэтому, если вы дважды вызываете SSHFileSystem (или производную) с одними и теми же аргументами, вы НЕ создаете новый сеанс ftp, а повторно используете существующий.

См. Кэширование экземпляра/листинга.

В зависимости от вашего варианта использования вам может потребоваться передать skip_instance_cache=True или use_listings_cache=False.

Учтите, что если вы используете те же аргументы для создания экземпляра PaginatedSFTPFileSystem, fsspec вернет кэшированный экземпляр SFTPFileSystem. Если вы хотите принудительно создать новый сеанс FTP, вы можете сделать это, передав уникальный аргумент при создании экземпляра PaginatedSFTPFileSystem.

Например, вы можете добавить фиктивный аргумент, который принимает уникальное значение каждый раз, когда вы хотите создать новый сеанс FTP:

# Create a new PaginatedSFTPFileSystem instance with a new FTP session
fs1 = PaginatedSFTPFileSystem(host = "your_host", username = "your_username", password = "your_password", dummy_arg1 = "unique_value_1")

# Create another new PaginatedSFTPFileSystem instance with a new FTP session
fs2 = PaginatedSFTPFileSystem(host = "your_host", username = "your_username", password = "your_password", dummy_arg2 = "unique_value_2")

В этом примере fs1 и fs2 будут иметь отдельные сеансы FTP, несмотря на то, что у них один и тот же хост, имя пользователя и пароль, потому что уникальные фиктивные аргументы заставляют fsspec создавать новые экземпляры вместо повторного использования кэшированного.

``` items_iter = sftp.listdir_iter(path) ``` я вижу, что в fsspec нет объявления для listdir_iter.

LearnerJS 14.05.2023 19:20

Я думаю, что это предлагает подкласс реализации fsspec. Это было бы хорошо, или это могло бы быть апстримом (я бы согласился), но вы также должны заглянуть на github.com/fsspec/sshfs

mdurant 14.05.2023 19:25

@LearnerJS Я думал, что, поскольку fsspec использует paramiko под капотом для SFTP, вы можете получить доступ к базовому клиенту paramiko SFTP, чтобы использовать метод listdir_iter.

VonC 14.05.2023 19:40

@VonC Это хороший подход, спасибо. Знаете ли вы, есть ли способ, как мы можем добавить sftp-соединение непосредственно в PaginatedSFTPFileSystem вместо того, чтобы снова иметь хост, имя пользователя и пароль, поскольку у меня уже есть sftp-соединение объекта fsspec.filesystem, которое выполняет несколько задач?

LearnerJS 14.05.2023 19:51

@LearnerJS Я добавил подход SSHFS.

VonC 14.05.2023 19:55

Нет, на самом деле это было бы снова неэффективно, поскольку в дело вступает память, по крайней мере, для моего варианта использования, но спасибо за подробное объяснение.

LearnerJS 14.05.2023 19:56

@LearnerJS Я согласен. Я просто повторил комментарий mdurant

VonC 14.05.2023 20:00

@LearnerJS Я добавил подход для передачи существующего SFTPFileSystem объекта в ваш пользовательский PaginatedSFTPFileSystem.

VonC 14.05.2023 20:01

Принято, вознаграждение будет вознаграждено через 14 часов.

LearnerJS 14.05.2023 20:34

Краткое примечание: fsspec кэширует экземпляры файловой системы, поэтому, если вы дважды вызываете SSHFileSystem (или производную) с одними и теми же аргументами, вы НЕ создаете новый сеанс ftp, а повторно используете существующий.

mdurant 15.05.2023 04:15

@mdurant Хороший вопрос, спасибо за отзыв. Я включил ваш комментарий в ответ для большей наглядности.

VonC 15.05.2023 09:54

Другие вопросы по теме