Я использую Python 3.6.6 и Сельдерей 4.2.0.
Я пытаюсь управлять рабочие процессы динамических задач, который может меняться на лету. Рабочие процессы могут содержать длинные и короткие этапы.
Например: Изначально у меня такой рабочий процесс задач:
Но в какой-то момент Мне нужно добавить еще одну задачу, которая зависит от A. Таким образом, задача может дождаться завершения A:
from __future__ import absolute_import
from celery import subtask, signals
from pymemcache.client import base
from test_celery.celery import app
import time
def get_task_uuid(task):
return str(hash(frozenset(task[0], task[1]))))
@app.task
def add(x, y):
print('add({},{}) = {} | {}'.format(x, y, x+y, time.time()))
return x+y
@app.task
def sub(x, y):
print('sub({},{}) = {} | {}'.format(x, y, x-y, time.time()))
return x-y
@app.task
def mul(x, y):
time.sleep(10)
print('mul({},{}) = {} | {}'.format(x,y,x*y, time.time()))
return x*y
@signals.before_task_publish.connect
def before_task_publish(body, exchange, routing_key, headers, properties, retry_policy, **kw):
task = (body, headers['task'])
uuid = get_task_uuid(task)
Я искал любой возможный подход, пытаясь прослушать сигналы задачи, чтобы запустить D, как только задача A завершится успешно (signal.task_success). Любая идея?





вы можете использовать цепи и ссылка на сайт результат 1-й задачи для вызова любой задачи, которую вы хотите, в соответствии с результатом