Параллельная обработка из очереди команд в Linux (bash, python, ruby ​​... что угодно)

У меня есть список / очередь из 200 команд, которые мне нужно запустить в оболочке на сервере Linux.

Я хочу, чтобы одновременно выполнялось не более 10 процессов (из очереди). Некоторые процессы будут выполняться за несколько секунд, другие - намного дольше.

Когда процесс завершается, я хочу, чтобы следующая команда была «извлечена» из очереди и выполнена.

Есть ли у кого-нибудь код для решения этой проблемы?

Дальнейшая проработка:

В какой-то очереди нужно выполнить 200 работ. Я хочу, чтобы одновременно выполнялось не более 10 работ. Когда поток заканчивает часть работы, он должен запросить очередь для следующей части работы. Если в очереди больше нет работы, поток должен умереть. Когда все потоки умерли, это означает, что вся работа сделана.

Фактическая проблема, которую я пытаюсь решить, заключается в использовании imapsync для синхронизации 200 почтовых ящиков со старого почтового сервера на новый почтовый сервер. У некоторых пользователей большие почтовые ящики, и синхронизация занимает много времени, у других очень маленькие почтовые ящики, и они быстро синхронизируются.

Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
45
0
21 014
12
Перейти к ответу Данный вопрос помечен как решенный

Ответы 12

Если вы собираетесь использовать Python, я рекомендую для этого использовать Скрученный.

В частности, Скрученный бегун.

Не могли бы вы пояснить, что вы имеете в виду под в параллели? Похоже, вам нужно реализовать какую-то блокировку в очереди, чтобы ваши записи не выбирались дважды и т.д., а команды выполнялись только один раз.

Большинство систем очередей обманывают - они просто пишут огромный список дел, а затем выбирают, например, десять пунктов, обработайте их и выберите следующие десять пунктов. Распараллеливания нет.

Если вы предоставите более подробную информацию, я уверен, что мы сможем вам помочь.

GNU make (и, возможно, другие реализации) имеет аргумент -j, который определяет, сколько заданий он будет запускать одновременно. Когда задание завершится, make запустит еще одно.

Что ж, если они в значительной степени независимы друг от друга, я бы подумал о следующем:

Initialize an array of jobs pending (queue, ...) - 200 entries
Initialize an array of jobs running - empty

while (jobs still pending and queue of jobs running still has space)
    take a job off the pending queue
    launch it in background
    if (queue of jobs running is full)
        wait for a job to finish
        remove from jobs running queue
while (queue of jobs is not empty)
    wait for job to finish
    remove from jobs running queue

Обратите внимание, что хвостовой тест в основном цикле означает, что если «очередь выполнения заданий» имеет место при итерации цикла while, предотвращая преждевременное завершение цикла. Думаю, логика верна.

Я могу довольно легко понять, как это сделать на C - это было бы не так уж сложно и в Perl (и, следовательно, не слишком сложно на других языках сценариев - Python, Ruby, Tcl и т. д.). Я совсем не уверен, что хотел бы сделать это в оболочке - команда wait в оболочке ожидает завершения всех дочерних элементов, а не завершения какого-либо дочернего элемента.

вы также можете использовать команду ожидания для определенного дочернего процесса. Ему можно указать любое количество аргументов, каждый из которых может быть идентификатором pid или заданием.

Brian Minton 01.10.2013 18:56

@BrianMinton: вы правы, что вы можете перечислить конкретные PID с помощью wait, но вы все равно получите поведение «все мертвы», а не «первый мертвый», что действительно нужно этому коду.

Jonathan Leffler 01.10.2013 19:10

В оболочке xargs можно использовать для постановки в очередь параллельной обработки команд. Например, если у вас всегда 3 режима сна одновременно, каждый сон по 1 секунде и в сумме выполняется 10 снов, выполните следующие действия:

echo {1..10} | xargs -d ' ' -n1 -P3 sh -c 'sleep 1s' _

И он будет спать в общей сложности 4 секунды. Если у вас есть список имен и вы хотите передать имена выполняемым командам, снова выполняя 3 команды параллельно, выполните

cat names | xargs -n1 -P3 process_name

Выполнял бы команду process_name alice, process_name bob и так далее.

Вау, я постоянно использую xargs и даже не ожидал, что у него будет такая возможность!

Warrick 19.07.2013 16:00

Во втором приведенном вами примере как изменить команду, чтобы process_name мог принимать более одного аргумента? Я хочу сделать что-то вроде этого: cat commands.txt | xargs -n1 -P3 eval, где commands.txt содержит несколько команд (по одной в каждой строке, каждая с несколькими аргументами). Проблема в том, что eval не работает, поскольку это встроенная команда оболочки.

Eddy 23.07.2015 13:12

@Eddy попробуйте использовать оболочку в качестве программы для запуска; это позволяет вам использовать произвольные команды оболочки в качестве входных данных. Первый ответ выше делает это с sh, но вы также можете сделать это с bash. Например. если в вашем commands.txt есть несколько строк, которые выглядят как echo test1; sleep1, вы можете использовать это через что-то вроде cat commands.txt | xargs -d'\n' -P3 -n1 /bin/bash -c

staticfloat 03.05.2019 04:03
Ответ принят как подходящий

Я полагаю, вы могли бы сделать это с помощью make и команды make -j xx.

