If we have two asyncio coroutines, is it possible to use Python multiprocessing to run each of them in its own process, and to stop the coroutines in both processes (by calling their stop method) when the user hits Ctrl+C?

This would be similar to the code below, except that the foo.start() and bar.start() coroutines should each run in their own process.

from builtins import KeyboardInterrupt
import asyncio
import multiprocessing
import signal

class App:
    def __init__(self, text):
        self.text = text

    async def start(self):
        self.loop_task = asyncio.create_task(self.hello())
        await asyncio.wait([self.loop_task])
        
    async def stop(self):
        self.loop_task.cancel()
        
    async def hello(self):
        while True:
            print(self.text)
            await asyncio.sleep(2)

if __name__ == '__main__':
    foo = App('foo')
    bar = App('bar')
    
    # Running in a single process works fine
    try:
        asyncio.run(asyncio.wait([foo.start(), bar.start()]))
    except KeyboardInterrupt:
        asyncio.run(asyncio.wait([foo.stop(), bar.stop()]))

I tried using multiprocessing and signals, but I am not sure how to call foo.stop() and bar.stop() before the two processes terminate.

import time

if __name__ == '__main__':
    
    def init_worker():
        signal.signal(signal.SIGINT, signal.SIG_IGN)
        
    def start_foo():
        asyncio.run(foo.start())
        
    def start_bar():
        asyncio.run(bar.start())
        
    foo = App('foo')
    bar = App('bar')    
    pool = multiprocessing.Pool(10, init_worker)
        
    try:
        print('Starting 2 jobs')
        pool.apply_async(start_foo)
        pool.apply_async(start_bar)

        while True:        
            time.sleep(1)  # is sleeping like this a bad thing?
                
    except KeyboardInterrupt:
        print('Caught KeyboardInterrupt, terminating workers')
        pool.terminate()
        pool.join()
    
    print('Shut down complete')

# Based on https://stackoverflow.com/a/11312948/741099

Using Python 3.9.5 on Ubuntu 20.04


Based on @Will Da Silva's solution, I made small modifications to check whether asyncio.run(app.stop()) gets called on pressing Ctrl+C:

import asyncio
import multiprocessing
import os


class App:
    def __init__(self, text):
        self.text = text

    async def start(self):
        self.loop_task = asyncio.create_task(self.hello())
        await asyncio.wait([self.loop_task])
        
    async def stop(self):
        self.loop_task.cancel()
        print(f'Stopping {self.text}')
        
    async def hello(self):
        while True:
            print(self.text)
            await asyncio.sleep(2)

def f(app):
    try:
        asyncio.run(app.start())
    except KeyboardInterrupt:
        asyncio.run(app.stop())
        
if __name__ == '__main__':  
    
    jobs = (App('foo'), App('bar'))
    with multiprocessing.Pool(min(len(jobs), os.cpu_count())) as pool:        
        try:
            print(f'Starting {len(jobs)} jobs')
            pool.map(f, jobs)
                
        except KeyboardInterrupt:
            print('Caught KeyboardInterrupt, terminating workers')
                
    print('Shut down complete')

However, if I start and stop the Python script repeatedly, the print(f'Stopping {self.text}') inside app.stop() only makes it to stdout about half the time.

Output:

$ python test.py
Starting 2 jobs
bar
foo
^CCaught KeyboardInterrupt, terminating workers
Shut down complete

$ python test.py
Starting 2 jobs
bar
foo
^CCaught KeyboardInterrupt, terminating workers
Stopping bar
Shut down complete

$ python test.py
Starting 2 jobs
foo
bar
^CCaught KeyboardInterrupt, terminating workers
Stopping bar
Stopping foo
Shut down complete
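
My guess is that this is a race with the pool shutting down: the with multiprocessing.Pool(...) block calls pool.terminate() on exit, so a worker can be killed while it is still running asyncio.run(app.stop()). To rule out the other explanation, that the message is only stuck in a stdio buffer when the worker is killed, here is a small variant of f with an explicit flush (same App class as above; the flush is only added for debugging):

import asyncio
import sys

def f(app):
    try:
        asyncio.run(app.start())
    except KeyboardInterrupt:
        asyncio.run(app.stop())
        # Flush so the 'Stopping ...' message is not left sitting in a
        # stdio buffer if the parent terminates this worker right after.
        sys.stdout.flush()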

1 Answer

Here's a way to do it without messing with signals, and without changing the App class:

import asyncio
import multiprocessing
import os


class App:
    def __init__(self, text):
        self.text = text

    async def start(self):
        self.loop_task = asyncio.create_task(self.hello())
        await asyncio.wait([self.loop_task])
        
    async def stop(self):
        self.loop_task.cancel()
        
    async def hello(self):
        while True:
            print(self.text)
            await asyncio.sleep(2)


def f(text):
    app = App(text)
    try:
        asyncio.run(app.start())
    except KeyboardInterrupt:
        asyncio.run(app.stop())


if __name__ == '__main__':
    jobs = ('foo', 'bar')
    with multiprocessing.Pool(min(len(jobs), os.cpu_count())) as pool:
        try:
            pool.map(f, jobs)
        except KeyboardInterrupt:
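            # The with-block would otherwise call pool.terminate() on exit,
            # which can kill the workers before they have finished running
            # asyncio.run(app.stop()) in their own except branches.
            # close() stops new work and join() waits for the workers to
            # exit on their own, so stop() gets a chance to run.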
            pool.close()
            pool.join()

It's important that we limit the number of processes in the pool to min(len(jobs), os.cpu_count()), because any workers without an assigned job won't be inside the try-except block that catches KeyboardInterrupt when you press Ctrl+C, and so they'll raise an exception.

To avoid this issue completely you could provide the pool with an initializer that ignores SIGINT, but then the workers can no longer catch it as KeyboardInterrupt either. I'm not sure how one would ignore it only in the workers that never receive a job.
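
For illustration, a minimal sketch of what such an initializer could look like (f and App as defined above; with this in place app.stop() would never run in the workers, which is the trade-off described):

import signal

def ignore_sigint():
    # Runs once in each worker process as it starts. Ignoring SIGINT
    # means Ctrl+C only interrupts the parent; the workers never raise
    # KeyboardInterrupt, so the except branch in f never fires.
    signal.signal(signal.SIGINT, signal.SIG_IGN)

# pool = multiprocessing.Pool(min(len(jobs), os.cpu_count()),
#                             initializer=ignore_sigint)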

You can also create the App instances in the parent process, so long as they can be pickled to be passed across the process boundary into the child processes.

def f(app):
    try:
        asyncio.run(app.start())
    except KeyboardInterrupt:
        asyncio.run(app.stop())


if __name__ == '__main__':
    jobs = (App('foo'), App('bar'))
    with multiprocessing.Pool(min(len(jobs), os.cpu_count())) as pool:
        try:
            pool.map(f, jobs)
        except KeyboardInterrupt:
            pool.close()
            pool.join()

3 Comments

Thanks! I gave it a try with a few print statements added and found that app.stop() does not always get called. Original question updated with more details.
That's strange @Nyxynyx. I tested it with a print statement in stop, and found that it was being called. I'll do more testing and see what I can find.
@Nyxynyx I guess I just got unlucky when running it during my tests earlier. I was able to reproduce the issue, and have fixed it. The solution is to call close and then join on the pool.
