Нашел несколько похожих тем, но ничего, что могло бы мне помочь.
По сути, у меня есть приложение C++, которое хочет вызвать функцию из скрипта Python. Все это работает очень хорошо.
Но поскольку мне нужно, чтобы он работал в режиме реального времени, а функция python занимает немного времени, я хотел добавить многопоточность.
В общем, один из двух сценариев:
или
Первый вариант мне больше нравится, так что давайте его. По сути, у меня есть скрипт python с функцией (на самом деле есть предсказание Tensorflow):
import time
def pfoo(msg):
print("Python >> Function called)
time.sleep(2)
print("Python << Function finished)
И пул потоков С++ (в основном взятый из здесь):
ThreadPool pool(4);
// initialize the Python
Py_Initialize();
// initialize thread support
PyEval_InitThreads();
// ...
PyObject* m_PyModule = PyImport_ImportModule( "test" );
PyObject* m_PyDict = PyModule_GetDict( m_PyModule );
PyObject* m_PyFoo = PyDict_GetItemString( m_PyDict, "pfoo" );
for ( int i = 0; i < 10; i++ ) {
pool.enqueue( [&] {
PyEval_CallObject( m_PyFoo, Py_BuildValue( "(s)", arg ) );
} );
}
Как вы понимаете, ничего не происходит, потому что вы не можете просто вызвать ту же функцию, пока она еще работает.
Пробовал макросы Py_BEGIN_ALLOW_THREADS, пробовал PyGILState_Ensure() и более сложный вариант это. У меня нет идей.
Я попробовал второй сценарий, где у меня есть поток бесконечного цикла в Python, который считывает задачи из queue.Queue() и помещает их в ThreadPoolExecutor, а приложение C++ вызывает функцию для добавления задачи в вышеупомянутую очередь. У меня тоже не работает (работает, если я просто запускаю его на Python, но не работает, если он встроен в C++).
@JohnZwinck Я думал об этом, но, поскольку мне нужно загрузить модель Tensorflow и выделить несколько гигабайт памяти графического процессора для ее использования, это может очень быстро стать проблематичным.






Я думаю, что, возможно, нашел сносное решение. Работает с синтетическим примером, но потребует живого теста с предсказанием Tensorflow:
Сценарий python по-прежнему использует пул потоков, но без бесконечного цикла потоков:
from concurrent.futures import ThreadPoolExecutor
import threading
import time
executor = ThreadPoolExecutor(max_workers=4)
lock = threading.Lock()
counter = 0
def worker(msg, n):
global counter
with lock:
counter += 1
print("Python >> Function called with (%s, %d)" % (msg, n))
for i in range(n): # do some work...
print("\tPython :: %s: %d" % (msg, i))
time.sleep(1)
print("Python << Finished function for (%s, %d)" % (msg, n))
with lock:
counter -= 1
if counter < 0:
counter = 0
def add(msg, n):
global executor, counter
if counter < 4:
print("++%d threads are free, adding task: %s" % (4 - counter, msg))
executor.submit(worker, msg, n)
И приложение C++ просто вызывает функцию «добавить», когда это необходимо:
std::map<std::string, PyObject*> m_PyFunctions;
m_PyFunctions["add"] = PyDict_GetItemString( m_PyDict, "add" );
for ( int i = 0; i < 100; i++ )
{
std::this_thread::sleep_for( std::chrono::milliseconds( 100 ) );
PyEval_CallObject( m_PyFunctions["add"],
Py_BuildValue( "(s, l)", "task_#" + std::to_string( i ), std::rand() % 5 ) );
}
РЕДАКТИРОВАТЬ: да, проверил это с моим основным приложением, все работает нормально. Грубое решение, но тем не менее приемлемое.
Где вы получаете Python GIL в своем коде C++? Я думаю, вам нужно, иначе ваш код может дать сбой.
@JohnZwinck Я не ... что странно.
@JohnZwinck не уверен в использовании ЦП в этом синтетическом примере, но добавление двух простых строк кода для вывода текущего thread_id определенно показывает, что пул потоков работает, и задачи выполняются одновременно в разных потоках. Основной код на самом деле представляет собой задачу машинного обучения (прогнозирования), которая выполняется на графическом процессоре.
Очень интересный пример! Какой питон вы используете? Я пробовал с Python 2.7, и кажется, что потоки активны только во время вызова add(msg, n). Другими словами, если я добавляю задержку к вышеупомянутой функции, я начинаю видеть "Python :: task_#..." как раз в это время...
@ArtemBobritsky, это был Python 3.6, если мне не изменяет память...
Было бы для вас нормально, если бы каждый поток запускал отдельный интерпретатор Python и мог вызывать одну и ту же функцию, но не мог взаимодействовать друг с другом? Как насчет использования многопроцессорности в Python?