В нашем рабочем процессе cadence нам часто приходится ждать определенное время для внешних событий, прежде чем продолжить (например, чтение электронной почты, переход по ссылке и т. д.).
Мне было интересно, как лучше всего уведомить наши рабочие процессы об этих событиях. Сигналы — это правильно, или мы должны создать активность, которая будет ждать события?
Из того, что я видел, нам нужно создать сигнальный канал ch := workflow.GetSignalChannel(ctx, SignalName)
, однако контекст недоступен в действиях.
Сигнализация — это рекомендуемый способ отправки событий в рабочие процессы.
Часто используемый шаблон для рабочих процессов Go заключается в использовании Selector для ожидания нескольких каналов сигнала, а также будущего таймера.
Перейти пример:
sig1Ch := workflow.GetSignalChannel(ctx, "signal1")
sig2Ch := workflow.GetSignalChannel(ctx, "signal2")
timeout := workflow.NewTimer(ctx, time.Minute * 30)
s := workflow.NewSelector(ctx)
var signal1 *Signal1Struct
var signal2 *Signal2Struct
s.AddFuture(timeout, func(f Future) {
})
s.AddReceive(sig1Ch, func(c Channel, more bool) {
c.Receive(ctx, signal1)
})
s.AddReceive(sig2Ch, func(c Channel, more bool) {
c.Receive(ctx, signal2)
})
s.Select(ctx)
if signal1 == nil && signal2 == nil {
// handle timeout
} else {
// process signals
}
Пример Java:
public interface MyWorkflow {
@WorkflowMethod
void main();
@SignalMethod
void signal1(Signal1Struct signal);
@SignalMethod
void signal2(Signal2Struct signal);
}
public class MyWorkflowImpl implements MyWorkflow {
private Signal1Struct signal1;
private Signal2Struct signal2;
@Override
public void main() {
Workflow.await(Duration.ofMinutes(30),
() -> signal1 != null || signal2 != null);
if (signal1 == null && signal2 == null) {
// handle timeout
}
// process signals
}
@Override
public void signal1(Signal1Struct signal) {
signal1 = signal;
}
@Override
public void signal2(Signal2Struct signal) {
signal2 = signal;
}
}
Обратите внимание, что рекомендуется учитывать простои рабочего процесса рабочего процесса. Например, давайте представим, что вышеприведенный рабочий процесс запущен и сигнал получен через 40 минут после запуска, в то время как все рабочие процессы не работают. В этом случае, когда рабочие возвращаются, и timeout
будущее, и signCh
не будут пустыми. Поскольку Selector не гарантирует упорядочение, возможно, что сигнал будет доставлен до таймера, даже если он был получен позже. Таким образом, ваша логика кода должна учитывать это. Например, есть жесткое требование, чтобы сигнал, полученный по прошествии 30 минут с момента запуска рабочего процесса, должен был игнорироваться. Затем приведенный выше образец должен быть изменен на:
Перейти пример:
...
start := workflow.Now(ctx); // must use workflow clock
s.Select(ctx)
duration := workflow.Now(ctx).Sub(start)
if duration.Minutes() >= 30 || (signal1 == nil && signal2 == nil) {
// handle timeout
} else {
// process signals
}
Пример Java:
public void main() {
long start = Workflow.currentTimeMillis(); // must use workflow clock
Duration timeout = Duration.ofMinutes(30);
Workflow.await(timeout, () -> signal1 != null || signal2 != null);
long duration = Workflow.currentTimeMillis() - start;
if (timeout.toMillis() <= duration || (signal1 == null && signal2 == null)) {
// handle timeout
}
// process signals
}
Обновленный код ведет себя правильно, даже если выполнение рабочего процесса было отложено на час.
Перейти пример:
c, err := client.NewClient(client.Options{
HostPort: client.DefaultHostPort,
})
if err != nil {
log.Fatalln("Unable to create client", err)
}
defer c.Close()
err := c.SignalWorkflow(context.Background(), <workflowId>, "", "signal1", Signal1Struct{})
Пример Java:
WorkflowServiceStubs service = WorkflowServiceStubs.newInstance();
WorkflowClient client = WorkflowClient.newInstance(service);
GreetingWorkflow myWorkflow =
client.newWorkflowStub(MyWorkflow.class, <workflowId>);
myWorkflow.signal1(new Signal1Struct());
Добавлен пример отправки сигнала. Да, любая структура, которая может быть сериализована через DataConverter, поддерживается в качестве полезной нагрузки сигнала.
Я немного запутался с ОТПРАВКОЙ сигнала от внешней службы. Например. Я знаю, как создать канал для ожидания определенных сигналов, я также в настоящее время знаю, как отправить сигнал через Cadence CLI. Но как мне реализовать код, который будет отправлять сигнал, например, «принять» в рабочий процесс? Во-вторых, любопытно, можем ли мы отправлять структуры как сигналы? или это только струны? Спасибо!