
I have a script that processes files using multiprocessing. Here's a snippet:

import multiprocessing
import os

cores = multiprocessing.cpu_count()

def f_process_file(file):
    # rename file
    # convert file
    # add metadata
    pass

files = [f for f in os.listdir(source_path) if f.endswith('.tif')]
p = multiprocessing.Pool(processes=cores)
async_result = p.map_async(f_process_file, files)
p.close()
p.join()

This runs fine, except that I now have to do some other work before I can call f_process_file, and it takes additional arguments. Here's the snippet:

def f_process_file(file, inventory, variety):
    if variety > 1:
        # rename file with follow-up number
        # convert file
        # add metadata
        pass
    else:
        # rename file without follow-up number
        # convert file
        # add metadata
        pass

import collections

# create file list
files = [f for f in os.listdir(source_path) if f.endswith('.tif')]
# create inventory list
inventories = [fn.split('_')[2].split('-')[0].split('.')[0] for fn in files]
# count the number of files per inventory
counter = collections.Counter(inventories)

for file in files:
    inventory = file.split('_')[2].split('-')[0].split('.')[0]
    matching = [s for s in sorted(counter.items()) if inventory in s]
    for key, variety in matching:
        f_process_file(file, inventory, variety)

I can't manage to get this executed with multiprocessing. Do you have any advice?

  • Have you tried extracting the contents of your for file in files loop into its own method (let's call it file_processing) and then calling async_result = p.map_async(file_processing, files)? Commented Nov 28, 2018 at 15:04
  • That's a good idea. I'll try that and let you know. Commented Nov 28, 2018 at 15:10
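A minimal sketch of the refactor suggested in the comment. The name file_processing, the stub body of f_process_file, and the sample filenames are assumptions for illustration; the inventory-key parsing is taken from the question:

```python
import collections
import multiprocessing

def f_process_file(file, inventory, variety):
    # stand-in for the real rename/convert/metadata work
    return (file, inventory, variety)

def file_processing(file):
    # derive the inventory key from the filename, as in the question
    inventory = file.split('_')[2].split('-')[0].split('.')[0]
    matching = [s for s in sorted(counter.items()) if inventory in s]
    results = []
    for key, variety in matching:
        results.append(f_process_file(file, inventory, variety))
    return results

files = ['scan_a_INV1-1.tif', 'scan_a_INV1-2.tif', 'scan_b_INV2.tif']
inventories = [fn.split('_')[2].split('-')[0].split('.')[0] for fn in files]
counter = collections.Counter(inventories)

if __name__ == '__main__':
    with multiprocessing.Pool(processes=2) as p:
        async_result = p.map_async(file_processing, files)
        print(async_result.get())
```

Each worker call now takes a single argument (the filename), which is what map_async expects; everything else is looked up inside file_processing.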

2 Answers

1

I found this question and managed to work my problem out with apply_async. Here's the snippet:

cores = multiprocessing.cpu_count()
pool = multiprocessing.Pool(processes=cores)
for file in files:
    inventory = file.split('_')[2].split('-')[0].split('.')[0]
    matching = [s for s in sorted(counter.items()) if inventory in s]
    for key, variety in matching:
        pool.apply_async(f_process_file, (source, file, tmp, target, inventory, variety))
pool.close()
pool.join()
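A self-contained sketch of this apply_async pattern. The three-argument f_process_file and the sample filenames are assumptions based on the question (the extra source/tmp/target arguments from the snippet above are left out), and variety is looked up directly from the Counter rather than via the matching list:

```python
import collections
import multiprocessing

def f_process_file(file, inventory, variety):
    # stand-in for the real rename/convert/metadata work
    return (file, inventory, variety)

if __name__ == '__main__':
    files = ['scan_a_INV1-1.tif', 'scan_a_INV1-2.tif', 'scan_b_INV2.tif']
    inventories = [fn.split('_')[2].split('-')[0].split('.')[0] for fn in files]
    counter = collections.Counter(inventories)

    results = []
    with multiprocessing.Pool(processes=2) as pool:
        for file in files:
            inventory = file.split('_')[2].split('-')[0].split('.')[0]
            variety = counter[inventory]
            # apply_async passes the extra arguments as a tuple
            results.append(pool.apply_async(f_process_file, (file, inventory, variety)))
        pool.close()
        pool.join()
        # get() returns each worker's result in submission order
        print([r.get() for r in results])
```

Unlike map_async, apply_async submits one call at a time, so it handles workers that take several arguments without any wrapper.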

0

The problem here is that your workload is not ideally suited to multiprocessing.Pool as written. You are iterating in a nested loop, so each file can produce several units of work. There are two possible ways to solve your problem. The first is to do the single-threaded computation first and then hand a flat list of work items to Pool. To do this, first construct an object; I'll call it ProcessingArgs:

class ProcessingArgs:

    def __init__(self, file, inventory, variety):
        self.File = file
        self.Inventory = inventory
        self.Variety = variety

Then you can either modify f_process_file to take a ProcessingArgs, or add a wrapper method that decomposes the object and calls f_process_file. Either way, your for-loop now looks like this:

needs_processing = []
for file in files:
    inventory = file.split('_')[2].split('-')[0].split('.')[0]
    matching = [s for s in sorted(counter.items()) if inventory in s]
    needs_processing.extend([ProcessingArgs(file, inventory, variety) for key, variety in matching])

p = multiprocessing.Pool(processes=cores)
async_result = p.map_async(f_process_file, needs_processing)
p.close()
p.join()
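The wrapper variant might look like the following sketch. The name process_wrapper, the stub worker body, and the sample data are assumptions, not part of the original answer:

```python
import multiprocessing

class ProcessingArgs:
    def __init__(self, file, inventory, variety):
        self.File = file
        self.Inventory = inventory
        self.Variety = variety

def f_process_file(file, inventory, variety):
    # stand-in for the real rename/convert/metadata work
    return (file, inventory, variety)

def process_wrapper(args):
    # decompose the ProcessingArgs object and call the real worker
    return f_process_file(args.File, args.Inventory, args.Variety)

if __name__ == '__main__':
    needs_processing = [ProcessingArgs('a_x_INV1.tif', 'INV1', 2),
                        ProcessingArgs('a_x_INV2.tif', 'INV2', 1)]
    with multiprocessing.Pool(processes=2) as p:
        async_result = p.map_async(process_wrapper, needs_processing)
        print(async_result.get())
```

The wrapper keeps f_process_file unchanged while still giving map_async the single-argument callable it requires.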

The other option is to use the asyncio library:

import asyncio

async def main():
    # gather takes the coroutines as separate arguments, hence the unpacking
    await asyncio.gather(*(f_process_file(p) for p in needs_processing))

asyncio.run(main())

In this case you would also need to change def f_process_file to async def f_process_file so asyncio knows it's a coroutine function.
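Put together, a self-contained sketch of the asyncio approach (the worker body and the sample data are placeholders; note that asyncio runs these coroutines concurrently on one thread rather than on multiple cores):

```python
import asyncio

class ProcessingArgs:
    def __init__(self, file, inventory, variety):
        self.File = file
        self.Inventory = inventory
        self.Variety = variety

async def f_process_file(args):
    # stand-in for the real rename/convert/metadata work
    return (args.File, args.Inventory, args.Variety)

needs_processing = [ProcessingArgs('a.tif', 'INV1', 2),
                    ProcessingArgs('b.tif', 'INV2', 1)]

async def main():
    # gather returns the results in the same order the coroutines were passed
    return await asyncio.gather(*(f_process_file(p) for p in needs_processing))

results = asyncio.run(main())
print(results)
```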

2 Comments

I'm quite new to Python, so your answer is a bit complicated for me. I don't know where to put the ProcessingArgs class.
@Rene You'd put it at the highest scope level, so it would be at the top of the file or in a separate file.
