У меня есть служба запуска облака Google, которая запускается через pubsub, собирает данные из API Azure и помещает их в корзину gcp с помощью smart_open.
with smart_open.open(destination, "wb") as fout:
schema = pyarrow_adapters.json_schema_to_pyarrow_schema(self.desired_schema)
data_frame.to_parquet(
path=fout,
engine = "pyarrow",
compression = "gzip",
schema=schema,
use_compliant_nested_type=False,
)
Редактировать: Это ошибка, которую я получаю в google-resumable-media-python
Это сохраняет сотни тысяч файлов за несколько часов, каждый день происходит несколько сбоев с этой ошибкой, которую я не могу понять, как исправить.
Получение метрик Azure для идентификатора ресурса: xxx Конфигурация: ххх Ошибка получения метрик: соединение разорвано: IncompleteRead(прочитано 15773 байт, ожидается еще 125469)', IncompleteRead(прочитано 15773 байт, ожидается еще 125469))
А также иногда очень похожая ошибка, подобная этой
Получение метрик Azure для идентификатора ресурса: xxx Конфигурация: ххх Ошибка получения метрик: поток байтов находится в неожиданном состоянии. Из локального потока было прочитано 8887 байт, а 0 байт уже были обновлены (они должны совпадать).
Метрики Cloudrun выглядят хорошо, и я понятия не имею, как их отлаживать дальше.
Шаги по повторению.
Настройте службу Cloudrun, которая запускает приложение Flask с одной конечной точкой, которая получает события от триггера eventarc.
@app.route("/", methods=["POST"])
def index():
data = request.get_json()
if not data:
msg = "no Pub/Sub message received"
print(f"error: {msg}")
return f"Bad Request: {msg}", 400
if not isinstance(data, dict) or "message" not in data:
msg = "invalid Pub/Sub message formaty"
print(f"error: {msg}")
return f"Bad Request: {msg}", 400
pubsub_message = data["message"]
Используйте Gunicorn, чтобы запустить это, Dockerfile
...
CMD ./azu_metrics_extractor_pex.pex \
azu_metrics_extractor.main:app \
-b :${PORT} \
-w 3 \
--threads=8 \
--timeout=0
Служба должна использовать информацию в сообщении pubsub для получения данных ресурсов из Azure, преобразования в паркет и записи их в корзину gcp с помощью smart_open.
with smart_open.open(destination, "wb") as fout:
schema = pyarrow_adapters.json_schema_to_pyarrow_schema(self.desired_schema)
data_frame.to_parquet(
path=fout,
engine = "pyarrow",
compression = "gzip",
schema=schema,
use_compliant_nested_type=False,
)
Запустите тему pubsub с ~10 тыс. сообщений за час
Обратите внимание, что примерно 2 или 3 файла паркета, которые должны были быть сохранены в gcp, не удалось сохранить с указанными выше ошибками.
Потенциально связано с этой открытой проблемой для smart_open. https://github.com/piskvorky/smart_open/issues/784
Сервер успешно работает на моем локальном компьютере, хотя сложно эмулировать скорость запуска, происходящую на gcp, из-за взаимодействия публикации и подписки. Использование последней версии 7.0.4 smart_open. Никаких других журналов об ошибке, кроме того, что я опубликовал, нет, только другие выходные данные - это информационные журналы, которые у меня есть, но я обновил сообщение с ними на случай, если они помогут.
В таком случае, можете ли вы поделиться шагами, которые вы выполнили для воспроизведения? Также расскажите подробнее о своем подходе/архитектуре?
Добавлено больше шагов для воспроизведения.

У нас была ошибка «соединение разорвано» в рабочих процессах GCP, вызывающих облачную функцию. По вашему описанию 2 - 3 ошибки на 10к. Мы проверили это с помощью поддержки GCP, и их решением было создание функции повтора. Они признали, что это связано с сетью (а может быть, и до сих пор).
Поэтому мы придумали специальную повторную попытку в рабочих процессах GCP и больше никогда не видели этой ошибки.
retry:
predicate: $${custom_retry}
max_retries: 5
backoff:
initial_delay: 5
max_delay: 60
multiplier: 2
....
custom_retry:
params: [e]
steps:
- retry_always:
return: true
повторная попытка всегда звучит глупо, но в нашем случае работает.
Таким образом, в коде запуска облака вам нужно будет обнаружить ошибку и повторить попытку. Вам нужна помощь с этим?
Спасибо, это на самом деле то, что я сейчас пытался. Я обновил декоратор повтора следующим образом @retry.Retry(predicate=retry.retry_base.if_transient_error or retry.if_exception_type(requests.exceptions.ChunkedEncodingError) or (lambda e: isinstance(e, ValueError) and "Bytes stream is in unexpected state" in str(e))) Но в настоящее время он не работает (получит ту же ошибку без попытки повтора), рассмотрю ваше предложение повторной попытки здесь и посмотрю, поможет ли это.
Спасибо, повторная попытка сработала, завтра обновлю это своим решением.
Пробовали обновить
gcloud? Ваш сервер успешно работает на вашем локальном компьютере? Пробовали ли вы обновить используемые библиотеки/зависимости? А можете поделиться логами изgoogle cloud?