Как отменить текущий Spring Flux?

Я использую Spring Flux для отправки параллельных запросов службе, это очень упрощенная версия:

Flux.fromIterable(customers)
  .flatMap { customer ->
     client.call(customer)
  } ...

Мне было интересно, как я могу отменить этот поток, например, каким-то образом получить ссылку на поток и сказать ему, чтобы он отключился.

10
0
10 471
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Ответ принят как подходящий

Как вы, наверное, знаете, с реактивными объектами все операторы ленивы. Это означает, что выполнение конвейера откладывается до момента подписки на реактивный поток.

Итак, в вашем примере отменять пока нечего, потому что в этот момент ничего не происходит.

Но предположим, что ваш пример был расширен на:

Disposable disp = Flux.fromIterable(customers)
  .flatMap { customer ->
     client.call(customer)
  }
  .subscribe();

Затем, как вы можете видеть, ваша подписка возвращает объект Disposable, который вы можете использовать для отмены всего, если хотите, например

disp.dispose()

В документации избавляться говорится:

Cancel or dispose the underlying task or resource.

другой раздел документации говорит следующее:

These variants [of operators] return a reference to the subscription that you can use to cancel the subscription when no more data is needed. Upon cancellation, the source should stop producing values and clean up any resources it created. This cancel and clean-up behavior is represented in Reactor by the general-purpose Disposable interface.

Поэтому отмена выполнения потока сопряжена с осложнениями на стороне реактивного объекта, потому что вы хотите, чтобы мир оставался в согласованном состоянии, если вы отмените поток в середине его обработки. Например, если вы что-то создавали, вы можете отказаться от ресурсов, уничтожить любые результаты частичной агрегации, закрыть файлы, каналы, освободить память или любые другие ресурсы, которые у вас есть, потенциально отменить изменения или компенсировать их.

Вы можете прочитать об этом документацию на уборка, чтобы вы также рассмотрели, что вы можете сделать на стороне реактивного объекта.

Flux<String> bridge = Flux.create(sink -> {
    sink.onRequest(n -> channel.poll(n))
        .onCancel(() -> channel.cancel()) 
        .onDispose(() -> channel.close())  
    });

Ответ от @Edwin точен. Пока вы не вызываете подписку, отменять нечего, потому что никакой код выполняться не будет. Просто хотел добавить пример, чтобы было понятно.

public static void main(String[] args) throws InterruptedException {

   List<String> lists = Lists.newArrayList("abc", "def", "ghi");
   Disposable disposable = Flux.fromIterable(lists)
                           .delayElements(Duration.ofSeconds(3))
                           .map(String::toLowerCase)
                           .subscribe(System.out::println);

   Thread.sleep(5000); //Sleeping so that some elements in the flux gets printed
   disposable.dispose();

   Thread.sleep(10000); // Sleeping so that we can prove even waiting for some time nothing gets printed after cancelling the flux
}

Но я бы сказал, что более чистый способ (функциональный) - использовать такие функции, как takeUntil или take. Например, я могу остановить поток в приведенном выше примере, как это.

List<String> lists = Lists.newArrayList("abc", "def", "End", "ghi");
Flux.fromIterable(lists).takeUntil(s -> s.equalsIgnoreCase("End"))
                           .delayElements(Duration.ofSeconds(3))
                           .map(String::toLowerCase)
                           .subscribe(System.out::println);

или

List<String> lists = Lists.newArrayList("abc", "def", "ghi");
Flux.fromIterable(lists).take(2)
                           .delayElements(Duration.ofSeconds(2))
                           .map(String::toLowerCase)
                           .subscribe(System.out::println);

Другие вопросы по теме