У меня есть список / очередь из 200 команд, которые мне нужно запустить в оболочке на сервере Linux.
Я хочу, чтобы одновременно выполнялось не более 10 процессов (из очереди). Некоторые процессы будут выполняться за несколько секунд, другие - намного дольше.
Когда процесс завершается, я хочу, чтобы следующая команда была «извлечена» из очереди и выполнена.
Есть ли у кого-нибудь код для решения этой проблемы?
Дальнейшая проработка:
В какой-то очереди нужно выполнить 200 работ. Я хочу, чтобы одновременно выполнялось не более 10 работ. Когда поток заканчивает часть работы, он должен запросить очередь для следующей части работы. Если в очереди больше нет работы, поток должен умереть. Когда все потоки умерли, это означает, что вся работа сделана.
Фактическая проблема, которую я пытаюсь решить, заключается в использовании imapsync для синхронизации 200 почтовых ящиков со старого почтового сервера на новый почтовый сервер. У некоторых пользователей большие почтовые ящики, и синхронизация занимает много времени, у других очень маленькие почтовые ящики, и они быстро синхронизируются.






Если вы собираетесь использовать Python, я рекомендую для этого использовать Скрученный.
В частности, Скрученный бегун.
Не могли бы вы пояснить, что вы имеете в виду под в параллели? Похоже, вам нужно реализовать какую-то блокировку в очереди, чтобы ваши записи не выбирались дважды и т.д., а команды выполнялись только один раз.
Большинство систем очередей обманывают - они просто пишут огромный список дел, а затем выбирают, например, десять пунктов, обработайте их и выберите следующие десять пунктов. Распараллеливания нет.
Если вы предоставите более подробную информацию, я уверен, что мы сможем вам помочь.
GNU make (и, возможно, другие реализации) имеет аргумент -j, который определяет, сколько заданий он будет запускать одновременно. Когда задание завершится, make запустит еще одно.
Что ж, если они в значительной степени независимы друг от друга, я бы подумал о следующем:
Initialize an array of jobs pending (queue, ...) - 200 entries
Initialize an array of jobs running - empty
while (jobs still pending and queue of jobs running still has space)
take a job off the pending queue
launch it in background
if (queue of jobs running is full)
wait for a job to finish
remove from jobs running queue
while (queue of jobs is not empty)
wait for job to finish
remove from jobs running queue
Обратите внимание, что хвостовой тест в основном цикле означает, что если «очередь выполнения заданий» имеет место при итерации цикла while, предотвращая преждевременное завершение цикла. Думаю, логика верна.
Я могу довольно легко понять, как это сделать на C - это было бы не так уж сложно и в Perl (и, следовательно, не слишком сложно на других языках сценариев - Python, Ruby, Tcl и т. д.). Я совсем не уверен, что хотел бы сделать это в оболочке - команда wait в оболочке ожидает завершения всех дочерних элементов, а не завершения какого-либо дочернего элемента.
@BrianMinton: вы правы, что вы можете перечислить конкретные PID с помощью wait, но вы все равно получите поведение «все мертвы», а не «первый мертвый», что действительно нужно этому коду.
В оболочке xargs можно использовать для постановки в очередь параллельной обработки команд. Например, если у вас всегда 3 режима сна одновременно, каждый сон по 1 секунде и в сумме выполняется 10 снов, выполните следующие действия:
echo {1..10} | xargs -d ' ' -n1 -P3 sh -c 'sleep 1s' _
И он будет спать в общей сложности 4 секунды. Если у вас есть список имен и вы хотите передать имена выполняемым командам, снова выполняя 3 команды параллельно, выполните
cat names | xargs -n1 -P3 process_name
Выполнял бы команду process_name alice, process_name bob и так далее.
Вау, я постоянно использую xargs и даже не ожидал, что у него будет такая возможность!
Во втором приведенном вами примере как изменить команду, чтобы process_name мог принимать более одного аргумента? Я хочу сделать что-то вроде этого: cat commands.txt | xargs -n1 -P3 eval, где commands.txt содержит несколько команд (по одной в каждой строке, каждая с несколькими аргументами). Проблема в том, что eval не работает, поскольку это встроенная команда оболочки.
@Eddy попробуйте использовать оболочку в качестве программы для запуска; это позволяет вам использовать произвольные команды оболочки в качестве входных данных. Первый ответ выше делает это с sh, но вы также можете сделать это с bash. Например. если в вашем commands.txt есть несколько строк, которые выглядят как echo test1; sleep1, вы можете использовать это через что-то вроде cat commands.txt | xargs -d'\n' -P3 -n1 /bin/bash -c
Я полагаю, вы могли бы сделать это с помощью make и команды make -j xx.
Возможно, такой make-файл
all : usera userb userc....
usera:
imapsync usera
userb:
imapsync userb
....
make -j 10 -f make-файл
Это сработало именно так, как я надеялся. Я написал код для создания файла Makefile. В итоге получилось более 1000 строк. Спасибо!
Я обнаружил, что если какая-либо из команд завершается с кодом ошибки, make завершает работу, предотвращая выполнение будущих заданий. В некоторых ситуациях это решение не идеально. Какие-нибудь рекомендации для этого сценария?
@redmoskito Если вы запустите make с опцией "-k", она продолжит работу, даже если есть ошибки.
Если кто-то начинает думать о make не как о «планировщике задач», а как об инструменте «параллельной компиляции»… Я полагаю, что более широкая картина состоит в том, что make -j учитывает зависимости, что делает это решение потрясающим при универсальном применении.
В python вы можете попробовать:
import Queue, os, threading
# synchronised queue
queue = Queue.Queue(0) # 0 means no maximum size
# do stuff to initialise queue with strings
# representing os commands
queue.put('sleep 10')
queue.put('echo Sleeping..')
# etc
# or use python to generate commands, e.g.
# for username in ['joe', 'bob', 'fred']:
# queue.put('imapsync %s' % username)
def go():
while True:
try:
# False here means no blocking: raise exception if queue empty
command = queue.get(False)
# Run command. python also has subprocess module which is more
# featureful but I am not very familiar with it.
# os.system is easy :-)
os.system(command)
except Queue.Empty:
return
for i in range(10): # change this to run more/fewer threads
threading.Thread(target=go).start()
Непроверенный ...
(конечно, сам python является однопоточным. Тем не менее, вы все равно должны использовать преимущества нескольких потоков с точки зрения ожидания ввода-вывода.)
1) os.system можно заменить новым улучшенным модулем подпроцесса. 2) Не имеет значения, что CPython имеет GIL, потому что вы выполняете внешние команды, а не код (функции) Python.
Если вы замените threading.Thread на multiprocessing.Process и Queue на multiprocessing.Queue, тогда код будет работать с использованием нескольких процессов.
pssh написан на python, я думаю
Python многопроцессорный модуль, казалось бы, хорошо подходит для вашей проблемы. Это пакет высокого уровня, который поддерживает многопоточность по процессам.
Для этого вида работы написано PPSS: Сценарий оболочки параллельной обработки. Погуглите это имя, и вы его найдете, я не буду ссылаться на спам.
Это выглядит идеально. Я проверю это в следующий раз, когда столкнусь с аналогичной проблемой. Спасибо, что обновили эту ветку своей рекомендацией.
классно! можем ли мы настроить его динамическую потоковую передачу, например, 80% CPU / Ram позволяют?
Практически именно то, что я искал. / me снова пытается заставить его работать.
Параллельный сделан именно для этого.
cat userlist | parallel imapsync
Одно из преимуществ Параллельный по сравнению с другими решениями заключается в том, что он обеспечивает отсутствие смешанного вывода. Выполнение traceroute в Параллельный отлично работает, например:
(echo foss.org.my; echo www.debian.org; echo www.freenetproject.org) | parallel traceroute
Человек, я люблю этот инструмент. Я знал об этом около 3 часов и собираюсь использовать его, пока самые камни земли не потребуют от меня остановиться.
Fedora 16 включила инструмент в репозиторий пакетов
https://savannah.gnu.org/projects/parallel (GNU параллельно) и pssh может помочь.
Простая функция в zsh для распараллеливания заданий не более чем в 4 подоболочках с использованием файлов блокировки в / tmp.
Единственная нетривиальная часть - это глобальные флаги в первом тесте:
#q: включить подстановку имен файлов в тесте[4]: возвращает только 4-й результатN: игнорировать ошибку при пустом результатеЕго должно быть легко преобразовать в posix, хотя он будет немного более подробным.
Не забывайте избегать кавычек в заданиях с \".
#!/bin/zsh
setopt extendedglob
para() {
lock=/tmp/para_$$_$((paracnt++))
# sleep as long as the 4th lock file exists
until [[ -z /tmp/para_$$_*(#q[4]N) ]] { sleep 0.1 }
# Launch the job in a subshell
( touch $lock ; eval $* ; rm $lock ) &
# Wait for subshell start and lock creation
until [[ -f $lock ]] { sleep 0.001 }
}
para "print A0; sleep 1; print Z0"
para "print A1; sleep 2; print Z1"
para "print A2; sleep 3; print Z2"
para "print A3; sleep 4; print Z3"
para "print A4; sleep 3; print Z4"
para "print A5; sleep 2; print Z5"
# wait for all subshells to terminate
wait
вы также можете использовать команду ожидания для определенного дочернего процесса. Ему можно указать любое количество аргументов, каждый из которых может быть идентификатором pid или заданием.