
I'm trying to parallelize the code below using the multiprocessing module. Whatever I try, the child processes run one after the other, even though they all have different PIDs. I have tried:

  1. CentOS and macOS
  2. Context as spawn and as fork
  3. Using queues and using pools
  4. Using apply and map, and their async versions
  5. Adding/removing pool.join() and Process.join()

I can't figure out what I am doing wrong.

fs.py:

import numpy as np
from time import sleep
import os

def f(r):
    res = np.arange(r[0], r[1])
    print(f'I am {os.getpid()}')
    sleep(10)
    print(f'I am {os.getpid()} and I am finished')
    return {'nums': res, 'dubs': res * 2}

playground.py:

import multiprocessing as mp
import numpy as np
from fs import f


if __name__ == '__main__':
    ctx = mp.get_context('spawn')
    p = ctx.Pool(4)
    with p:
        subsets = [[0, 3], [3, 6], [6, 7]]
        res = [p.apply(f, (subset, )) for subset in subsets]
        print(res)

    print('Done!')

Command: python playground.py

Output:

I am 29881
I am 29881 and I am finished
I am 29882
I am 29882 and I am finished
I am 29881
I am 29881 and I am finished
[{'nums': array([0, 1, 2]), 'dubs': array([0, 2, 4])}, {'nums': array([3, 4, 5]), 'dubs': array([ 6,  8, 10])}, {'nums': array([6]), 'dubs': array([12])}]
Done!
  • You need to use apply_async. Check out the documentation. Commented Oct 18, 2021 at 18:58
  • If you have a simple function, one process may finish it before the other processes start. Commented Oct 18, 2021 at 19:34
  • Maybe you should use map() instead of apply(); apply() may wait for the process to finish. Commented Oct 18, 2021 at 19:36
  • The documentation (docs.python.org/3/library/…) says apply blocks until the result is ready. So each call hands one task to a worker, waits for it to print and finish, and only then submits the next one. If you have an iterable and a function, Pool.map is the way to go, as suggested below. If you want more control, Pool.apply_async is the way to go. Commented Oct 18, 2021 at 19:48
  • It seems I had implemented map and apply_async wrong. Both seem to work now. Commented Oct 18, 2021 at 22:03
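The commenters say that apply() blocks while apply_async() does not. Timing both confirms this. The sketch below makes two assumptions:

- work() is a hypothetical stand-in for f() with a fixed 0.3 s delay.
- It uses multiprocessing.pool.ThreadPool instead of a process pool. ThreadPool offers the same apply/apply_async interface as Pool but runs tasks in threads, so it needs no spawn/fork setup.

The blocking behaviour shown is the same for a process pool.

```python
import time
from multiprocessing.pool import ThreadPool


def work(r):
    """Hypothetical stand-in for f(): sleep, then return the numbers in [r[0], r[1])."""
    time.sleep(0.3)
    return list(range(r[0], r[1]))


def run_with_apply(pool, subsets):
    # apply() blocks until each result is ready, so the tasks run one at a time.
    return [pool.apply(work, (s,)) for s in subsets]


def run_with_apply_async(pool, subsets):
    # apply_async() returns an AsyncResult immediately, so every task is queued first...
    pending = [pool.apply_async(work, (s,)) for s in subsets]
    # ...and only then does get() block, while the tasks run concurrently.
    return [r.get() for r in pending]


def timed(fn, *args):
    """Return (result, elapsed seconds) for fn(*args)."""
    start = time.perf_counter()
    result = fn(*args)
    return result, time.perf_counter() - start


if __name__ == "__main__":
    subsets = [[0, 3], [3, 6], [6, 7]]
    # ThreadPool is an assumption here; mp.get_context('spawn').Pool(4) behaves alike.
    with ThreadPool(4) as pool:
        for fn in (run_with_apply, run_with_apply_async):
            res, secs = timed(fn, pool, subsets)
            print(f"{fn.__name__}: {res} in {secs:.2f}s")
```

Expect the apply() version to take roughly three times the per-task delay and the apply_async() version roughly one delay. Exact timings depend on the machine.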

2 Answers


When I use p.map() like this (on Linux Mint):

res = p.map(f, subsets)

I get:

I am 1337328
I am 1337325
I am 1337327
I am 1337328 and I am finished
I am 1337325 and I am finished
I am 1337327 and I am finished

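Maybe you used map() the wrong way, for example res = [p.map(f, (subset, )) for subset in subsets]. That calls map() once per subset, and each call blocks until its one-item batch is finished, so the subsets still run one after the other. Use a single map() call over all the subsets instead: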


if __name__ == '__main__':
    ctx = mp.get_context('spawn')
    with ctx.Pool(4) as p:
        subsets = [[0, 3], [3, 6], [6, 7]]
        res = p.map(f, subsets)
        print(res)
        
    print('Done!')
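Here is a sketch comparing one map() call per subset with a single map() call over all the subsets. Assumptions:

- work() is a hypothetical stand-in for f().
- multiprocessing.pool.ThreadPool replaces the process pool so the example runs without start-method setup. Its map() has the same blocking semantics.

```python
import time
from multiprocessing.pool import ThreadPool


def work(r):
    """Hypothetical stand-in for f(): sleep, then return the numbers in [r[0], r[1])."""
    time.sleep(0.3)
    return list(range(r[0], r[1]))


def map_one_at_a_time(pool, subsets):
    # Each map() call handles a one-item list and blocks until it finishes,
    # so this is no faster than apply() in a loop, and every result comes
    # back wrapped in an extra list.
    return [pool.map(work, (s,)) for s in subsets]


def map_all_at_once(pool, subsets):
    # A single map() call hands every subset to the pool before waiting.
    return pool.map(work, subsets)


if __name__ == "__main__":
    subsets = [[0, 3], [3, 6], [6, 7]]
    with ThreadPool(4) as pool:
        for fn in (map_one_at_a_time, map_all_at_once):
            start = time.perf_counter()
            res = fn(pool, subsets)
            print(f"{fn.__name__}: {res} in {time.perf_counter() - start:.2f}s")
```

With only three subsets and four workers, map() picks a chunk size of 1, so each subset goes to a separate worker.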

For apply_async you need two loops: the first submits every task, and the second collects the results with get():

    items = [p.apply_async(f, (subset, )) for subset in subsets]
    res = [x.get() for x in items]
    print(res)

Both loops have to be inside the with p: block, because leaving the block calls terminate() on the pool:


if __name__ == '__main__':
    ctx = mp.get_context('spawn')
    with ctx.Pool(4) as p:
        subsets = [[0, 3], [3, 6], [6, 7]]

        items = [p.apply_async(f, (subset, )) for subset in subsets]
        print(items)
        
        res = [x.get() for x in items]
        print(res)
        
    print('Done!')
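If you want to read the results after the pool has shut down, you can call close() and then join() explicitly instead of relying on with p:. This waits for all queued tasks to finish rather than terminating them. A minimal sketch, assuming:

- work() is a hypothetical stand-in for f().
- multiprocessing.pool.ThreadPool replaces the process pool so the example runs without start-method setup. close() and join() behave the same way on Pool.

```python
import time
from multiprocessing.pool import ThreadPool


def work(r):
    """Hypothetical stand-in for f(): sleep, then return the numbers in [r[0], r[1])."""
    time.sleep(0.3)
    return list(range(r[0], r[1]))


def collect_after_join(subsets):
    pool = ThreadPool(4)
    # First loop: submit every task; apply_async() does not wait.
    pending = [pool.apply_async(work, (s,)) for s in subsets]
    pool.close()  # no further tasks may be submitted
    pool.join()   # wait for the workers to finish every queued task and exit
    ready = [r.ready() for r in pending]  # all True once join() has returned
    # Second loop: get() returns immediately because every result is already in.
    return [r.get() for r in pending], ready


if __name__ == "__main__":
    results, ready = collect_after_join([[0, 3], [3, 6], [6, 7]])
    print(results, ready)
```

join() may only be called after close() or terminate(). So adding pool.join() on its own does not change when tasks start; submitting them without blocking does.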

The tasks run one after the other because Pool.apply() blocks until its result is ready, which effectively prevents any parallel processing.

Using Pool.map_async() instead avoids that. Note that I also made the delay in f() random, so that the processing times vary.

playground.py

import multiprocessing as mp
import numpy as np
from pprint import pprint
from fs import f


if __name__ == '__main__':
    ctx = mp.get_context('spawn')
    pool = ctx.Pool(4)
    with pool:
        subsets = [[0, 3], [3, 6], [6, 7]]
        res = pool.map_async(f, subsets).get(timeout=10)
    pprint(res, sort_dicts=False)

    print('Done!')

fs.py

import numpy as np
import os
import random
from time import sleep

def f(r):
    print(f'f({r}) called')
    res = np.arange(r[0], r[1])
    print(f'I am {os.getpid()}')
    sleep(random.uniform(0, 2))  # Random time delay.
    print(f'I am {os.getpid()} and I am finished')
    return {'nums': res, 'dubs': res * 2}

Results:

f([0, 3]) called
I am 2120
f([3, 6]) called
I am 32208
f([6, 7]) called
I am 13884
I am 2120 and I am finished
I am 13884 and I am finished
I am 32208 and I am finished
[{'nums': array([0, 1, 2]), 'dubs': array([0, 2, 4])},
 {'nums': array([3, 4, 5]), 'dubs': array([ 6,  8, 10])},
 {'nums': array([6]), 'dubs': array([12])}]
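The output above shows two things: map_async() returns without waiting, and its results come back in input order even when the tasks finish in a different order. A sketch that checks both with fixed delays instead of random ones. Assumptions:

- work() is a hypothetical stand-in for f().
- The delay values are made up so the first subset finishes last.
- multiprocessing.pool.ThreadPool replaces the process pool so the finish order can be recorded in a shared list.

```python
import threading
import time
from multiprocessing.pool import ThreadPool

# Records the order in which tasks complete (shared between threads, so guarded by a lock).
finished = []
finished_lock = threading.Lock()


def work(args):
    """Hypothetical stand-in for f(): sleep for `delay` seconds, then return [lo, hi) as a list."""
    (lo, hi), delay = args
    time.sleep(delay)
    with finished_lock:
        finished.append(lo)
    return list(range(lo, hi))


def run(subsets, delays):
    with ThreadPool(4) as pool:
        start = time.perf_counter()
        async_result = pool.map_async(work, list(zip(subsets, delays)))
        submit_secs = time.perf_counter() - start  # map_async() returns without waiting
        results = async_result.get(timeout=10)      # get() blocks until every task is done
    return results, submit_secs


if __name__ == "__main__":
    # The first subset gets the longest delay, so it finishes last.
    results, submit_secs = run([[0, 3], [3, 6], [6, 7]], [0.6, 0.3, 0.0])
    print("finish order:", finished)
    print("results:", results)
    print(f"map_async() returned after {submit_secs:.3f}s")
```

With a real process pool, each worker would have its own copy of finished. The finish order would then have to be read from the printed output, as in the results above.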
