Вызов функции Python из нескольких потоков в C++

Нашел несколько похожих тем, но ничего, что могло бы мне помочь.

По сути, у меня есть приложение C++, которое хочет вызвать функцию из скрипта Python. Все это работает очень хорошо.

Но поскольку мне нужно, чтобы он работал в режиме реального времени, а функция python занимает немного времени, я хотел добавить многопоточность.

В общем, один из двух сценариев:

  1. Пул потоков в C++, где каждый поток будет вызывать функцию Python.

или

  1. Пул потоков в приложении Python и C++ добавляет задачи в очередь Python.

Первый вариант мне больше нравится, так что давайте его. По сути, у меня есть скрипт python с функцией (на самом деле есть предсказание Tensorflow):

import time
def pfoo(msg):
  print("Python >> Function called)
  time.sleep(2)
  print("Python << Function finished)

И пул потоков С++ (в основном взятый из здесь):

ThreadPool pool(4);

// initialize the Python
Py_Initialize();
// initialize thread support
PyEval_InitThreads();

// ...

PyObject* m_PyModule = PyImport_ImportModule( "test" );

PyObject* m_PyDict = PyModule_GetDict( m_PyModule );

PyObject* m_PyFoo = PyDict_GetItemString( m_PyDict, "pfoo" );

for ( int i = 0; i < 10; i++ ) { 

    pool.enqueue( [&] { 

      PyEval_CallObject( m_PyFoo, Py_BuildValue( "(s)", arg ) );

    } );
}

Как вы понимаете, ничего не происходит, потому что вы не можете просто вызвать ту же функцию, пока она еще работает. Пробовал макросы Py_BEGIN_ALLOW_THREADS, пробовал PyGILState_Ensure() и более сложный вариант это. У меня нет идей.

Я попробовал второй сценарий, где у меня есть поток бесконечного цикла в Python, который считывает задачи из queue.Queue() и помещает их в ThreadPoolExecutor, а приложение C++ вызывает функцию для добавления задачи в вышеупомянутую очередь. У меня тоже не работает (работает, если я просто запускаю его на Python, но не работает, если он встроен в C++).

Было бы для вас нормально, если бы каждый поток запускал отдельный интерпретатор Python и мог вызывать одну и ту же функцию, но не мог взаимодействовать друг с другом? Как насчет использования многопроцессорности в Python?

John Zwinck 01.03.2019 12:59

@JohnZwinck Я думал об этом, но, поскольку мне нужно загрузить модель Tensorflow и выделить несколько гигабайт памяти графического процессора для ее использования, это может очень быстро стать проблематичным.

xonxt 01.03.2019 13:34
Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
0
2
855
1

Ответы 1

Я думаю, что, возможно, нашел сносное решение. Работает с синтетическим примером, но потребует живого теста с предсказанием Tensorflow:

Сценарий python по-прежнему использует пул потоков, но без бесконечного цикла потоков:

from concurrent.futures import ThreadPoolExecutor
import threading
import time

executor = ThreadPoolExecutor(max_workers=4)
lock = threading.Lock()
counter = 0

def worker(msg, n):
  global counter

  with lock:
    counter += 1

  print("Python >> Function called with (%s, %d)" % (msg, n))

  for i in range(n): # do some work...
    print("\tPython :: %s: %d" % (msg, i))
    time.sleep(1)

  print("Python << Finished function for (%s, %d)" % (msg, n))

  with lock:
    counter -= 1
    if counter < 0:
      counter = 0


def add(msg, n):
  global executor, counter
  if counter < 4:
    print("++%d threads are free, adding task: %s" % (4 - counter, msg))
    executor.submit(worker, msg, n)

И приложение C++ просто вызывает функцию «добавить», когда это необходимо:

std::map<std::string, PyObject*> m_PyFunctions;

m_PyFunctions["add"] = PyDict_GetItemString( m_PyDict, "add" );

for ( int i = 0; i < 100; i++ ) 
{
    std::this_thread::sleep_for( std::chrono::milliseconds( 100 ) );

    PyEval_CallObject( m_PyFunctions["add"], 
                       Py_BuildValue( "(s, l)", "task_#" + std::to_string( i ), std::rand() % 5 ) );
}

РЕДАКТИРОВАТЬ: да, проверил это с моим основным приложением, все работает нормально. Грубое решение, но тем не менее приемлемое.

Где вы получаете Python GIL в своем коде C++? Я думаю, вам нужно, иначе ваш код может дать сбой.

John Zwinck 02.03.2019 00:18

@JohnZwinck Я не ... что странно.

xonxt 02.03.2019 16:35

@JohnZwinck не уверен в использовании ЦП в этом синтетическом примере, но добавление двух простых строк кода для вывода текущего thread_id определенно показывает, что пул потоков работает, и задачи выполняются одновременно в разных потоках. Основной код на самом деле представляет собой задачу машинного обучения (прогнозирования), которая выполняется на графическом процессоре.

xonxt 04.03.2019 09:50

Очень интересный пример! Какой питон вы используете? Я пробовал с Python 2.7, и кажется, что потоки активны только во время вызова add(msg, n). Другими словами, если я добавляю задержку к вышеупомянутой функции, я начинаю видеть "Python :: task_#..." как раз в это время...

Artem Bobritsky 18.05.2021 20:39

@ArtemBobritsky, это был Python 3.6, если мне не изменяет память...

xonxt 19.05.2021 21:20

Другие вопросы по теме