Я пытаюсь распараллелить цикл, который использует бесконечный генератор в качестве входных данных для сбора некоторых данных и останавливается, когда получено определенное количество данных.
Моя реализация примерно такая.
class A:
def __iter__(self):
i = 0
while True:
yield i
i += 1
def procpar(x):
r = random.random()
print('Computing x =', x)
if r > 0.5
return [2 * x]
else:
return [2 * x, x ** 2]
with ProcessPoolExecutor(4) as pool:
out = []
x = A()
for res in pool.map(procpar, x):
out.extend(res)
if len(out) > 100:
break
Теперь, когда я запускаю его, я получаю этот вывод, после чего он просто зависает, и ничего не происходит.
Computing x = 1
Computing x = 6
Computing x = 2
Computing x = 3
Computing x = 4
Computing x = 5
Если посмотреть, что происходит, то метод map
пытается развернуть и сгенерировать данные из итератора x = A()
, поэтому он застрял в бесконечном цикле.
Любые предложения, как избежать застревания в бесконечном цикле. Конечно, я мог бы вызывать итератор x
по частям, прежде чем передавать их в пул процессов, но посмотрю, может ли у кого-то быть лучшее или более простое решение.
Попробуйте вместо этого использовать multiprocessing.pool.imap
:
from multiprocessing import Pool
import random
class A:
def __iter__(self):
i = 0
while True:
yield i
i += 1
def procpar(x):
r = random.random()
print('Computing x =', x)
if r > 0.5:
return [2 * x]
else:
return [2 * x, x ** 2]
# Required for Windows:
if __name__ == '__main__':
with Pool(4) as pool:
out = []
x = A()
for res in pool.imap(procpar, x):
out.extend(res)
if len(out) > 100:
break
print(out)
Отпечатки:
Computing x = 0
Computing x = 1
Computing x = 2
Computing x = 3
Computing x = 4
Computing x = 5
Computing x = 6
Computing x = 7
Computing x = 8
Computing x = 9
Computing x = 10
Computing x = 11
Computing x = 12
Computing x = 13
Computing x = 14
Computing x = 15
Computing x = 16
Computing x = 17
Computing x = 18
Computing x = 19
Computing x = 20
Computing x = 21
Computing x = 22
Computing x = 23
Computing x = 24
Computing x = 25
Computing x = 26
Computing x = 27
Computing x = 28
Computing x = 29
Computing x = 30
Computing x = 31
Computing x = 32
Computing x = 33
Computing x = 34
Computing x = 35
Computing x = 36
Computing x = 37
Computing x = 38
Computing x = 39
Computing x = 40
Computing x = 41
Computing x = 42
Computing x = 43
Computing x = 44
Computing x = 45
Computing x = 46
Computing x = 47
Computing x = 48
Computing x = 49
Computing x = 50
Computing x = 51
Computing x = 52
Computing x = 53
Computing x = 54
Computing x = 55
Computing x = 56
Computing x = 57
Computing x = 58
Computing x = 59
Computing x = 60
Computing x = 61
Computing x = 62
Computing x = 63
Computing x = 64
[0, 2, 1, 4, 4, 6, 8, 16, 10, 12, 14, 49, 16, 64, 18, 20, 22, 24, 144, 26, 28, 196, 30, 225, 32, 256, 34, 289, 36, 38, 361, 40, 400, 42, 441, 44, 484, 46, 529, 48, 576, 50, 625, 52, 54, 56, 58, 60, 900, 62, 961, 64, 66, 1089, 68, 1156, 70, 1225, 72, 1296, 74, 1369, 76, 78, 1521, 80, 1600, 82, 1681, 84, 1764, 86, 88, 90, 2025, 92, 2116, 94, 96, 2304, 98, 100, 102, 2601, 104, 2704, 106, 2809, 108, 110, 3025, 112, 3136, 114, 116, 3364, 118, 3481, 120, 3600, 122]
Вы должны опубликовать минимальный воспроизводимый пример, демонстрирующий, что вы на самом деле пытаетесь сделать.
Спасибо! Я думаю, что это действительно хороший и прямой ответ. К сожалению, у меня это не сработало, потому что мои процессы порождают другие подпроцессы, а
multiprocessing
не позволяет это делать из коробки.