
I have parameter sets in numpy arrays that I feed into a multiprocessing queue, but they are garbled when received in the worker. Here is my code to illustrate my issue and question.

import numpy as np
from multiprocessing import Process, Queue

NUMBER_OF_PROCESSES = 2

def worker(input, output):
    for args in iter(input.get, 'STOP'):
        print('Worker receives: ' + repr(args))
        id, par = args
        # simulate a complex task, and return result
        result = par['A'] * par['B']
        output.put((id, result))

# Define parameters to process
parameters = np.array([
    (1.0, 2.0),
    (3.0, 3.0)], dtype=[('A', 'd'), ('B', 'd')])

# Create queues
task_queue = Queue()
done_queue = Queue()

# Submit tasks
for id, par in enumerate(parameters):
    obj = ('id_' + str(id), par)
    print('Submitting task: ' + repr(obj))
    task_queue.put(obj)

# Start worker processes
for i in range(NUMBER_OF_PROCESSES):
    Process(target=worker, args=(task_queue, done_queue)).start()

# Get unordered results
results = {}
for i in range(len(parameters)):
    id, result = done_queue.get()
    results[id] = result

# Tell child processes to stop
for i in range(NUMBER_OF_PROCESSES):
    task_queue.put('STOP')

print('results: ' + str(results))

With numpy 1.4.1 and Python 2.6.6 on a 64-bit CentOS computer, my output is:

Submitting task: ('id_0', (1.0, 2.0))
Submitting task: ('id_1', (3.0, 3.0))
Worker receives: ('id_0', (2.07827093387802e-316, 6.9204740511333381e-310))
Worker receives: ('id_1', (0.0, 1.8834810076011668e-316))
results: {'id_0': 0.0, 'id_1': 0.0}

As shown, the tuples containing the rows of the numpy record array are intact when the tasks are submitted, but garbled when the worker receives them, so the results are wrong. I read in the multiprocessing documentation that the "arguments to the methods of proxies are picklable". As far as I can tell, the numpy arrays pickle perfectly well:

>>> import pickle
>>> for par in parameters:
...     print(pickle.loads(pickle.dumps(par)))
...     
(1.0, 2.0)
(3.0, 3.0)
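A bare pickle.dumps(par) uses pickle's default protocol, but multiprocessing does its own pickling and may not use the same protocol, so a more thorough local check is to round-trip each row through every available protocol. A minimal sketch, assuming the structured array from the question; the helper name roundtrips_ok is hypothetical:

```python
import pickle

import numpy as np

# The structured array from the question.
parameters = np.array([(1.0, 2.0), (3.0, 3.0)],
                      dtype=[('A', 'd'), ('B', 'd')])


def roundtrips_ok(obj):
    """Hypothetical helper: True if obj survives pickling under every protocol."""
    expected = obj.item()  # plain Python tuple for a structured numpy.void
    for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
        restored = pickle.loads(pickle.dumps(obj, protocol=protocol))
        if restored.item() != expected:
            print('protocol %d garbled %r -> %r' % (protocol, obj, restored))
            return False
    return True


for par in parameters:
    print(type(par), roundtrips_ok(par))
```

On a NumPy version affected by the problem described here, one or more protocols would be expected to report a garbled row.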

So my question is: why are the parameters not received correctly in the worker? And how else can I pass a row of a numpy record array to a worker?

2 Answers


NumPy arrays should be picklable (I think), but here you're actually dealing with numpy.void instances, which, for some reason I'm not sure of, don't seem to pickle correctly.

If you do:

import pickle

for par in parameters:
    print(type(par))
    print(pickle.loads(pickle.dumps(par)))

You get:

<type 'numpy.void'>
(-1.3918046672290164e-41, -1.3918046679677054e-41)
<type 'numpy.void'>
(-1.3918046672290164e-41, -1.3918046679677054e-41)

One way to work around this is to apply parameters = parameters.reshape([-1, 1]), which turns your (N,) array into an (N, 1) array. That way, when you loop over parameters, you get arrays of size 1, which will hopefully pickle just fine. Hope that helps.
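The reshape suggestion can be checked without starting any processes, as sketched below using the question's parameters array. One consequence worth knowing: after the reshape each row is a one-element structured ndarray, so par['A'] * par['B'] in the worker yields a shape-(1,) array rather than a float. Converting each row to a plain Python tuple with .item() is shown as a second option; that alternative is an addition here, not part of the answer above.

```python
import pickle

import numpy as np

parameters = np.array([(1.0, 2.0), (3.0, 3.0)],
                      dtype=[('A', 'd'), ('B', 'd')])

# Iterating over the (N,) array yields numpy.void scalars.
print(type(parameters[0]))  # <class 'numpy.void'>

# The workaround from the answer: reshape to (N, 1) so each row is an ndarray.
rows = parameters.reshape([-1, 1])
products = []
for par in rows:
    # par is a structured ndarray of shape (1,), not a numpy.void scalar.
    restored = pickle.loads(pickle.dumps(par, protocol=pickle.HIGHEST_PROTOCOL))
    # Field access returns shape-(1,) arrays, so take element 0 to get a float.
    products.append(float((restored['A'] * restored['B'])[0]))
print(products)  # [2.0, 9.0]

# Alternative (not from the answer): send plain Python tuples instead.
plain_rows = [par.item() for par in parameters]
print(plain_rows)  # [(1.0, 2.0), (3.0, 3.0)]
```

With the tuple approach, the worker would unpack the fields directly (for example a, b = par) instead of indexing by field name.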


1 Comment

Looks like your issue was a bug, fixed in this commit github.com/numpy/numpy/commit/…

I ran into the same problem, but my situation is a little different from yours.

Originally, my subprocess produced one number per loop iteration and collected them into a numpy.ndarray. Finally, it put the array on a Queue, but my main process hung after I called p.join().
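A likely cause of that hang is documented in the multiprocessing programming guidelines: a process that has put items on a queue will not terminate until all buffered items have been flushed to the underlying pipe, so calling join() before the main process has consumed the queue can deadlock, especially with a large item such as a whole array. A minimal sketch of the fix, draining the queue before joining; the function names producer and run, the array size, and the per-element computation are all hypothetical stand-ins:

```python
import multiprocessing as mp

import numpy as np


def producer(queue, n):
    """Hypothetical subprocess: fill an array one element per loop, then send it."""
    array = np.empty(n)
    for i in range(n):
        array[i] = i * 0.5  # stand-in for the real per-iteration computation
    queue.put(array)


def run(n=100_000):
    queue = mp.Queue()
    p = mp.Process(target=producer, args=(queue, n))
    p.start()
    # Drain the queue BEFORE joining: the child cannot exit until its queued
    # data has been flushed into the pipe, so joining first can hang.
    array = queue.get()
    p.join()
    return array


if __name__ == '__main__':
    result = run()
    print(result.shape, result[:3])
```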

The old code looked like this:

# subprocess
for i in range(n):
    array[i] = data[i]
queue.put(array)
# main process
queue.get()

Instead, I changed my approach to this:

# subprocess
for i in range(n):
    queue.put((i, data[i]))
# main process
for _ in range(n):
    i, value = queue.get()  # blocks until the next (position, value) pair arrives
    array[i] = value

In short, I split my data into small (position, value) pieces, put them on the queue, and have the main process receive them one at a time. I hope this helps.
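A self-contained sketch of the (position, value) approach is below. The names producer and run, and the doubling used as the per-element computation, are hypothetical stand-ins. The main process expects exactly one message per element and relies on queue.get() blocking, so no busy-wait on queue.empty() is needed, and join() is only called once the queue has been drained.

```python
import multiprocessing as mp

import numpy as np


def producer(queue, data):
    """Hypothetical subprocess: send each result together with its position."""
    for i, value in enumerate(data):
        queue.put((i, value * 2.0))  # stand-in for the real computation


def run(data):
    queue = mp.Queue()
    p = mp.Process(target=producer, args=(queue, data))
    p.start()
    array = np.empty(len(data))
    # Exactly len(data) messages are expected; get() blocks until each arrives.
    for _ in range(len(data)):
        i, value = queue.get()
        array[i] = value
    p.join()  # safe here: the queue has been fully drained
    return array


if __name__ == '__main__':
    print(run([1.0, 2.0, 3.0]))
```

Sending the position with each value also means results can be placed correctly even if several workers put items on the same queue in an unpredictable order.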
