I have the following threads structure:

               (1)
            /   |   \
          (2)  (3)  (4)
           |    |    |
          (5)  (6)  (7)
           |    |    |
          (8)  (9)  (10)
           |    |    |
          (11) (12) (13)
            \  /     |
            (14)     |
              \     /
                (15)

As you can see, the first function starts three threads, and each of those then starts a new one in turn. Node 14 is the join of nodes 11 and 12; node 15 is the join of nodes 13 and 14. I implemented the first two levels (nodes 1, 2, 3, 4) as follows:

def first(self):
    # `items` instead of `list`, so the built-in name is not shadowed
    items = ['a', 'b', 'c']
    with concurrent.futures.ThreadPoolExecutor() as executor:
        results = executor.map(self.do_this, items)
        for result in results:
            print(result)

But I have no idea where to go from here.

  • Sounds like you might want to consider a task-based structure instead of thinking in threads, where subsequent tasks wait for their predecessors to complete first. Commented May 10, 2021 at 18:12
  • @AKX Could you please give me more details? :) I can't seem to find anything helpful online about task-based structures. Commented May 10, 2021 at 18:16

1 Answer

Here's an example of a runner for a graph like yours.

The idea is that you define a function that runs each task (do_task here), and build a graph of the (immediate) dependencies each task requires. The example task_deps below mirrors your graph from above.
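To make the dependency-graph idea concrete before any threads are involved, here is a minimal sketch that only groups task IDs into "waves", where every task in a wave depends solely on tasks from earlier waves. The function name scheduling_waves and the small diamond-shaped example_deps graph are made up for illustration; they are not part of the runner described in this answer.

```python
def scheduling_waves(task_dependencies):
    """Group task IDs into waves; each wave depends only on earlier waves."""
    done = set()
    waves = []
    while len(done) < len(task_dependencies):
        # A task is ready when it is not done yet and all its dependencies are.
        wave = {
            task_id
            for task_id, deps in task_dependencies.items()
            if task_id not in done and set(deps) <= done
        }
        if not wave:
            # Nothing is ready but work remains: a cycle or an unknown dependency.
            raise ValueError("cyclic or missing dependencies")
        waves.append(wave)
        done |= wave
    return waves


# Hypothetical small graph: 1 fans out to 2 and 3, which join in 4.
example_deps = {1: (), 2: (1,), 3: (1,), 4: (2, 3)}
print(scheduling_waves(example_deps))
```

Each wave is a set of tasks that could run in parallel, which is the same grouping a threaded runner can submit to an executor in one go.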

The run_graph function will then call do_task once for each task ID, as soon as that task's dependencies have completed. do_task is expected to do whatever is needed to compute that task's result; it receives the results dict, so it can read the result of any earlier task if it needs to.
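As a small illustration of a task function that actually reads earlier results, here is a sketch of a hypothetical runner, add_up_task, with the same (task_id, results, dependencies) signature. To keep it self-contained, the results dict is filled in by hand here, standing in for what the graph runner would have stored by that point.

```python
def add_up_task(task_id, results, dependencies):
    # Hypothetical runner: combine this task's ID with the results of
    # the tasks it depends on. Every dependency is expected to already
    # have an entry in `results` when this is called.
    return task_id + sum(results[dep] for dep in dependencies)


# Stand-in for results stored by earlier tasks (values are made up).
results = {11: 11, 12: 12}
results[14] = add_up_task(14, results, (11, 12))
print(results[14])  # 14 + 11 + 12 = 37
```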

The run_graph function will eventually return a dict of {task_id: result}.

The code below outputs

Scheduling {1}
Scheduling {2, 3, 4}
Scheduling {5, 6, 7}
Scheduling {8, 9, 10}
Scheduling {11, 12, 13}
Scheduling {14}
Scheduling {15}

which, as expected, corresponds exactly to the levels of your graph from top to bottom, followed by the results dict:

{1: 'Task 1 completed with result 42',
 2: 'Task 2 completed with result 84',
 3: 'Task 3 completed with result 126',
 4: 'Task 4 completed with result 168',
 5: 'Task 5 completed with result 210',
 6: 'Task 6 completed with result 252',
 7: 'Task 7 completed with result 294',
 8: 'Task 8 completed with result 336',
 9: 'Task 9 completed with result 378',
 10: 'Task 10 completed with result 420',
 11: 'Task 11 completed with result 462',
 12: 'Task 12 completed with result 504',
 13: 'Task 13 completed with result 546',
 14: 'Task 14 completed with result 588',
 15: 'Task 15 completed with result 630'}

import concurrent.futures


def do_task(task_id, results, dependencies):
    # sanity check - this function could use `dependencies` and `results` too
    assert all(dep in results for dep in dependencies)
    return f"Task {task_id} completed with result {task_id * 42}"


def run_graph(task_dependencies, runner):
    # Dict for results for each task.
    results = {}
    # Set of tasks yet to be completed.
    todo = set(task_dependencies)

    with concurrent.futures.ThreadPoolExecutor() as executor:
        # While there are items in the to-do set...
        while todo:
            # ... figure out what we can immediately execute by
            # comparing the dependency set to the result keys we already have
            # (i.e. the complement of the to-do set)
            next_tasks = {
                task_id
                for (task_id, deps) in task_dependencies.items()
                if task_id in todo and set(deps) <= set(results)
            }
            # If there are no next tasks we could schedule, it means the dependency
            # graph is incorrect (or at the very least cannot be completed).
            if not next_tasks:
                raise RuntimeError(
                    f"Unable to schedule tasks, bad dependencies? Todo: {todo}"
                )

            print("Scheduling", next_tasks)
            # Submit tasks for execution in parallel. `futures` will be a list of
            # 2-tuples (task_id, future).
            futures = [
                (
                    task_id,
                    executor.submit(
                        runner, task_id, results, task_dependencies[task_id]
                    ),
                )
                for task_id in next_tasks
            ]

            # Loop over the futures, waiting for their results; when a future
            # finishes, save the result value and remove that task from the
            # to-do set.
            for (task_id, future) in futures:
                results[task_id] = future.result()
                todo.remove(task_id)
    # Once the while loop finishes, we have our results.
    return results


if __name__ == "__main__":
    task_deps = {
        1: (),
        2: (1,),
        3: (1,),
        4: (1,),
        5: (2,),
        6: (3,),
        7: (4,),
        8: (5,),
        9: (6,),
        10: (7,),
        11: (8,),
        12: (9,),
        13: (10,),
        14: (11, 12),
        15: (14, 13),
    }

    result = run_graph(task_deps, do_task)
    print(result)
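
One property of the runner above is that it waits for every task in a batch to finish before scheduling the next batch, so a slow task in one branch holds back the other branches. If that matters, a possible variation (a sketch, not a drop-in replacement) is to let the standard library's graphlib.TopologicalSorter (Python 3.9+) track readiness and to submit each task as soon as its own dependencies are done. The names run_graph_eagerly and describe_task are made up for this example; the dependency dict has the same {task_id: dependencies} shape as task_deps.

```python
import concurrent.futures
import graphlib  # standard library, Python 3.9+


def describe_task(task_id, results, dependencies):
    # Hypothetical runner with the same signature as do_task above.
    assert all(dep in results for dep in dependencies)
    return f"task {task_id} done"


def run_graph_eagerly(task_dependencies, runner):
    results = {}
    sorter = graphlib.TopologicalSorter(task_dependencies)
    sorter.prepare()  # raises graphlib.CycleError if the graph has a cycle
    pending = {}  # maps each in-flight future to its task ID

    with concurrent.futures.ThreadPoolExecutor() as executor:
        while sorter.is_active():
            # Submit everything whose dependencies have all been marked done.
            for task_id in sorter.get_ready():
                deps = task_dependencies.get(task_id, ())
                future = executor.submit(runner, task_id, results, deps)
                pending[future] = task_id
            # Block until at least one in-flight task finishes.
            finished, _ = concurrent.futures.wait(
                list(pending),
                return_when=concurrent.futures.FIRST_COMPLETED,
            )
            for future in finished:
                task_id = pending.pop(future)
                results[task_id] = future.result()
                # Marking the task done may unlock its successors.
                sorter.done(task_id)
    return results


if __name__ == "__main__":
    demo_deps = {1: (), 2: (1,), 3: (1,), 4: (2, 3)}
    print(run_graph_eagerly(demo_deps, describe_task))
```

As in the original runner, results are only written from the main thread, and a task's result is stored before its successors are released, so a runner can safely read its dependencies' results.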