I have parameter sets stored in a numpy record array, and I feed its rows into a multiprocessing queue, but they arrive garbled in the worker. Here is code that illustrates the issue:
```python
import numpy as np
from multiprocessing import Process, Queue

NUMBER_OF_PROCESSES = 2


def worker(input, output):
    for args in iter(input.get, 'STOP'):
        print('Worker receives: ' + repr(args))
        id, par = args
        # simulate a complex task, and return result
        result = par['A'] * par['B']
        output.put((id, result))


# Define parameters to process
parameters = np.array([
    (1.0, 2.0),
    (3.0, 3.0)], dtype=[('A', 'd'), ('B', 'd')])

# Create queues
task_queue = Queue()
done_queue = Queue()

# Submit tasks
for id, par in enumerate(parameters):
    obj = ('id_' + str(id), par)
    print('Submitting task: ' + repr(obj))
    task_queue.put(obj)

# Start worker processes
for i in range(NUMBER_OF_PROCESSES):
    Process(target=worker, args=(task_queue, done_queue)).start()

# Get unordered results
results = {}
for i in range(len(parameters)):
    id, result = done_queue.get()
    results[id] = result

# Tell child processes to stop
for i in range(NUMBER_OF_PROCESSES):
    task_queue.put('STOP')

print('results: ' + str(results))
```
With numpy 1.4.1 and Python 2.6.6 on a 64-bit CentOS computer, my output is:
```
Submitting task: ('id_0', (1.0, 2.0))
Submitting task: ('id_1', (3.0, 3.0))
Worker receives: ('id_0', (2.07827093387802e-316, 6.9204740511333381e-310))
Worker receives: ('id_1', (0.0, 1.8834810076011668e-316))
results: {'id_0': 0.0, 'id_1': 0.0}
```
As shown, the tuples containing the record-array rows are intact when the tasks are submitted, but they are garbled by the time the worker receives them, so the results are wrong. The multiprocessing documentation says that "arguments to the methods of proxies are picklable", and as far as I can tell the rows pickle without any problem:
```
>>> import pickle
>>> for par in parameters:
...     print(pickle.loads(pickle.dumps(par)))
...
(1.0, 2.0)
(3.0, 3.0)
```
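One difference the interactive check does not cover is the pickle protocol. In Python 2, `pickle.dumps` defaults to protocol 0, while multiprocessing serializes queue items with its own pickler and may use a higher protocol (an assumption here, not verified against the 2.6 source). A minimal sketch that round-trips each row with every protocol the interpreter supports, so the check matches whatever multiprocessing might use; `roundtrip_ok` is a hypothetical helper name:

```python
import pickle

import numpy as np

parameters = np.array([(1.0, 2.0), (3.0, 3.0)], dtype=[('A', 'd'), ('B', 'd')])


def roundtrip_ok(row, protocol):
    """Pickle one record (a numpy.void scalar) with `protocol` and compare fields."""
    restored = pickle.loads(pickle.dumps(row, protocol))
    return bool(restored['A'] == row['A'] and restored['B'] == row['B'])


# Try every protocol from 0 up to the highest this interpreter supports.
results = {}
for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
    results[protocol] = [roundtrip_ok(row, protocol) for row in parameters]
    print('protocol', protocol, results[protocol])
```

If any protocol reports `False` on the affected setup, the garbling comes from how the row scalar is pickled at that protocol rather than from the queue itself.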
Why are the parameters not received correctly in the worker? And how else can I pass a row of a numpy record array to a worker?
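One possible workaround, assuming the problem is confined to pickling the `numpy.void` row scalars, is to copy each row into a plain dict of Python floats before putting it on the queue. The worker's `par['A'] * par['B']` then works unchanged. This is a sketch, not a confirmed fix: `row_to_dict` and `roundtrip_through_queue` are hypothetical helper names, and the round trip happens in a single process, which still pickles every item through the queue's pipe but does not start any workers:

```python
import multiprocessing

import numpy as np

parameters = np.array([(1.0, 2.0), (3.0, 3.0)], dtype=[('A', 'd'), ('B', 'd')])


def row_to_dict(row):
    """Copy one record (a numpy.void scalar) into a plain dict of Python floats."""
    # dict() over a generator instead of a dict comprehension keeps this Python 2.6 compatible
    return dict((name, float(row[name])) for name in row.dtype.names)


def roundtrip_through_queue(objs):
    """Put objects on a multiprocessing.Queue and read them back in the same process.

    The queue pickles each item it sends through its pipe, so this exercises the
    same serialization path that a worker process would see.
    """
    q = multiprocessing.Queue()
    for obj in objs:
        q.put(obj)
    received = [q.get(timeout=5) for _ in objs]
    q.close()
    q.join_thread()
    return received


tasks = [('id_' + str(i), row_to_dict(par)) for i, par in enumerate(parameters)]
received = roundtrip_through_queue(tasks)
for task_id, par in received:
    print(task_id, par['A'] * par['B'])
```

A dict keeps the field-name access the worker already uses; converting with `tuple(...)` of floats would also avoid pickling numpy scalars, but the worker would then have to index by position.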