Как изящно остановить Kubernetes Watch on Services при выходе из системы

У меня работает следующий демон KOPF:

import kopf
import kubernetes

@kopf.on.daemon(group='test.example.com', version='v1', plural='myclusters')
def worker_services(namespace, name, spec, status, stopped, logger, **kwargs):
    config = kubernetes.client.Configuration()
    client = kubernetes.client.ApiClient(config) 
    workload = kubernetes.client.CoreV1Api(client)
    watch = kubernetes.watch.Watch()
    while not stopped:
        for e in watch.stream(workload.list_service_for_all_namespaces):
            svc = e['object']
            lb = helpers.get_service_loadbalancer(name, namespace, svc, logger)
            if "NodePort" in svc.spec.type:
                logger.info(f"Found Service of type NodePort: {svc.metadata.name}")
                do_some_work(svc)
    watch.stop()

Когда система завершает работу с помощью Ctrl + C или Kubernetes, убивающего модуль, я получаю следующее предупреждение:

INFO:kopf.reactor.running:Signal SIGINT is received. Operator is stopping.
[2020-12-11 15:07:52,107] kopf.reactor.running [INFO    ] Signal SIGINT is received. Operator is stopping.
WARNING:kopf.objects:Daemon 'worker_services' did not exit in time. Leaving it orphaned.
[2020-12-11 15:07:52,113] kopf.objects         [WARNING ] Daemon 'worker_services' did not exit in time. Leaving it orphaned.

Это позволяет процессу работать в фоновом режиме, даже если я нажимаю Ctrl + Z.

Я считаю, что for loop задерживает процесс с потоком и не завершается при выходе из системы, поэтому он не нажимает watch.stop() в последней строке этого фрагмента.

До сих пор я пробовал следующее:

  • Добавление watch.stop() после do_some_work(svc), но это отправляет мою программу в очень агрессивный цикл, потребляющий до 90% моего процессора.
  • Помещение всего for loop в другой поток привело к сбою некоторых компонентов, таких как регистратор.
  • Реализовано yield e, чтобы сделать процесс неблокирующим, это сделало демон завершенным после первого просмотренного сервиса, и просмотр закончился.
  • Реализованы прослушиватели сигналов с использованием библиотеки signal для прослушивания SIGINT, а затем watch.stop() в функции выхода, но функция так и не была вызвана.
  • Реализовано cancellation_timeout=3.0 т.е. @kopf.on.daemon(group='test.example.com', version='v1', plural='myclusters', cancellation_timeout=3.0) с некоторыми из вышеперечисленных решений, но тоже безуспешно

Любой вклад будет оценен, спасибо заранее.

Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
1
0
1 421
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

В вашем примере я вижу, что код пытается следить за ресурсами в кластере. Однако он использует официальную клиентскую библиотеку, которая является синхронной. Синхронные функции (или потоки) не могут быть прерваны в Python, в отличие от асинхронных (которые также требуют использования асинхронного ввода-вывода). Как только функция, показанная здесь, вызывается, она никогда не завершается и не имеет точки, где она проверяет флаг остановки в течение этого длительного выполнения.

Что вы можете сделать с текущим кодом, так это чаще проверять флаг stopped:

@kopf.on.daemon(group='test.example.com', version='v1', plural='myclusters')
def worker_services(namespace, name, spec, status, stopped, logger, **kwargs):
    …
    watch = kubernetes.watch.Watch()
    for e in watch.stream(workload.list_service_for_all_namespaces):
        if stopped:  # <<<< check inside of the for-loop
            break
        svc = …
        ………
    watch.stop()

Это проверит, остановлен ли демон при каждом событии каждой службы. Однако он не будет проверять стоп-флаг, если будет полная тишина (такое бывает).

Чтобы обойти это, вы можете ограничить часы по времени (пожалуйста, проверьте документацию клиента о том, как это делается правильно, но iirc, таким образом):

watch = kubernetes.watch.Watch()
for e in watch.stream(workload.list_service_for_all_namespaces, timeout_seconds=123):

Это ограничит время отсутствия ответа/отмены демона максимум до 123 секунд — в случае, если в кластере нет доступных сервисов или они не изменены.

В этом случае вам не нужно проверять условие stopped вне цикла for, так как функция демона завершится с намерением перезапуститься, stopped будет проверена фреймворком и не перезапустит функцию. - как предполагалось.


Кстати, я должен заметить, что наблюдение за ресурсами внутри обработчиков может быть не лучшей идеей. Смотреть сложно. Слишком сложно со всеми пограничными случаями и проблемами, которые это приносит.

И поскольку фреймворк уже следит за этим, может быть проще использовать это и реализовать межресурсное подключение через глобальное состояние оператора:

import queue

import kopf

SERVICE_QUEUES = {}  # {(mc_namespace, mc_name) -> queue.Queue}
KNOWN_SERVICES = {}  # {(svc_namespace, svc_name) -> svc_body}


@kopf.on.event('v1', 'services')
def service_is_seen(type, body, meta, event, **_):

    for q in SERVICE_QUEUES.values():  # right, to all MyClusters known to the moment
        q.put(event)

    if type == 'DELETED' or meta.get('deletionTimestamp'):
        if (namespace, name) in KNOWN_SERVICES:
            del KNOWN_SERVICES[(namespace, name)]
    else:
        KNOWN_SERVICES[(namespace, name)] = body


@kopf.on.daemon(group='test.example.com', version='v1', plural='myclusters')
def worker_services(namespace, name, spec, status, stopped, logger, **kwargs):
    # Start getting the updates as soon as possible, to not miss anything while handling the "known" services.
    q = SERVICE_QUEUES[(namespace, name)] = queue.Queue()
    try:

        # Process the Services known before the daemon start/restart.
        for (svc_namespace, svc_name), svc in KNOWN_SERVICES.items():
            if not stopped:
                lb = helpers.get_service_loadbalancer(name, namespace, svc, logger)
                if "NodePort" in svc.spec['type']:
                    logger.info(f"Found Service of type NodePort: {svc.metadata.name}")
                    do_some_work(svc)

        # Process the Services arriving after the daemon start/restart.
        while not stopped:
            try:
                svc_event = q.get(timeout=1.0)
            except queue.Empty:
                pass
            else:
                svc = svc_event['object']
                lb = helpers.get_service_loadbalancer(name, namespace, svc, logger)
                if "NodePort" in svc.spec['type']:
                    logger.info(f"Found Service of type NodePort: {svc.metadata.name}")
                    do_some_work(svc)

    finally:
        del SERVICE_QUEUES[(namespace, name)]

Это упрощенный пример (но может работать «как есть» — я не проверял) — только для того, чтобы показать идею о том, как заставить ресурсы общаться друг с другом, используя возможности фреймворка.

Решение зависит от варианта использования, и это решение может быть неприменимо в вашем предполагаемом случае. Может быть, я что-то упускаю из-за того, почему это делается именно так. Было бы хорошо, если бы вы сообщили о своем прецеденте в репозиторий Kopf как о запросе функции, чтобы позже он мог быть поддержан фреймворком.

Отличный вклад, спасибо @sergey-vasilyev. Даст событиям ход и обратную связь.

Barend 14.12.2020 07:11

Другие вопросы по теме