У меня есть следующая программа, которая использует Python C API. Он создает несколько потоков (NUM_THREADS постоянно). В каждом потоке существует бесконечный цикл, который выполняет очень простую операцию: создает словарь Python, ключ которого id установлен на идентификатор потока, затем выгружает этот словарь в строку и распечатывает его (используя функцию dumps в модуле json Python). После этого поток ждет WAIT_TIME секунд и делает то же самое еще раз.
// g++ -g -o multithread multithread.cpp -I/usr/include/python3.11/ -lpython3.11 -lpthread
#include <Python.h>
#include <stdio.h>
#include <pthread.h>
#include <semaphore.h>
// WAIT_TIME is in seconds
#define NUM_THREADS 20
#define WAIT_TIME 1
// Global semaphore declaration
sem_t semaphore;
// Global JSON module object, to be accesses in every thread
PyObject* jsonModule;
// Function to be executed by each thread
void* thread_function(void* arg) {
long thread_id = (long)arg;
while(true) {
sem_wait(&semaphore); // mark 1
PyObject* myDict = Py_BuildValue("{s:i}", "id", thread_id);
PyObject* result = PyObject_CallMethod(jsonModule, "dumps", "O", myDict);
PyObject* repr = PyObject_Repr(result);
const char* result_str = PyUnicode_AsUTF8(repr);
printf("Thread %ld result: %s\n", thread_id, result_str);
Py_XDECREF(result);
Py_XDECREF(myDict);
Py_XDECREF(repr);
sem_post(&semaphore); // mark 2
sleep(WAIT_TIME);
}
pthread_exit(NULL);
}
int main() {
pthread_t threads[NUM_THREADS];
int i;
// Initialize the Python interpreter
Py_Initialize();
// Import json module
jsonModule = PyImport_ImportModule("json");
// Initialize the semaphore
sem_init(&semaphore, 0, 1);
// Create threads
for (i = 0; i < NUM_THREADS; ++i) {
if (pthread_create(&threads[i], NULL, thread_function, (void*)(long)i) != 0) {
fprintf(stderr, "Error creating thread\n");
return 1;
}
}
// Join threads
for (i = 0; i < NUM_THREADS; ++i) {
if (pthread_join(threads[i], NULL) != 0) {
fprintf(stderr, "Error joining thread\n");
return 1;
}
}
// Free resources (never reach this point, but added for simmetry)
Py_XDECREF(jsonModule);
// Finalize the Python interpreter
Py_Finalize();
// Destroy the semaphore
sem_destroy(&semaphore);
printf("All threads have completed\n");
return 0;
}
Насколько я проверил опытным путем, программа работает до тех пор, пока семафор берется до начала вызова функций Py*. Другими словами, пока используются линии в точках mark 1 и mark 2.
Если я удалю операторы mark 1 и mark 2 (таким образом удалив исключение базы семафоров), то программа в конечном итоге очень скоро выйдет из строя. Глядя на обратную трассировку сгенерированного файла core, кажется, что проблема в вызове функции PyObject_CallMethod().
(gdb) bt
#0 0x00007fb315289c19 in ?? () from /lib/x86_64-linux-gnu/libpython3.11.so.1.0
#1 0x00007fb31526aac6 in ?? () from /lib/x86_64-linux-gnu/libpython3.11.so.1.0
#2 0x00007fb31517d80b in ?? () from /lib/x86_64-linux-gnu/libpython3.11.so.1.0
#3 0x00007fb31517ddd9 in PyObject_CallMethod () from /lib/x86_64-linux-gnu/libpython3.11.so.1.0
#4 0x000055e1a763f2ef in thread_function (arg=0x11) at multithread.cpp:24
#5 0x00007fb314ea8134 in start_thread (arg=<optimized out>) at ./nptl/pthread_create.c:442
#6 0x00007fb314f287dc in clone3 () at ../sysdeps/unix/sysv/linux/x86_64/clone3.S:81
Это немного удивительно, поскольку все переменные PyObject* являются локальными для функции потока (myDict, result и repr). Единственная переменная PyObject*, не локальная для потока, — это переменная самого модуля (jsonModule). Это тот, кто вызывает проблему?
Означает ли это, что библиотека Python C не является потокобезопасной, поэтому одновременно можно запускать не более одной функции Py*? Есть ли какая-либо альтернатива тому, который я использовал (т.е. семафор, реализованный в моем собственном коде)? Есть ли хороший шаблон реализации для программ такого типа (например, многопоточный с использованием Python C API)?
Заранее спасибо!





Python не является потокобезопасным:
Интерпретатор Python не является полностью потокобезопасным. Для поддержки многопоточных программ Python существует глобальная блокировка, называемая глобальной блокировкой интерпретатора или GIL, которая должна удерживаться текущим потоком, прежде чем он сможет безопасно получить доступ к объектам Python. Без блокировки даже самые простые операции могут вызвать проблемы в многопоточной программе: например, когда два потока одновременно увеличивают счетчик ссылок одного и того же объекта, счетчик ссылок может увеличиться только один раз, а не дважды.
Следовательно, существует правило, согласно которому только поток, получивший GIL , может работать с объектами Python или вызывать функции API Python/C. Чтобы эмулировать параллельное выполнение, интерпретатор регулярно пытается переключить потоки (см. sys.setswitchinterval()). Блокировка также снимается для потенциальной блокировки операций ввода-вывода, таких как чтение или запись файла, чтобы в это время могли выполняться другие потоки Python.
Интерпретатор Python хранит некоторую учетную информацию, специфичную для потока, внутри структуры данных, называемой PyThreadState . Также есть одна глобальная переменная, указывающая на текущий PyThreadState : ее можно получить с помощью PyThreadState_Get().
...
Использование семафора эффективно копирует функциональность «глобальной блокировки интерпретатора» Python.
Структуры данных интерпретатора CPython не являются потокобезопасными. Вот почему существует Глобальная блокировка интерпретатора (GIL) и почему многопоточные программы CPython обречены на медленную работу (поскольку только разблокированные части могут быть на самом деле многопоточными, а большая часть библиотек не может выпустить GIL, потому что это не так. безопасно делать это на объектах, написанных на чистом Python). Процитируем документацию:
В CPython глобальная блокировка интерпретатора, или GIL, представляет собой мьютекс, который защищает доступ к объектам Python, не позволяя нескольким потокам одновременно выполнять байт-коды Python. GIL предотвращает состояния гонки и обеспечивает безопасность потоков. Хорошее объяснение того, как Python GIL помогает в этих областях, можно найти здесь. Короче говоря, этот мьютекс необходим главным образом потому, что управление памятью CPython не является потокобезопасным.
Вы можете освободить GIL, когда не работаете с объектами CPython (например, для чистых числовых вычислений или операций ввода-вывода). Вместо этого часто используется многопроцессорность, чтобы обойти это сильное ограничение. Это означает использование нескольких процессов-интерпретаторов и, как правило, взаимодействие с использованием межпроцессного взаимодействия (IPC), что часто довольно дорого (хотя возможна общая память, но не для объектов интерпретатора Python).
Похоже, это игнорирует Python GIL. Когда поток выполняет код Python, он должен хранить GIL. Эта блокировка обеспечивает потокобезопасность Python: см. stackoverflow.com/questions/1294382/… и docs.python.org/3/c-api/…