Я пытаюсь использовать веб-сервис Exchange GetAttachment, используя Запросы, lxml и base64io. Эта служба возвращает файл в кодировке base64 в ответе HTTP SOAP XML. Содержимое файла содержится в одной строке в одном элементе XML. GetAttachment
- это просто пример, но проблема более общая.
Я хотел бы передать содержимое декодированного файла прямо на диск, не сохраняя все содержимое вложения в памяти в любой момент, поскольку вложение может быть несколько 100 МБ.
Я пробовал что-то вроде этого:
r = requests.post('https://example.com/EWS/Exchange.asmx', data=..., stream=True)
with open('foo.txt', 'wb') as f:
for action, elem in lxml.etree.iterparse(GzipFile(fileobj=r.raw)):
if elem.tag == 't:Content':
b64_encoder = Base64IO(BytesIO(elem.text))
f.write(b64_encoder.read())
но lxml
по-прежнему хранит копию вложения как elem.text
. Есть ли способ создать полностью потоковый синтаксический анализатор XML, который также транслирует содержимое элемента непосредственно из входного потока?
В этом случае не используйте iterparse
. Метод iterparse()
может выдавать только события начала и окончания элемента, поэтому любой текст в элементе выдается вам, когда будет найден закрывающий тег XML.
Вместо этого используйте Интерфейс парсера SAX. Это общий стандарт библиотек синтаксического анализа XML для передачи проанализированных данных обработчику содержимого. В ContentHandler.characters()
обратный вызов символьные данные передаются порциями (при условии, что реализующая библиотека XML действительно использует эту возможность). Это API более низкого уровня из API ElementTree, а стандартная библиотека Python уже включает в себя синтаксический анализатор Expat для управления им.
Таким образом, поток становится:
GzipFile
для облегчения распаковки. Или, что еще лучше, установите response.raw.decode_content = True
и оставьте декомпрессию библиотеке запросов на основе кодировки содержимого, установленной сервером.GzipFile
или необработанный поток в .parse()
метод синтаксического анализатора, созданного с помощью xml.sax.make_parser()
. Затем синтаксический анализатор переходит к чтению из потока по частям. Используя make_parser()
, вы сначала можете включить такие функции, как обработка пространства имен (что гарантирует, что ваш код не сломается, если Exchange решит изменить короткие префиксы, используемые для каждого пространства имен).characters()
обработчика содержимого вызывается с фрагментами XML-данных; проверьте правильность события запуска элемента, чтобы знать, когда ожидать данных base64. Вы можете декодировать эти данные base64 в фрагменты (кратные) 4 символа за один раз и записать их в файл. Я бы не стал использовать здесь base64io
, просто делайте свои собственные фрагменты.Простым обработчиком содержимого может быть:
from xml.sax import handler
from base64 import b64decode
class AttachmentContentHandler(handler.ContentHandler):
types_ns = 'http://schemas.microsoft.com/exchange/services/2006/types'
def __init__(self, filename):
self.filename = filename
def startDocument(self):
self._buffer = None
self._file = None
def startElementNS(self, name, *args):
if name == (self.types_ns, 'Content'):
# we can expect base64 data next
self._file = open(self.filename, 'wb')
self._buffer = []
def endElementNS(self, name, *args):
if name == (self.types_ns, 'Content'):
# all attachment data received, close the file
try:
if self._buffer:
raise ValueError("Incomplete Base64 data")
finally:
self._file.close()
self._file = self._buffer = None
def characters(self, data):
if self._buffer is None:
return
self._buffer.append(data)
self._decode_buffer()
def _decode_buffer(self):
remainder = ''
for data in self._buffer:
available = len(remainder) + len(data)
overflow = available % 4
if remainder:
data = (remainder + data)
remainder = ''
if overflow:
remainder, data = data[-overflow:], data[:-overflow]
if data:
self._file.write(b64decode(data))
self._buffer = [remainder] if remainder else []
и вы бы использовали это так:
import requests
from xml.sax import make_parser, handler
parser = make_parser()
parser.setFeature(handler.feature_namespaces, True)
parser.setContentHandler(AttachmentContentHandler('foo.txt'))
r = requests.post('https://example.com/EWS/Exchange.asmx', data=..., stream=True)
r.raw.decode_content = True # if content-encoding is used, decompress as we read
parser.parse(r.raw)
Это будет анализировать входной XML фрагментами размером до 64 КБ (значение по умолчанию IncrementalParser
размер буфера), поэтому данные вложения декодируются в блоках не более 48 КБ необработанных данных.
Я бы, вероятно, расширил обработчик содержимого, чтобы взять целевой каталог, а затем искать элементы <t:Name>
для извлечения имени файла, а затем использовать это для извлечения данных в правильное имя файла для каждого найденного вложения. Вы также захотите убедиться, что вы действительно имеете дело с документом GetAttachmentResponse
, и обработать ответы об ошибках.
@ErikCederstrand: интерфейс, управляемый событиями SAX, может затруднить чтение кода, поскольку теперь вам нужно поддерживать своего рода конечный автомат, чтобы отслеживать, какие элементы были замечены, и т. д. Если расширение класса для обработки нескольких вложений и т. д... выходит из-под контроля, подумайте о создании отдельных классов для каждого состояния и делегировании атрибуту self.state
, который должен сделать хранить более чистым и понятным.
Спасибо за элегантное решение и указатель на метод SAX characters ()! Мне действительно удалось решить эту проблему с помощью iterparse (), заключив входной поток в пользовательский буферизирующий входной поток, реагируя на событие запуска, а затем перехватив мой входной поток, проверяя буфер, чтобы найти начальный токен, а затем потребляя поток до тех пор, пока Я нахожу конечный токен и, наконец, помещаю поток в начало конечного токена. Полученный код был действительно беспорядочным. Ваше решение намного чище и проще для понимания.