I have the following program, which uses the Python C API. It creates a number of threads (the NUM_THREADS constant). Each thread runs an infinite loop that performs a very simple operation: it creates a Python dictionary whose key id is set to the thread id, serializes that dictionary to a string with the dumps function of the Python json module, and prints it. The thread then waits WAIT_TIME seconds and does the same again.
// g++ -g -o multithread multithread.cpp -I/usr/include/python3.11/ -lpython3.11 -lpthread
#include <Python.h>
#include <stdio.h>
#include <pthread.h>
#include <semaphore.h>
// WAIT_TIME is in seconds
#define NUM_THREADS 20
#define WAIT_TIME 1
// Global semaphore declaration
sem_t semaphore;
// Global JSON module object, to be accessed in every thread
PyObject* jsonModule;
// Function to be executed by each thread
void* thread_function(void* arg) {
    long thread_id = (long)arg;
    while(true) {
        sem_wait(&semaphore); // mark 1
        // "l" matches the long argument ("i" would expect an int)
        PyObject* myDict = Py_BuildValue("{s:l}", "id", thread_id);
        PyObject* result = PyObject_CallMethod(jsonModule, "dumps", "O", myDict);
        PyObject* repr = PyObject_Repr(result);
        const char* result_str = PyUnicode_AsUTF8(repr);
        printf("Thread %ld result: %s\n", thread_id, result_str);
        Py_XDECREF(result);
        Py_XDECREF(myDict);
        Py_XDECREF(repr);
        sem_post(&semaphore); // mark 2
        sleep(WAIT_TIME);
    }
    pthread_exit(NULL);
}
int main() {
    pthread_t threads[NUM_THREADS];
    int i;
    // Initialize the Python interpreter
    Py_Initialize();
    // Import json module
    jsonModule = PyImport_ImportModule("json");
    // Initialize the semaphore
    sem_init(&semaphore, 0, 1);
    // Create threads
    for (i = 0; i < NUM_THREADS; ++i) {
        if (pthread_create(&threads[i], NULL, thread_function, (void*)(long)i) != 0) {
            fprintf(stderr, "Error creating thread\n");
            return 1;
        }
    }
    // Join threads
    for (i = 0; i < NUM_THREADS; ++i) {
        if (pthread_join(threads[i], NULL) != 0) {
            fprintf(stderr, "Error joining thread\n");
            return 1;
        }
    }
    // Free resources (never reached, but added for symmetry)
    Py_XDECREF(jsonModule);
    // Finalize the Python interpreter
    Py_Finalize();
    // Destroy the semaphore
    sem_destroy(&semaphore);
    printf("All threads have completed\n");
    return 0;
}
As far as I have checked empirically, the program works as long as the semaphore is taken before any Py* function is invoked, that is, as long as the lines at mark 1 and mark 2 are present.
If I remove the mark 1 and mark 2 statements (thereby removing the semaphore-based mutual exclusion), the program crashes very soon. According to the backtrace of the generated core file, the problem seems to be in the call to PyObject_CallMethod():
(gdb) bt
#0 0x00007fb315289c19 in ?? () from /lib/x86_64-linux-gnu/libpython3.11.so.1.0
#1 0x00007fb31526aac6 in ?? () from /lib/x86_64-linux-gnu/libpython3.11.so.1.0
#2 0x00007fb31517d80b in ?? () from /lib/x86_64-linux-gnu/libpython3.11.so.1.0
#3 0x00007fb31517ddd9 in PyObject_CallMethod () from /lib/x86_64-linux-gnu/libpython3.11.so.1.0
#4 0x000055e1a763f2ef in thread_function (arg=0x11) at multithread.cpp:24
#5 0x00007fb314ea8134 in start_thread (arg=<optimized out>) at ./nptl/pthread_create.c:442
#6 0x00007fb314f287dc in clone3 () at ../sysdeps/unix/sysv/linux/x86_64/clone3.S:81
This is a bit surprising, as all the PyObject* variables used in the loop (myDict, result and repr) are local to the thread function. The only PyObject* variable that is not local to the thread is the one for the module itself (jsonModule). Is that the one causing the problem?
Does this mean that the Python C library is not thread-safe, so that no more than one Py* function can run at a time? Is there any alternative to the approach I have used (i.e. a semaphore implemented in my own code)? Is there a good implementation pattern for this kind of program (i.e. multi-threaded code using the Python C API)?
Thanks in advance!