Раньше мы использовали requests.adapters.HTTPAdapter для этого ответа с requests==2.32.3 и google-cloud-storage==2.16.0:
from google.cloud import storage
from requests.adapters import HTTPAdapter
gcs_client = storage.Client()
adapter = HTTPAdapter(pool_connections=30, pool_maxsize=30)
gcs_client._http.mount("https://", adapter)
gcs_client._http._auth_request.session.mount("https://", adapter)
Мы переносим нашу кодовую базу на httpx. Этот комментарий к выпуску на GitHub инструктирует использовать специальные транспорты. Я пытался выполнить что-то вроде приведенного ниже с помощью httpx==0.27.0, но это не сработало:
import google.auth
import httpx
from google.cloud import storage
transport = httpx.HTTPTransport(
limits=httpx.Limits(
max_connections=30, max_keepalive_connections=30
)
)
http = httpx.Client(transport=transport)
http.is_mtls = False # Emulating https://github.com/googleapis/google-auth-library-python/blob/v2.29.0/google/auth/transport/requests.py#L400
return Client(
_http=http,
# Emulating https://github.com/googleapis/python-cloud-core/blob/v2.4.1/google/cloud/client/__init__.py#L178
credentials=google.auth.default(scopes=Client.SCOPE)[0],
)
Эта реализация выдает ошибку Unauthorized:
google.api_core.Exceptions.Unauthorized: 401 GET https://storage.googleapis.com/storage/v1/b/foo?projection=noAcl&prettyPrint=false: анонимный вызывающий абонент не имеет доступа к Storage.buckets.get Корзина облачного хранилища Google. Разрешение «storage.buckets.get» для ресурса отклонено (или оно может не существовать).
Как можно перейти из requests.adapters.HTTPAdapter в httpx.HTTPTransport?
Этот вопрос похож на Как реорганизовать HTTP-адаптер запроса для использования с aiohttp?, но для httpx, а не для aiohttp.






К сожалению, на данный момент у меня нет возможности протестировать это решение. Но вы можете попробовать создать «пользовательский» транспорт, который будет обновлять заголовки аутентификации при отправке любого запроса с использованием этого транспорта.
import google.auth
import google.auth.transport.requests
import httpx
from google.cloud import storage
credentials, project = google.auth.default(scopes=storage.Client.SCOPE)
def get_httpx_client(credentials):
class AuthenticatedTransport(httpx.BaseTransport):
def __init__(self, transport):
self.transport = transport
self.auth_request = google.auth.transport.requests.Request()
def handle_request(self, request):
headers = dict(request.headers)
credentials.before_request(
self.auth_request, request.method, request.url, headers
)
request.headers.update(headers)
return self.transport.handle_request(request)
transport = httpx.HTTPTransport(
limits=httpx.Limits(max_connections=30, max_keepalive_connections=30)
)
authenticated_transport = AuthenticatedTransport(transport)
return httpx.Client(transport=authenticated_transport)
custom_client = get_httpx_client(credentials)
gcs_client = storage.Client(_http=custom_client, project=project)
ОБНОВЛЯТЬ
Для потоковой передачи ответов перейдите по ссылке https://www.python-httpx.org/quickstart/#streaming-responses
Что-то вроде:
def fetch_stream_file(url):
with custom_client.stream("GET", url) as response:
response.raise_for_status()
for chunk in response.iter_bytes():
yield chunk
поэтому, если вам нужно обрабатывать потоковые ответы, следуйте python-httpx.org/quickstart/#streaming-responses
Спасибо @Alex, это делает меня намного ближе. К сожалению, это пока не совсем работает, потому что здесь передается
stream=True, аhttpxне поддерживает поток kwarg. Итак, я получаюTypeError: httpx.Client.request() got an unexpected keyword argument 'stream'