Дождитесь завершения всех процессов, созданных с помощью spawn/3, и соберите их результаты в Эликсире.

Я хочу создать несколько процессов, которые будут выполнять некоторые вычисления и собирать результаты каждого из них в список. Рассмотрим этот, хотя и некорректный, игрушечный пример:

defmodule Counter do
  def loop(items \\ [])

  def loop(items) do
    receive do
      {:append, item} ->
        IO.inspect([item | items])
        loop([item | items])
      :exit ->
        items
    end
  end

  def push(from_pid, item) do
    send(from_pid, {:append, :math.pow(item, 2)})
  end

  def run() do
    for item <- 1..10 do
      spawn(Counter, :push, [self(), item])
    end

    loop()
  end
end

Counter.run()
  1. Метод run/1 порождает 10 процессов с 2 аргументами — идентификатором процесса и номером.
  2. Каждый порожденный процесс вычисляет результат (в данном случае возводит заданное число в квадрат) и отправляет результат обратно.
  3. Метод loop/1 прослушивает сообщения и собирает результаты в список.

Проблема в том, что я не понимаю, как правильно перестать слушать сообщения после завершения всех созданных процессов. Я не могу просто отправить другой тип сообщения (в данном случае :exit), чтобы прекратить рекурсивный вызов loop/1, поскольку некоторые процессы могут быть еще не выполнены. Конечно, я мог бы отслеживать количество полученных сообщений и не звонить loop/1 снова, если достигнуто целевое количество. Однако я сомневаюсь, что это правильный подход.

Как правильно это реализовать?

Как настроить Tailwind CSS с React.js и Next.js?
Как настроить Tailwind CSS с React.js и Next.js?
Tailwind CSS - единственный фреймворк, который, как я убедился, масштабируется в больших командах. Он легко настраивается, адаптируется к любому...
LeetCode запись решения 2536. Увеличение подматриц на единицу
LeetCode запись решения 2536. Увеличение подматриц на единицу
Увеличение подматриц на единицу - LeetCode
Переключение светлых/темных тем
Переключение светлых/темных тем
В Microsoft Training - Guided Project - Build a simple website with web pages, CSS files and JavaScript files, мы объясняем, как CSS можно...
Отношения &quot;многие ко многим&quot; в Laravel с методами присоединения и отсоединения
Отношения &quot;многие ко многим&quot; в Laravel с методами присоединения и отсоединения
Отношения "многие ко многим" в Laravel могут быть немного сложными, но с помощью Eloquent ORM и его моделей мы можем сделать это с легкостью. В этой...
В PHP
В PHP
В большой кодовой базе с множеством различных компонентов классы, функции и константы могут иметь одинаковые имена. Это может привести к путанице и...
Карта дорог Беладжар PHP Laravel
Карта дорог Беладжар PHP Laravel
Laravel - это PHP-фреймворк, разработанный для облегчения разработки веб-приложений. Laravel предоставляет различные функции, упрощающие разработку...
1
0
60
3
Перейти к ответу Данный вопрос помечен как решенный

Ответы 3

Task.Supervisor.async_stream_nolink — хороший инструмент для выполнения подобных задач. Хотя это может не касаться особенностей работы низкоуровневых функций send и receive, это хороший рецепт для решения подобных проблем, особенно когда вам нужно контролировать, сколько вещей происходит одновременно.

Рассмотрим следующий блок: это займет ок. 5 секунд на выполнение, потому что каждая задача спит в течение 1 секунды, но мы запускаем 2 из них одновременно (через max_concurrency).

iex> Task.Supervisor.start_link(name: TmpTaskSupervisor)
iex> Task.Supervisor.async_stream_nolink(
        TmpTaskSupervisor,
        1..10,
        fn item ->
          IO.puts("processing item #{item}")
            Process.sleep(1_000)
        end,
        timeout: 120_000,
        max_concurrency: 2
)
|> Stream.run()

Если вы хотите вернуть значения, преобразуйте полученный поток в список, но имейте в виду, что каждое возвращаемое значение будет заключено в кортеж :ok:

iex> Task.Supervisor.start_link(name: TmpTaskSupervisor)
iex> Task.Supervisor.async_stream_nolink(
        TmpTaskSupervisor,
        1..10,
        fn n ->
          n * n
        end,
        timeout: 120_000,
        max_concurrency: 2
)
|> Enum.to_list()
[ok: 1, ok: 4, ok: 9, ok: 16, ok: 25, ok: 36, ok: 49, ok: 64, ok: 81, ok: 100]
Ответ принят как подходящий
Проблема в том, что я не понимаю, как правильно перестать слушать сообщения после завершения всех созданных процессов
  1. Spawn/3 возвращает pid. Сохраните список всех pids, а затем передайте список в качестве аргумента вашей функции loop().

  2. Получение внутри вашей функции loop() будет ждать сообщения, которое начинается с pid. Первый pid в списке будет первым сообщением, которое вы ждете, например. {FirstPid, Result}.

  3. Порожденный процесс должен отправить сообщение в виде {self(), Result}.

  4. Как только вы получаете сообщение от первого pid в списке, вы снова рекурсивно вызываете loop() с хвостом списка.

  5. Как только список станет пустым, вы завершите свой цикл (подумайте о нескольких функциональных предложениях).

Теперь предположим, что первому pid требуется больше всего времени для вычисления результата, поэтому вы сидите и ждете при получении этого результата, после чего все остальные получают без ожидания, выполняя несколько микросекунд, поэтому общее количество времени для получения всех сообщений будет примерно равно времени, которое требуется pid для выполнения самых длинных вычислений.

Далее предположим, что первому pid требуется самое короткое время для вычисления результата, скажем, T1. Второй pid завершит расчет за T2-T1 секунд, потому что, пока вы ждали завершения первого pid, второй pid также вычислял свой результат в течение T1 секунд, поэтому для завершения расчета ему требуется только T2-T1 секунд, и так далее для T3, T4 и т. д. В принципе, все более короткие вычисления завершатся до завершения самого длинного вычисления, и вы получите их сообщения до завершения самого длинного вычисления, поэтому общее время ожидания будет временем самого длинного вычисления.

Другими словами, не имеет значения, в каком порядке находятся pid в списке.

Используйте Поток:

1..10
|> Flow.from_enumerable(max_demand: 1)
|> Flow.map(&(&1 * &1))
|> Enum.to_list()

Результат (несортированный):

[1, 9, 16, 25, 36, 49, 4, 64, 81, 100]

Другие вопросы по теме