Соединение разорвано: IncompleteRead в Google Cloud Run

У меня есть служба запуска облака Google, которая запускается через pubsub, собирает данные из API Azure и помещает их в корзину gcp с помощью smart_open.

          with smart_open.open(destination, "wb") as fout:
            schema = pyarrow_adapters.json_schema_to_pyarrow_schema(self.desired_schema)
            data_frame.to_parquet(
                path=fout,
                engine = "pyarrow",
                compression = "gzip",
                schema=schema,
                use_compliant_nested_type=False,
            )

Редактировать: Это ошибка, которую я получаю в google-resumable-media-python

https://github.com/googleapis/google-resumable-media-python/blob/main/google/resumable_media/_upload.py#L50-L53

Это сохраняет сотни тысяч файлов за несколько часов, каждый день происходит несколько сбоев с этой ошибкой, которую я не могу понять, как исправить.

Получение метрик Azure для идентификатора ресурса: xxx Конфигурация: ххх Ошибка получения метрик: соединение разорвано: IncompleteRead(прочитано 15773 байт, ожидается еще 125469)', IncompleteRead(прочитано 15773 байт, ожидается еще 125469))

А также иногда очень похожая ошибка, подобная этой

Получение метрик Azure для идентификатора ресурса: xxx Конфигурация: ххх Ошибка получения метрик: поток байтов находится в неожиданном состоянии. Из локального потока было прочитано 8887 байт, а 0 байт уже были обновлены (они должны совпадать).

Метрики Cloudrun выглядят хорошо, и я понятия не имею, как их отлаживать дальше.

Шаги по повторению.

  1. Настройте службу Cloudrun, которая запускает приложение Flask с одной конечной точкой, которая получает события от триггера eventarc.

    @app.route("/", methods=["POST"])
    def index():
      data = request.get_json()
      if not data:
        msg = "no Pub/Sub message received"
        print(f"error: {msg}")
        return f"Bad Request: {msg}", 400
    
      if not isinstance(data, dict) or "message" not in data:
        msg = "invalid Pub/Sub message formaty"
        print(f"error: {msg}")
        return f"Bad Request: {msg}", 400
    
      pubsub_message = data["message"]
    
  2. Используйте Gunicorn, чтобы запустить это, Dockerfile

    ...
    CMD ./azu_metrics_extractor_pex.pex \
        azu_metrics_extractor.main:app \
        -b :${PORT} \
        -w 3 \
        --threads=8 \
        --timeout=0
    
  3. Служба должна использовать информацию в сообщении pubsub для получения данных ресурсов из Azure, преобразования в паркет и записи их в корзину gcp с помощью smart_open.

          with smart_open.open(destination, "wb") as fout:
            schema = pyarrow_adapters.json_schema_to_pyarrow_schema(self.desired_schema)
            data_frame.to_parquet(
                path=fout,
                engine = "pyarrow",
                compression = "gzip",
                schema=schema,
                use_compliant_nested_type=False,
            )
    
  4. Запустите тему pubsub с ~10 тыс. сообщений за час

  5. Обратите внимание, что примерно 2 или 3 файла паркета, которые должны были быть сохранены в gcp, не удалось сохранить с указанными выше ошибками.

Потенциально связано с этой открытой проблемой для smart_open. https://github.com/piskvorky/smart_open/issues/784

Пробовали обновить gcloud ? Ваш сервер успешно работает на вашем локальном компьютере? Пробовали ли вы обновить используемые библиотеки/зависимости? А можете поделиться логами из google cloud?

Roopa M 03.06.2024 14:11

Сервер успешно работает на моем локальном компьютере, хотя сложно эмулировать скорость запуска, происходящую на gcp, из-за взаимодействия публикации и подписки. Использование последней версии 7.0.4 smart_open. Никаких других журналов об ошибке, кроме того, что я опубликовал, нет, только другие выходные данные - это информационные журналы, которые у меня есть, но я обновил сообщение с ними на случай, если они помогут.

OneTwo 04.06.2024 19:21

В таком случае, можете ли вы поделиться шагами, которые вы выполнили для воспроизведения? Также расскажите подробнее о своем подходе/архитектуре?

Roopa M 05.06.2024 08:18

Добавлено больше шагов для воспроизведения.

OneTwo 05.06.2024 12:02
Создание приборной панели для анализа данных на GCP - часть I
Создание приборной панели для анализа данных на GCP - часть I
Недавно я столкнулся с интересной бизнес-задачей - визуализацией сбоев в цепочке поставок лекарств, которую могут просматривать врачи и...
3
4
189
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

У нас была ошибка «соединение разорвано» в рабочих процессах GCP, вызывающих облачную функцию. По вашему описанию 2 - 3 ошибки на 10к. Мы проверили это с помощью поддержки GCP, и их решением было создание функции повтора. Они признали, что это связано с сетью (а может быть, и до сих пор).

Поэтому мы придумали специальную повторную попытку в рабочих процессах GCP и больше никогда не видели этой ошибки.

    retry:
      predicate: $${custom_retry}
      max_retries: 5
      backoff:
        initial_delay: 5
        max_delay: 60
        multiplier: 2

....

custom_retry:
  params: [e]
  steps:
    - retry_always:
        return: true

повторная попытка всегда звучит глупо, но в нашем случае работает.

Таким образом, в коде запуска облака вам нужно будет обнаружить ошибку и повторить попытку. Вам нужна помощь с этим?

Спасибо, это на самом деле то, что я сейчас пытался. Я обновил декоратор повтора следующим образом @retry.Retry(predicate=retry.retry_base.if_transient_error or retry.if_exception_type(requests.exceptions.ChunkedEncodingE‌​rror) or (lambda e: isinstance(e, ValueError) and "Bytes stream is in unexpected state" in str(e))) Но в настоящее время он не работает (получит ту же ошибку без попытки повтора), рассмотрю ваше предложение повторной попытки здесь и посмотрю, поможет ли это.

OneTwo 24.06.2024 17:20

Спасибо, повторная попытка сработала, завтра обновлю это своим решением.

OneTwo 26.06.2024 21:05

Другие вопросы по теме

Похожие вопросы

Получить URL-адрес, по которому я могу использовать Curl
Каков лимит параллелизма для запросов в BigQuery?
Как злоумышленник сможет получить доступ к моей базе данных Cloud Firestore, если я не настроил никаких правил безопасности Cloud Firestore?
Создание Google Meet Space с использованием API Meet и учетной записи службы. Ошибка: 7 PERMISSION_DENIED: разрешение запрещено для ресурса Space (или оно может не существовать)
Опирается ли вход в несколько кластеров GCP на службу Cloud Service Mesh (Traffic Director)?
Сообщение о публикации/подписке рабочего процесса GCP
Получение несоответствия стоимости при использовании UNNEST в BigQuery
Анализ полезной нагрузки protobuf с помощью уведомлений Firestore Eventarc и Google Cloud Run + Kotlin/Ktor
В консоли Google Cloud безопасно ли предоставить роль Firebase Admin SDK учетной записи вычислительной службы по умолчанию?
Создайте график информационной панели с точками данных в журналах