Я хочу создать несколько процессов, которые будут выполнять некоторые вычисления и собирать результаты каждого из них в список. Рассмотрим этот, хотя и некорректный, игрушечный пример:
defmodule Counter do
def loop(items \\ [])
def loop(items) do
receive do
{:append, item} ->
IO.inspect([item | items])
loop([item | items])
:exit ->
items
end
end
def push(from_pid, item) do
send(from_pid, {:append, :math.pow(item, 2)})
end
def run() do
for item <- 1..10 do
spawn(Counter, :push, [self(), item])
end
loop()
end
end
Counter.run()
run/1
порождает 10 процессов с 2 аргументами — идентификатором процесса и номером.loop/1
прослушивает сообщения и собирает результаты в список.Проблема в том, что я не понимаю, как правильно перестать слушать сообщения после завершения всех созданных процессов. Я не могу просто отправить другой тип сообщения (в данном случае :exit), чтобы прекратить рекурсивный вызов loop/1, поскольку некоторые процессы могут быть еще не выполнены. Конечно, я мог бы отслеживать количество полученных сообщений и не звонить loop/1 снова, если достигнуто целевое количество. Однако я сомневаюсь, что это правильный подход.
Как правильно это реализовать?
Task.Supervisor.async_stream_nolink — хороший инструмент для выполнения подобных задач. Хотя это может не касаться особенностей работы низкоуровневых функций send и receive, это хороший рецепт для решения подобных проблем, особенно когда вам нужно контролировать, сколько вещей происходит одновременно.
Рассмотрим следующий блок: это займет ок. 5 секунд на выполнение, потому что каждая задача спит в течение 1 секунды, но мы запускаем 2 из них одновременно (через max_concurrency).
iex> Task.Supervisor.start_link(name: TmpTaskSupervisor)
iex> Task.Supervisor.async_stream_nolink(
TmpTaskSupervisor,
1..10,
fn item ->
IO.puts("processing item #{item}")
Process.sleep(1_000)
end,
timeout: 120_000,
max_concurrency: 2
)
|> Stream.run()
Если вы хотите вернуть значения, преобразуйте полученный поток в список, но имейте в виду, что каждое возвращаемое значение будет заключено в кортеж :ok:
iex> Task.Supervisor.start_link(name: TmpTaskSupervisor)
iex> Task.Supervisor.async_stream_nolink(
TmpTaskSupervisor,
1..10,
fn n ->
n * n
end,
timeout: 120_000,
max_concurrency: 2
)
|> Enum.to_list()
[ok: 1, ok: 4, ok: 9, ok: 16, ok: 25, ok: 36, ok: 49, ok: 64, ok: 81, ok: 100]
Проблема в том, что я не понимаю, как правильно перестать слушать сообщения после завершения всех созданных процессов
Spawn/3 возвращает pid. Сохраните список всех pids, а затем передайте список в качестве аргумента вашей функции loop().
Получение внутри вашей функции loop() будет ждать сообщения, которое начинается с pid. Первый pid в списке будет первым сообщением, которое вы ждете, например. {FirstPid, Result}.
Порожденный процесс должен отправить сообщение в виде {self(), Result}.
Как только вы получаете сообщение от первого pid в списке, вы снова рекурсивно вызываете loop() с хвостом списка.
Как только список станет пустым, вы завершите свой цикл (подумайте о нескольких функциональных предложениях).
Теперь предположим, что первому pid требуется больше всего времени для вычисления результата, поэтому вы сидите и ждете при получении этого результата, после чего все остальные получают без ожидания, выполняя несколько микросекунд, поэтому общее количество времени для получения всех сообщений будет примерно равно времени, которое требуется pid для выполнения самых длинных вычислений.
Далее предположим, что первому pid требуется самое короткое время для вычисления результата, скажем, T1. Второй pid завершит расчет за T2-T1 секунд, потому что, пока вы ждали завершения первого pid, второй pid также вычислял свой результат в течение T1 секунд, поэтому для завершения расчета ему требуется только T2-T1 секунд, и так далее для T3, T4 и т. д. В принципе, все более короткие вычисления завершатся до завершения самого длинного вычисления, и вы получите их сообщения до завершения самого длинного вычисления, поэтому общее время ожидания будет временем самого длинного вычисления.
Другими словами, не имеет значения, в каком порядке находятся pid в списке.
Используйте Поток:
1..10
|> Flow.from_enumerable(max_demand: 1)
|> Flow.map(&(&1 * &1))
|> Enum.to_list()
Результат (несортированный):
[1, 9, 16, 25, 36, 49, 4, 64, 81, 100]