В моем анализе данных я часто использую файл xlsx или csv из удаленного места (URL-адрес). Я хочу, чтобы мой код был воспроизводимым и понятным, поэтому лучше всего было бы загрузить файл в моем коде Python, чтобы URL-адрес содержался в моем сценарии, однако при запуске моего сценария он будет загружать файл каждый раз, что занимает слишком много времени. Итак, мой вопрос: существует ли библиотека Python, которая автоматически загружает и кэширует файлы, поэтому я могу использовать URL-адреса в своем коде, например
from remotecaching import r_url
f = open(r_url("https://domain.tld/resource.csv"))
В этом примере r_url загружает файл (если его нет в локальном кеше) и возвращает путь к закэшированному файлу.
У Snakemake есть похожая система (https://snakemake.readthedocs.io/en/stable/snakefiles/remote_files.html), которая, однако, непригодна для использования за пределами экосистемы змейки.






Я написал простую оболочку, которая делает то, что я искал. Он использует каталог XDG Cache для хранения загруженных файлов.
import hashlib
import os
from urllib.parse import urlparse
import zipfile
import tempfile
import subprocess
from pathlib import Path
import pandas as pd
import requests
DATA_DIR = Path(save_cache_path('yourdatadirname'))
if not DATA_DIR.exists():
os.mkdir(DATA_DIR)
def hash(s):
try:
return hashlib.sha256(s).hexdigest()
except TypeError:
return hashlib.sha256(s.encode('utf-8')).hexdigest()
def delete_cached_file(url):
'''
Helper function to delete an erroneous file or something like that..
'''
filename = DATA_DIR / hash(url)
os.remove(filename)
def ssh_file(url):
filename = DATA_DIR / hash(url)
if not filename.exists():
subprocess.run(['scp', url, filename])
return filename
def http_file(url, zip_extract_name=None):
'''
Automatically downloads a URL (if not cached) and provides the file path;
Also tries to automatically unzip files (only works with ZIP files containing a single file with the correct naming..)
:zip_extract_name: extract the specified filename from a ZIP file
'''
filename = DATA_DIR / hash(url)
if not filename.exists():
r = requests.get(url, allow_redirects=True)
url_filename = os.path.basename(urlparse(url).path)
inner_url_filename, ext = os.path.splitext(filename)
if zip_extract_name:
# if ext == '.zip':
zf = tempfile.NamedTemporaryFile(delete=False)
zf.write(r.content)
zf.close()
with zipfile.ZipFile(zf.name, 'r') as zipped_file:
data = zipped_file.read(zip_extract_name)
else:
data = r.content
with open(filename, 'wb') as f:
f.write(data)
return filename