У меня работает следующий демон KOPF:
import kopf
import kubernetes
@kopf.on.daemon(group='test.example.com', version='v1', plural='myclusters')
def worker_services(namespace, name, spec, status, stopped, logger, **kwargs):
config = kubernetes.client.Configuration()
client = kubernetes.client.ApiClient(config)
workload = kubernetes.client.CoreV1Api(client)
watch = kubernetes.watch.Watch()
while not stopped:
for e in watch.stream(workload.list_service_for_all_namespaces):
svc = e['object']
lb = helpers.get_service_loadbalancer(name, namespace, svc, logger)
if "NodePort" in svc.spec.type:
logger.info(f"Found Service of type NodePort: {svc.metadata.name}")
do_some_work(svc)
watch.stop()
Когда система завершает работу с помощью Ctrl + C
или Kubernetes, убивающего модуль, я получаю следующее предупреждение:
INFO:kopf.reactor.running:Signal SIGINT is received. Operator is stopping.
[2020-12-11 15:07:52,107] kopf.reactor.running [INFO ] Signal SIGINT is received. Operator is stopping.
WARNING:kopf.objects:Daemon 'worker_services' did not exit in time. Leaving it orphaned.
[2020-12-11 15:07:52,113] kopf.objects [WARNING ] Daemon 'worker_services' did not exit in time. Leaving it orphaned.
Это позволяет процессу работать в фоновом режиме, даже если я нажимаю Ctrl + Z
.
Я считаю, что for loop
задерживает процесс с потоком и не завершается при выходе из системы, поэтому он не нажимает watch.stop()
в последней строке этого фрагмента.
До сих пор я пробовал следующее:
watch.stop()
после do_some_work(svc)
, но это отправляет мою программу в очень агрессивный цикл, потребляющий до 90% моего процессора.for loop
в другой поток привело к сбою некоторых компонентов, таких как регистратор.yield e
, чтобы сделать процесс неблокирующим, это сделало демон завершенным после первого просмотренного сервиса, и просмотр закончился.signal
для прослушивания SIGINT
, а затем watch.stop()
в функции выхода, но функция так и не была вызвана.cancellation_timeout=3.0
т.е. @kopf.on.daemon(group='test.example.com', version='v1', plural='myclusters', cancellation_timeout=3.0)
с некоторыми из вышеперечисленных решений, но тоже безуспешноЛюбой вклад будет оценен, спасибо заранее.
В вашем примере я вижу, что код пытается следить за ресурсами в кластере. Однако он использует официальную клиентскую библиотеку, которая является синхронной. Синхронные функции (или потоки) не могут быть прерваны в Python, в отличие от асинхронных (которые также требуют использования асинхронного ввода-вывода). Как только функция, показанная здесь, вызывается, она никогда не завершается и не имеет точки, где она проверяет флаг остановки в течение этого длительного выполнения.
Что вы можете сделать с текущим кодом, так это чаще проверять флаг stopped
:
@kopf.on.daemon(group='test.example.com', version='v1', plural='myclusters')
def worker_services(namespace, name, spec, status, stopped, logger, **kwargs):
…
watch = kubernetes.watch.Watch()
for e in watch.stream(workload.list_service_for_all_namespaces):
if stopped: # <<<< check inside of the for-loop
break
svc = …
………
watch.stop()
Это проверит, остановлен ли демон при каждом событии каждой службы. Однако он не будет проверять стоп-флаг, если будет полная тишина (такое бывает).
Чтобы обойти это, вы можете ограничить часы по времени (пожалуйста, проверьте документацию клиента о том, как это делается правильно, но iirc, таким образом):
watch = kubernetes.watch.Watch()
for e in watch.stream(workload.list_service_for_all_namespaces, timeout_seconds=123):
Это ограничит время отсутствия ответа/отмены демона максимум до 123 секунд — в случае, если в кластере нет доступных сервисов или они не изменены.
В этом случае вам не нужно проверять условие stopped
вне цикла for, так как функция демона завершится с намерением перезапуститься, stopped
будет проверена фреймворком и не перезапустит функцию. - как предполагалось.
Кстати, я должен заметить, что наблюдение за ресурсами внутри обработчиков может быть не лучшей идеей. Смотреть сложно. Слишком сложно со всеми пограничными случаями и проблемами, которые это приносит.
И поскольку фреймворк уже следит за этим, может быть проще использовать это и реализовать межресурсное подключение через глобальное состояние оператора:
import queue
import kopf
SERVICE_QUEUES = {} # {(mc_namespace, mc_name) -> queue.Queue}
KNOWN_SERVICES = {} # {(svc_namespace, svc_name) -> svc_body}
@kopf.on.event('v1', 'services')
def service_is_seen(type, body, meta, event, **_):
for q in SERVICE_QUEUES.values(): # right, to all MyClusters known to the moment
q.put(event)
if type == 'DELETED' or meta.get('deletionTimestamp'):
if (namespace, name) in KNOWN_SERVICES:
del KNOWN_SERVICES[(namespace, name)]
else:
KNOWN_SERVICES[(namespace, name)] = body
@kopf.on.daemon(group='test.example.com', version='v1', plural='myclusters')
def worker_services(namespace, name, spec, status, stopped, logger, **kwargs):
# Start getting the updates as soon as possible, to not miss anything while handling the "known" services.
q = SERVICE_QUEUES[(namespace, name)] = queue.Queue()
try:
# Process the Services known before the daemon start/restart.
for (svc_namespace, svc_name), svc in KNOWN_SERVICES.items():
if not stopped:
lb = helpers.get_service_loadbalancer(name, namespace, svc, logger)
if "NodePort" in svc.spec['type']:
logger.info(f"Found Service of type NodePort: {svc.metadata.name}")
do_some_work(svc)
# Process the Services arriving after the daemon start/restart.
while not stopped:
try:
svc_event = q.get(timeout=1.0)
except queue.Empty:
pass
else:
svc = svc_event['object']
lb = helpers.get_service_loadbalancer(name, namespace, svc, logger)
if "NodePort" in svc.spec['type']:
logger.info(f"Found Service of type NodePort: {svc.metadata.name}")
do_some_work(svc)
finally:
del SERVICE_QUEUES[(namespace, name)]
Это упрощенный пример (но может работать «как есть» — я не проверял) — только для того, чтобы показать идею о том, как заставить ресурсы общаться друг с другом, используя возможности фреймворка.
Решение зависит от варианта использования, и это решение может быть неприменимо в вашем предполагаемом случае. Может быть, я что-то упускаю из-за того, почему это делается именно так. Было бы хорошо, если бы вы сообщили о своем прецеденте в репозиторий Kopf как о запросе функции, чтобы позже он мог быть поддержан фреймворком.
Отличный вклад, спасибо @sergey-vasilyev. Даст событиям ход и обратную связь.