Я пытаюсь реализовать рабочий процесс долговечной функции Azure.
Каждые 6 минут у меня функция Azure TimerTrigger вызывает функцию оркестрации Azure (OrchestrationTrigger), которая, в свою очередь, запускает ряд функций активности (ActivityTrigger).
Однако иногда функция Orchestration вызывается дважды в течение нескольких секунд! Это большая проблема, поскольку мои функции активности не идемпотентны!
Ниже показано, как вызывается мой код.
Функция TimerTriggered:
[FunctionName("StartupFunc")]
public static async Task Run([TimerTrigger("0 */6 * * * *", RunOnStartup = true, UseMonitor = false)]TimerInfo myStartTimer, [OrchestrationClient] DurableOrchestrationClient orchestrationClient, TraceWriter log)
{
List<OrchestrationModel> ExportModels = await getData();
string id = await orchestrationClient.StartNewAsync("OrchestratorFunc", ExportModels);
}
Функция оркестровки:
[FunctionName("OrchestratorFunc")]
public static async Task<string> TransformOrchestration([OrchestrationTrigger] DurableOrchestrationContext context, TraceWriter log)
{
var dataList = context.GetInput<List<OrchestrationModel>>();
var tasks = new List<Task>();
foreach (var data in dataList)
{
tasks.Add(context.CallActivityAsync<string>("TransformToSql", new TransformModel(data));
}
await Task.WhenAll(tasks);
}
Функция деятельности:
[FunctionName("TransformToSql")]
[public static async Task<string> RunTransformation([ActivityTrigger] DurableActivityContext context, TraceWriter log)
{
TransformModel = context.GetInput<TransformModel>();
//Do some work with TransformModel
}
Такое поведение совершенно нормально - именно так работают долговечные функции.
У вас есть следующая оркестровка:
[FunctionName("OrchestratorFunc")]
public static async Task<string> TransformOrchestration([OrchestrationTrigger] DurableOrchestrationContext context, TraceWriter log)
{
var dataList = context.GetInput<List<OrchestrationModel>>();
var tasks = new List<Task>();
foreach (var data in dataList)
{
tasks.Add(context.CallActivityAsync<string>("TransformToSql", new TransformModel(data));
}
await Task.WhenAll(tasks);
}
Когда вызывается действие, поток возвращается к концепции под названием Диспетчер - это внутреннее существо устойчивых функций, ответственное за поддержание потока вашей оркестровки. В ожидании завершения задачи оркестровка временно отключается. После выполнения задачи вся оркестровка воспроизводится до следующего await
.
Важно то, что хотя оркестровка воспроизводится, действие не вызывается еще раз - его результат извлекается из хранилища и используется. Чтобы сделать вещи более явными, рассмотрим следующий пример:
[FunctionName("OrchestratorFunc")]
public static async Task<string> TransformOrchestration([OrchestrationTrigger] DurableOrchestrationContext context, TraceWriter log)
{
var dataList = context.GetInput<List<OrchestrationModel>>();
var tasks = new List<Task>();
foreach (var data in dataList)
{
await context.CallActivityAsync<string>("TransformToSql1", new TransformModel(data);
await context.CallActivityAsync<string>("TransformToSql2", new TransformModel(data);
}
}
Когда ожидается TransformToSql1
, оркестровка освобождается, и весь поток ожидает завершения этого действия. Затем оркестровка воспроизводится повторно - она еще раз ожидает TransformToSql1
, но, поскольку ее результат сохраняется, он просто возвращается к оркестровке и ожидает TransformToSql2
- затем процесс повторяется.
@ kamil-mrzyglod - так что, если моей функции оркестрации необходимо выполнить какой-то одноразовый код, который должен выполняться до того, как будут выполнены отдельные запросы активности, а не повторно запущены, что вы порекомендуете сделать? Выполнить это в отдельном отдельном действии, вызванном заранее?
@StephenHolt - если вашей оркестровке требуется какая-то «разминка» перед запуском, она должна получить данные из внешнего источника (например, они должны быть переданы как вход для оркестровки).
Функция оркестрации будет запускаться чаще, поскольку она воспроизводится платформой Durable Function. Вы видите, что ваши функции активности снова срабатывают? Если так, то это действительно странное поведение.
Устойчивые функции используют очереди хранения и таблицы для управления потоком и состоянием записи оркестровки.
Каждый раз, когда запускается функция оркестровки (получая сообщение из очереди управления), она воспроизводит оркестровку. Вы можете проверить это в коде, используя свойство IsReplaying
на DurableOrchestrationContext
.
if (!context.IsReplaying)
{
// this code will only run during the first execution of the workflow
}
Не используйте IsReplaying
для определения запуска действий, а используйте его для ведения журнала / отладки.
Когда функция действия достигнута, сообщение помещается в очередь рабочего элемента. Затем платформа Durable Functions изучит таблицу истории, чтобы определить, выполнялась ли эта функция активности ранее (с заданными параметрами). Если это так, то он не будет запускать функцию активности снова и продолжит выполнение остальной части функции оркестровки.
Функциональность контрольных точек и воспроизведения в Durable Functions работает только тогда, когда код оркестрации детерминирован. Поэтому никогда не используйте решения, основанные на DateTime или случайных числах.
Более подробная информация: https://docs.microsoft.com/en-us/azure/azure-functions/durable-functions-checkpointing-and-replay
Я вижу, что вторая функция оркестрации (пытается) выполняет точно такое же выполнение действий, что и первая функция оркестрации. Это приводит к множеству исключений и прерыванию работы.
Я знаю о функциях воспроизведения действий, и это работает нормально, без каких-либо проблем. Моя проблема - это два одновременных выполнения функции Orchestrator.