
I have a class that continuously, every 30 seconds, downloads data and analyzes it. The problem is that since the functions you see in the pseudo code must remain separate, I am forced to download all the data first and only then analyze all of it. My goal is to rewrite the code so that as soon as the data for a ticker is available, that data is analyzed immediately. This workflow is shown in Pseudo Code 02, but is it possible to achieve it without merging the two functions? Thanks for the suggestions :)

Pseudo Code 01:

import datetime, multiprocessing

class myclass:
    def __init__(self, list_of_symbols):
        self.last_update = 0
        self.list_of_symbols = list_of_symbols

    def get_data(self, symbol):
        request_data_to_api(symbol)

    def analyze_data(self, symbol):
        analyze(symbol)

    def run(self):
        while True:
            if (self.last_update == 0) or ((datetime.datetime.now() - self.last_update).seconds >= 30):
                self.last_update = datetime.datetime.now()

                # Update data as soon as possible
                pool = multiprocessing.Pool(20)
                pool.map(self.get_data, self.list_of_symbols)
                pool.close()
                pool.join()

                # Analyze data as soon as possible
                pool = multiprocessing.Pool(20)
                pool.map(self.analyze_data, self.list_of_symbols)
                pool.close()
                pool.join()

Pseudo Code 02:

import datetime, multiprocessing

class myclass:
    def __init__(self, list_of_symbols):
        self.last_update = 0
        self.list_of_symbols = list_of_symbols

    def get_all_in_one(self, symbol):
        request_data_to_api(symbol)
        analyze(symbol)

    def run(self):
        while True:
            if (self.last_update == 0) or ((datetime.datetime.now() - self.last_update).seconds >= 30):
                self.last_update = datetime.datetime.now()

                pool = multiprocessing.Pool(20)
                pool.map(self.get_all_in_one, self.list_of_symbols)
                pool.close()
                pool.join()
  • Is there any reason why you have to use multiprocessing instead of just multithreading? If not, maybe try changing it to threading first? You don't have to do the entire queue and pipe/shared-memory dance that way. – Commented May 7, 2021 at 2:00
  • What the OP is suggesting is perfectly fine; there are no issues with shared memory or anything like that. If you really want to keep the two functions separate, one way to achieve it might be to use the concurrent.futures package, which is just a slightly different way to do multiprocessing. You submit a data-fetch job, get a Future object back, then add a callback to that Future that submits the follow-up analyze job to the thread or process pool (see the sketch below). – Commented May 7, 2021 at 3:15
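
A minimal sketch of that callback approach, assuming the request_data_to_api and analyze stubs from the question; the helper names, pool sizes, and symbols here are made up for illustration:

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def request_data_to_api(symbol):   # stub from the question
    ...

def analyze(symbol, data):         # stub from the question
    ...

def fetch(symbol):
    return symbol, request_data_to_api(symbol)   # I/O-bound download

def run_once(symbols):
    analysis_futures = []
    # Exit order matters: the thread pool must finish (and fire all its
    # callbacks) before the process pool is allowed to shut down.
    with ProcessPoolExecutor() as cpu_pool, ThreadPoolExecutor(20) as io_pool:

        def on_fetched(future):
            # Runs in a worker thread as soon as one download completes and
            # hands its result to the process pool without waiting for the rest.
            symbol, data = future.result()
            analysis_futures.append(cpu_pool.submit(analyze, symbol, data))

        for symbol in symbols:
            io_pool.submit(fetch, symbol).add_done_callback(on_fetched)

    return [f.result() for f in analysis_futures]

if __name__ == '__main__':
    run_once(['AAPL', 'MSFT'])   # hypothetical symbols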

1 Answer


I am working on the assumption that the method get_data is an I/O-bound task, for which multithreading is more suitable (and which could benefit from a larger pool), whereas analyze_data is CPU-intensive and should therefore use multiprocessing with a pool limited to the number of CPU cores you have. I have also modified your methods' signatures and return values, which I think is necessary.

The idea is (1) to create the pools once, outside the loop, since pool creation can be expensive, and (2) to use imap_unordered (with an appropriate chunksize argument for efficiency) so that each result returned by get_data can be submitted to analyze_data as soon as it becomes available:

import datetime
from multiprocessing import Pool, cpu_count
from multiprocessing.pool import ThreadPool

class myclass:
    def __init__(self, list_of_symbols):
        self.last_update = 0
        self.list_of_symbols = list_of_symbols
        
    def get_data(self, symbol):
        data = request_data_to_api(symbol)
        return symbol, data
        
    def analyze_data(self, symbol, data):
        return analyze(symbol, data)
        
    def run(self): # added missing self argument

        def compute_chunksize(iterable_size, pool_size):
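            # Heuristic: aim for roughly 4 chunks per pool worker (rounding up)
            # so imap_unordered can balance the load without excessive task overhead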
            chunksize, remainder = divmod(iterable_size, 4 * pool_size)
            if remainder:
                chunksize += 1
            return chunksize

        iterable_size = len(self.list_of_symbols)
        OPTIMAL_THREADPOOL_SIZE = 50 # Your guess is as good as mine
        threadpool_size = min(iterable_size, OPTIMAL_THREADPOOL_SIZE)
        thread_pool = ThreadPool(threadpool_size)
        processpool_size = min(iterable_size, cpu_count())
        process_pool = Pool(processpool_size) # use all the CPU cores that are available
        chunksize = compute_chunksize(iterable_size, processpool_size)

        while True:
            if (self.last_update == 0) or ((datetime.datetime.now() - self.last_update).seconds >= 30):
                self.last_update = datetime.datetime.now()

                # Update data as soon as possible
                results = thread_pool.imap_unordered(self.get_data, self.list_of_symbols, chunksize)
                # as results become available:
                async_results = []
                for symbol, data in results:
                    async_results.append(process_pool.apply_async(self.analyze_data, args=(symbol, data)))
                for async_result in async_results:
                    result = async_result.get() # return value from analyze_data
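
For completeness, a hypothetical driver for the class above. It assumes request_data_to_api and analyze are defined at module level (they must be picklable for the process pool) and uses made-up symbols; the __main__ guard is required on platforms that spawn worker processes, such as Windows:

if __name__ == '__main__':
    updater = myclass(['AAPL', 'MSFT', 'GOOG'])
    updater.run()   # loops forever, refreshing every 30 seconds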

Note

If get_data is also CPU-intensive, skip the thread pool entirely and replace thread_pool.imap_unordered(...) with process_pool.imap_unordered(...); everything else stays the same.