Возможно, такой make-файл

all : usera userb userc....

usera:
       imapsync usera
userb:
       imapsync userb
....

make -j 10 -f make-файл

Это сработало именно так, как я надеялся. Я написал код для создания файла Makefile. В итоге получилось более 1000 строк. Спасибо!

mlambie 21.01.2009 10:01

Я обнаружил, что если какая-либо из команд завершается с кодом ошибки, make завершает работу, предотвращая выполнение будущих заданий. В некоторых ситуациях это решение не идеально. Какие-нибудь рекомендации для этого сценария?

Kyle Simek 26.01.2012 02:08

@redmoskito Если вы запустите make с опцией "-k", она продолжит работу, даже если есть ошибки.

Joseph Lisee 22.06.2012 00:06

Если кто-то начинает думать о make не как о «планировщике задач», а как об инструменте «параллельной компиляции»… Я полагаю, что более широкая картина состоит в том, что make -j учитывает зависимости, что делает это решение потрясающим при универсальном применении.

Yauhen Yakimovich 24.01.2013 17:31

В python вы можете попробовать:

import Queue, os, threading

# synchronised queue
queue = Queue.Queue(0)    # 0 means no maximum size

# do stuff to initialise queue with strings
# representing os commands
queue.put('sleep 10')
queue.put('echo Sleeping..')
# etc
# or use python to generate commands, e.g.
# for username in ['joe', 'bob', 'fred']:
#    queue.put('imapsync %s' % username)

def go():
  while True:
    try:
      # False here means no blocking: raise exception if queue empty
      command = queue.get(False)
      # Run command.  python also has subprocess module which is more
      # featureful but I am not very familiar with it.
      # os.system is easy :-)
      os.system(command)
    except Queue.Empty:
      return

for i in range(10):   # change this to run more/fewer threads
  threading.Thread(target=go).start()

Непроверенный ...

(конечно, сам python является однопоточным. Тем не менее, вы все равно должны использовать преимущества нескольких потоков с точки зрения ожидания ввода-вывода.)

1) os.system можно заменить новым улучшенным модулем подпроцесса. 2) Не имеет значения, что CPython имеет GIL, потому что вы выполняете внешние команды, а не код (функции) Python.

Cristian Ciupitu 10.03.2009 03:38

Если вы замените threading.Thread на multiprocessing.Process и Queue на multiprocessing.Queue, тогда код будет работать с использованием нескольких процессов.

jfs 11.03.2009 18:31

pssh написан на python, я думаю

rogerdpack 30.08.2011 02:29

Python многопроцессорный модуль, казалось бы, хорошо подходит для вашей проблемы. Это пакет высокого уровня, который поддерживает многопоточность по процессам.

Для этого вида работы написано PPSS: Сценарий оболочки параллельной обработки. Погуглите это имя, и вы его найдете, я не буду ссылаться на спам.

Это выглядит идеально. Я проверю это в следующий раз, когда столкнусь с аналогичной проблемой. Спасибо, что обновили эту ветку своей рекомендацией.

mlambie 10.03.2009 22:00

классно! можем ли мы настроить его динамическую потоковую передачу, например, 80% CPU / Ram позволяют?

Devrim 04.01.2010 22:09

Практически именно то, что я искал. / me снова пытается заставить его работать.

Yuvi 05.06.2011 10:20

Параллельный сделан именно для этого.

cat userlist | parallel imapsync

Одно из преимуществ Параллельный по сравнению с другими решениями заключается в том, что он обеспечивает отсутствие смешанного вывода. Выполнение traceroute в Параллельный отлично работает, например:

(echo foss.org.my; echo www.debian.org; echo www.freenetproject.org) | parallel traceroute

Человек, я люблю этот инструмент. Я знал об этом около 3 часов и собираюсь использовать его, пока самые камни земли не потребуют от меня остановиться.

chiggsy 10.12.2010 10:14

Fedora 16 включила инструмент в репозиторий пакетов

myroslav 08.02.2012 03:28

https://savannah.gnu.org/projects/parallel (GNU параллельно) и pssh может помочь.

Простая функция в zsh для распараллеливания заданий не более чем в 4 подоболочках с использованием файлов блокировки в / tmp.

Единственная нетривиальная часть - это глобальные флаги в первом тесте:

  • #q: включить подстановку имен файлов в тесте
  • [4]: возвращает только 4-й результат
  • N: игнорировать ошибку при пустом результате

Его должно быть легко преобразовать в posix, хотя он будет немного более подробным.

Не забывайте избегать кавычек в заданиях с \".

#!/bin/zsh

setopt extendedglob

para() {
    lock=/tmp/para_$$_$((paracnt++))
    # sleep as long as the 4th lock file exists
    until [[ -z /tmp/para_$$_*(#q[4]N) ]] { sleep 0.1 }
    # Launch the job in a subshell
    ( touch $lock ; eval $* ; rm $lock ) &
    # Wait for subshell start and lock creation
    until [[ -f $lock ]] { sleep 0.001 }
}

para "print A0; sleep 1; print Z0"
para "print A1; sleep 2; print Z1"
para "print A2; sleep 3; print Z2"
para "print A3; sleep 4; print Z3"
para "print A4; sleep 3; print Z4"
para "print A5; sleep 2; print Z5"

# wait for all subshells to terminate
wait

Другие вопросы по теме