
I am looking for a way to solve my problem with the multiprocessing module. To give you a better understanding of what I do, see the following explanation.

Explanation:

My input_data is an ndarray with 282240 elements of type uint32. In the calculation_function() I use a for loop to calculate a result from every 12 bits and put it into the output_data.

Because this is very slow, I split my input_data into, e.g., 4 or 8 parts and calculate each part in the calculation_function().

Now I am looking for a way to parallelize the 4 or 8 function calls.

The order of the data is essential, because the data is an image and each pixel has to be at the correct position. So function call no. 1 calculates the first pixels and the last function call the last pixels of the image.

The calculations work fine and the image can be completely rebuilt by my algorithm, but I need the parallelization to speed it up for time-critical requirements.

Summary: One input ndarray is divided into 4 or 8 parts. Each part contains 70560 or 35280 uint32 values. From every 12 bits I calculate one pixel, using 4 or 8 function calls. Each function returns one ndarray with 188160 or 94080 pixels. All return values are concatenated in a row and reshaped into an image.
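For reference, these sizes are consistent: each uint32 carries 32 bits, so a part of 70560 values holds 70560 * 32 / 12 = 188160 pixels, and the full image always has 752640 pixels. A quick sanity check of the numbers above:

n_values = 282240                       # total uint32 values in input_data
for parts in (4, 8):
    per_part = n_values // parts        # 70560 or 35280 uint32 values per part
    pixels = per_part * 32 // 12        # 188160 or 94080 pixels per part
    print(parts, per_part, pixels, parts * pixels)  # total is always 752640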

What already works: The calculations are already working and I can reconstruct my image.

Problem: The function calls are done serially, one after another, so each image reconstruction is very slow.

Main goal: Speed things up by parallelizing the function calls.

Code:

import numpy as np

def decompress(payload, WIDTH, HEIGHT):
    # INPUTS / OUTPUTS
    n_threads = 4
    img_input = np.frombuffer(payload, dtype=np.uint32)   # np.fromstring is deprecated
    img_output = np.zeros(WIDTH * HEIGHT, dtype=np.uint32)
    n_elements_part = len(img_input) // n_threads
    input_part = np.zeros((n_threads, n_elements_part), dtype=np.uint32)
    # every 3 uint32 values (96 bits) hold 8 pixels of 12 bits each
    output_part = np.zeros((n_threads, n_elements_part // 3 * 8), dtype=np.uint32)

    # DEFINE PARTS (here 4 different ones)
    start = np.zeros(n_threads, dtype=int)
    end = np.zeros(n_threads, dtype=int)
    for i in range(n_threads):
        start[i] = i * n_elements_part
        end[i] = (i + 1) * n_elements_part - 1

    # COPY IMAGE DATA
    for idx in range(n_threads):
        input_part[idx, :] = img_input[start[idx]:end[idx] + 1]

    # the following loop contains the function call that should be parallelized
    for idx in range(n_threads):
        output_part[idx, :] = decompress_part2(input_part[idx], output_part[idx])

    # COPY PARTS INTO THE IMAGE (boundaries hard-coded for n_threads = 4,
    # 188160 pixels per part)
    img_output[0:      188160] = output_part[0, :]
    img_output[188160: 376320] = output_part[1, :]
    img_output[376320: 564480] = output_part[2, :]
    img_output[564480: 752640] = output_part[3, :]

    # RESHAPE IMAGE
    img_output = np.reshape(img_output, (HEIGHT, WIDTH))

    return img_output

Please don't mind my beginner programming style :) I'm just looking for a way to parallelize the function calls with the multiprocessing module and get back the returned ndarrays.

-----------------------------------------------------------------------------

Edit: I have transferred my problem into a less complex example. Is it possible to parallelize each iteration of the for loop where the function is called?

import numpy as np

def split(data, parts, step):
    data_array = np.zeros((parts, step))

    for i in range(parts):
        data_array[i, :] = data[i * step:(i + 1) * step]

    return data_array

def mul(arr, scalar):
    result = np.multiply(arr, scalar)
    return result

data = np.linspace(1.0, 100.0, num=24).astype(int)
parts = 4
length = len(data)
step = length // parts
scalar = 2
data_array = split(data, parts, step)
res_array = np.zeros((parts, step))
print(data_array)

for idx in range(parts):
    test = data_array[idx, :]
    res_array[idx, :] = mul(test, scalar)  # line to be parallelized!

print('\n', res_array)
  • I don't see the mentioned calculation_function() in your code. Commented Nov 3, 2017 at 9:39
  • The function itself is not relevant (it just calculates and returns values). The calculations already work fine, so I want to keep it as simple as possible. Relevant for me is how to parallelize the 4 function calls. Commented Nov 3, 2017 at 10:02
  • Your edit should have been your question from the beginning. Commented Nov 3, 2017 at 11:51
  • Yeah, that's correct... Sorry for that. Commented Nov 3, 2017 at 11:56

1 Answer


Use the multiprocessing module:

import multiprocessing

def calculation_function(some_array):
    # some logic
    # return result
    ...

chunksize = 4    # here used both as the number of worker processes and as the chunk size
with multiprocessing.Pool(chunksize) as p:
    results = p.map(calculation_function, entire_ndarray, chunksize)  # entire_ndarray: your input data

Now results holds a list of the processing results, in the same order as the input iterable.

From the documentation of multiprocessing.Pool.map(func, iterable[, chunksize]):
This method chops the iterable into a number of chunks which it submits to the process pool as separate tasks. The (approximate) size of these chunks can be specified by setting chunksize to a positive integer.
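Applied to the simplified example from the edited question, this could look like the following (a minimal sketch: functools.partial is used to bind the scalar argument, and the __main__ guard is needed so child processes can import the module safely):

import multiprocessing
from functools import partial

import numpy as np

def mul(arr, scalar):
    return np.multiply(arr, scalar)

if __name__ == '__main__':
    data = np.linspace(1.0, 100.0, num=24).astype(int)
    parts = 4
    step = len(data) // parts

    # reshape instead of a custom split(): each row becomes one task
    data_array = data.reshape(parts, step)

    # one worker per part; map() preserves the order of the input rows
    with multiprocessing.Pool(parts) as p:
        res_list = p.map(partial(mul, scalar=2), data_array)

    res_array = np.vstack(res_list)   # stack the ordered results back together
    print(res_array)

In the same spirit, the serial loop in the original decompress() could be handled with Pool.starmap, which unpacks one argument tuple per task (again only a sketch, assuming decompress_part2 is defined at module level so it can be pickled):

with multiprocessing.Pool(n_threads) as p:
    results = p.starmap(decompress_part2,
                        [(input_part[idx], output_part[idx]) for idx in range(n_threads)])
for idx in range(n_threads):
    output_part[idx, :] = results[idx]

Because map() and starmap() return results in input order, the parts can be written back in sequence and the pixel order of the image is preserved.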


4 Comments

Thanks for your answer. See my new example which I just added to the post. Can you easily adapt your multiprocessing example to my new example to parallelize each iteration?
@Chris72H, as mentioned in my answer: the multiprocessing.Pool.map function can split the whole initial array into the needed number of chunks without custom split() functions.
So if I understand you right, then I just have to vary the chunksize to get a different number of parallel processes? I tried that, and the bigger the chunksize is, the longer the algorithm takes. I thought I could say: make 4 parallelizations and it takes x seconds; with 8 parallelizations it would take 0.5*x seconds... Hmm... I still don't get this.
Well... I just have to make the array big enough (120,000 elements), then it gets faster with a bigger chunksize. Thank you very much.
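One note on the last two comments: the number of parallel workers is set by the processes argument of Pool, not by chunksize; chunksize only controls how many items each worker fetches per batch, which reduces inter-process communication overhead for large inputs. A minimal sketch of the distinction (double and the array size are just placeholders for illustration):

import multiprocessing

import numpy as np

def double(x):
    # trivial stand-in for a real per-element calculation
    return 2 * x

if __name__ == '__main__':
    data = np.arange(120_000)

    # processes=4 gives four parallel workers; chunksize=1000 only batches
    # the elements so each worker picks up 1000 of them per round trip
    with multiprocessing.Pool(processes=4) as p:
        results = p.map(double, data, chunksize=1000)

    print([int(r) for r in results[:5]])  # [0, 2, 4, 6, 8]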
