
I am trying to write my own threadpool. I am noticing that using this custom-made threadpool is slower than doing the exact same work in a single-threaded fashion, and I don't understand why.

This is the threadpool:

use std::sync::atomic::{AtomicBool, AtomicU32};
use std::sync::{
    Arc,
    atomic::{AtomicUsize, Ordering},
};
use std::thread;

use concurrent_queue::ConcurrentQueue;

pub struct TaskInfo
{
    should_stop: bool,
}

pub struct TaskBatchInfo
{
    pub task_index: usize,
    pub current_task_count: Arc<AtomicUsize>,
}

pub struct Task
{
    func: Box<dyn FnMut() -> Option<TaskInfo> + Send + Sync + 'static>,
}

impl Task
{
    fn new(func: impl FnMut() -> Option<TaskInfo> + Send + Sync + Clone + 'static)
    -> Self
    {
        Self {
            func: Box::new(func),
        }
    }

    fn run(mut self) -> Option<TaskInfo> { (self.func)() }
}

pub struct ThreadPool
{
    queue: Arc<ConcurrentQueue<Task>>,
    pub should_stop: Arc<AtomicBool>,
    concurrency: usize,
}

impl ThreadPool
{
    pub fn new(size: usize) -> Self
    {
        Self {
            queue: Arc::new(ConcurrentQueue::unbounded()),
            should_stop: Arc::new(AtomicBool::new(false)),
            concurrency: size,
        }
    }

    pub fn start(&self)
    {
        let queue = self.queue.clone();
        let concurrency = self.concurrency;
        let should_stop = self.should_stop.clone();

        // Spawn the entire thread pool loop on a new thread
        thread::spawn(move || {
            let mut handles = Vec::new();

            for _task_id in 0..concurrency
            {
                let _queue = queue.clone();
                let _should_stop = should_stop.clone();
                let handle = thread::spawn(move || {
                    loop
                    {
                        if _should_stop.load(Ordering::Relaxed)
                        {
                            break;
                        }

                        let task = _queue.pop();
                        if let Ok(task) = task
                        {
                            let info = task.run();

                            if let Some(info) = info
                            {
                                if info.should_stop
                                {
                                    _should_stop.store(true, Ordering::Relaxed);
                                    break;
                                }
                            }
                        } else {
                            thread::sleep(std::time::Duration::from_micros(500));
                        }
                    }
                });

                handles.push(handle);
            }

            for handle in handles
            {
                handle.join().unwrap();
            }
        });
    }

    pub fn add_task<F>(&self, task: F)
    where
        F: FnMut() -> Option<TaskInfo> + Send + Sync + Clone + 'static,
    {
        let _ = self.queue.push(Task::new(task));
    }

    pub fn task_batch<F, T>(pool: Arc<Self>, task_count: usize, task: F, termination: T)
    where
        F: FnMut(TaskBatchInfo) -> Option<TaskInfo> + Send + Sync + Clone + 'static,
        T: FnMut() + Send + Sync + Clone + 'static,
    {
        let task_counter = Arc::new(AtomicUsize::new(task_count));
        Self::inner_task_batch(pool, task, task_count, termination, task_counter)
    }

    pub fn task_batch_with_barrier<F, T, C>(
        pool: Arc<Self>,
        task_count: usize,
        mut task: F,
        mut termination: T,
        context: C,
    ) where
        F: FnMut(TaskBatchInfo, C) + Send + Sync + Clone + 'static,
        T: FnMut() + Send + Sync + Clone + 'static,
        C: Clone + Send + Sync + 'static,
    {
        let wait_flag = Arc::new(AtomicU32::new(0));
        let _wait_flag = wait_flag.clone();

        let context = Arc::new(context);
        ThreadPool::task_batch(
            pool.clone(),
            task_count,
            move |info| {
                task(info, context.clone().as_ref().clone());

                None
            },
            move || {
                termination();
                _wait_flag.store(1, Ordering::Relaxed);
                atomic_wait::wake_all(_wait_flag.as_ref());
            },
        );
        atomic_wait::wait(&wait_flag, 0);
    }

    pub fn task_batch_with_barrier_contextless<F>(
        pool: Arc<Self>,
        task_count: usize,
        mut task: F,
    ) where
        F: FnMut(TaskBatchInfo) + Send + Sync + Clone + 'static,
    {
        let wait_flag = Arc::new(AtomicU32::new(0));
        let _wait_flag = wait_flag.clone();

        ThreadPool::task_batch(
            pool.clone(),
            task_count,
            move |info| {
                task(info);

                None
            },
            move || {
                _wait_flag.store(1, Ordering::Relaxed);
                atomic_wait::wake_all(_wait_flag.as_ref());
            },
        );
        atomic_wait::wait(&wait_flag, 0);
    }

    fn inner_task_batch<F, T>(
        pool: Arc<Self>,
        task: F,
        mut task_count: usize,
        termination: T,
        current_task_counter: Arc<AtomicUsize>,
    ) where
        F: FnMut(TaskBatchInfo) -> Option<TaskInfo> + Send + Sync + Clone + 'static,
        T: FnMut() + Send + Sync + Clone + 'static,
    {
        let task_chunk_size = (task_count / pool.as_ref().concurrency).max(1);
        let quotient = task_count / task_chunk_size;
        let remainder = task_count % task_chunk_size;
        for i in 0..quotient + (remainder > 0usize) as usize
        {
            let mut _task = task.clone();
            let mut _termination = termination.clone();
            let mut _current_task_counter = current_task_counter.clone();
            let _pool = pool.clone();

            pool.add_task(Box::new(move || {
                for j in i * task_chunk_size..((i+1) * task_chunk_size).min(task_count)
                {
                    _task(TaskBatchInfo {
                        task_index: j,
                        current_task_count: _current_task_counter.clone(),
                    });
                    let val = _current_task_counter.fetch_sub(1, Ordering::Relaxed);
                    if val == 1
                    {
                        _termination()
                    }
                }

                None
            }));
        }
    }
}

This is a criterion benchmark file I made:

use std::hint::black_box;
use std::sync::{Arc, atomic::Ordering};

use atomic_float::AtomicF64;
use criterion::{Criterion, criterion_group, criterion_main};
use thread_pool::*;


fn st_addition(input: &mut Vec<f64>)
{
    for v in input
    {
        *v += 1.0;
    }
}

fn mt_addition(thread_pool: Arc<ThreadPool>, input: Arc<Vec<AtomicF64>>)
{
    ThreadPool::task_batch_with_barrier_contextless(
        thread_pool.clone(),
        input.len(),
        move |info: TaskBatchInfo| {
            let input = input.clone();
            let i = info.task_index;
            input[i].fetch_add(1., Ordering::Relaxed);
        },
    );
}

fn threadpool_benchmark(c: &mut Criterion)
{
    let mut test_bed = vec![0.; 50_000_000];
    c.bench_function("single thread", |b| {
        b.iter(|| st_addition(black_box(&mut test_bed)))
    });

    let thread_num: usize = std::thread::available_parallelism().unwrap().into();
    let threadpool = Arc::new(ThreadPool::new(thread_num));

    threadpool.start();

    let mut test_bed = vec![];
    for _ in 0..50_000_000
    {
        test_bed.push(AtomicF64::new(0.0));
    }
    let test_bed = Arc::new(test_bed);

    let pool = threadpool.clone();
    let data = test_bed.clone();
    c.bench_function("multi thread", |b| {
        b.iter({
            let value = data.clone();
            let pool = pool.clone();

            move || mt_addition(pool.clone(), value.clone())
        })
    });
}

fn custom_criterion() -> Criterion
{
    Criterion::default()
        .measurement_time(std::time::Duration::from_secs(300))
        .warm_up_time(std::time::Duration::from_secs(2))
}

criterion_group! {
    name = benches;
    config = custom_criterion();
    targets = threadpool_benchmark
}
criterion_main!(benches);

Which currently reports:

single thread           time:   [33.355 ms 33.528 ms 33.728 ms]
                        change: [+0.6652% +1.7882% +2.8456%] (p = 0.00 < 0.05)
                        Change within noise threshold.
Found 13 outliers among 100 measurements (13.00%)
  6 (6.00%) high mild
  7 (7.00%) high severe

multi thread            time:   [2.7053 s 2.7124 s 2.7198 s]
                        change: [+9.0915% +10.740% +12.440%] (p = 0.00 < 0.05)
                        Performance has regressed.

So, somehow, the MT version is slower than the ST version for the same amount of work.

  • FWIW I'd recommend you use an off-the-shelf threadpool like rayon::ThreadPool instead of inventing your own with a queue. That gets rid of the busy looping, and your benchmarks will show which part is actually slow. Commented May 6 at 10:11
  • @AhmedAEK I tried it and it actually performed worse than my custom threadpool for this particular case. For now, I just want to focus on understanding this problem space better and think of ideas on how to improve the design of the threadpool. Commented May 6 at 16:14
  • I would also appreciate a complete example. I'm getting a suspicion that your code may be fine (to some extent), but your workload per task may be too small, so inter-thread communication dwarfs it (relatively speaking) due to increased contention. Commented May 6 at 16:42
  • What is the exact CPU reference (e.g. "i5-9600KF") on which you run this code? Commented May 6 at 20:09
  • @kmdreko I edited the question to try to get it as close to an MRE as I could, but of course the toml files are not included for conciseness. Commented May 9 at 1:36

1 Answer


Your threadpool implementation is fine; it's your benchmarks and expectations that are skewed.

NOTE: This answer is only based on benchmarking, I didn't go through and pick at your code. Consider Code Review for that.


On my system I get these numbers, which mimic what you report (I also threw in a rayon equivalent, as mentioned in the comments):

single thread                24.2 ms
makogan's thread pool     2,669.0 ms
rayon's thread pool      35,147.0 ms

This looks horrendous even though your threadpool appears to do much better than rayon's. Why does it look so bad?


First, we should be comparing apples to apples. Your single-threaded benchmark is doing += 1.0 while the multi-threaded version is doing .fetch_add(1.0, Ordering::Relaxed). These are mathematically equivalent, but they are not the same thing; the atomic version is almost certainly doing more work. If we use atomic operations in the single-threaded benchmark as well, I get this difference:

fn st_addition(input: &mut Vec<AtomicF64>) {
    for v in input {
        v.fetch_add(1., Ordering::Relaxed);
    }
}
single thread                24.2 ms
single thread (atomic)       92.6 ms

I do not know whether this difference comes from cache effects of atomic operations, or from using multiple instructions compared to a single CPU fadd instruction, or a combination.
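
For what it's worth, mainstream CPUs have no atomic floating-point add instruction, so an f64 fetch_add is typically implemented as a compare-exchange loop over the value's bit pattern, roughly like this sketch (the general pattern, not necessarily atomic_float's exact implementation):

use std::sync::atomic::{AtomicU64, Ordering};

// Rough shape of an atomic f64 add: load the bits, add in plain f64 math, and
// try to compare-exchange the result back in, retrying if another thread got
// there first. Compare this with the single add instruction the plain `+= 1.0`
// loop needs per element.
fn fetch_add_f64(cell: &AtomicU64, delta: f64) -> f64 {
    let mut current = cell.load(Ordering::Relaxed);
    loop {
        let new = (f64::from_bits(current) + delta).to_bits();
        match cell.compare_exchange_weak(current, new, Ordering::Relaxed, Ordering::Relaxed) {
            Ok(_) => return f64::from_bits(current),
            Err(actual) => current = actual,
        }
    }
}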

There are certainly ways to avoid requiring atomic operations on disjoint slices, but I'll leave that out of scope for this answer since the benchmark is synthetic anyway.
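
(For reference, the usual shape of that approach is scoped threads over chunks_mut, so each thread gets its own disjoint &mut chunk and plain += suffices; a minimal sketch, separate from the threadpool under discussion:)

use std::thread;

// Split the data into disjoint mutable chunks, one per thread, so each thread
// can use a plain `+=` with no atomics at all. The chunking here is only
// illustrative.
fn mt_addition_no_atomics(input: &mut [f64], threads: usize) {
    let chunk_len = input.len().div_ceil(threads.max(1)).max(1);
    thread::scope(|s| {
        for chunk in input.chunks_mut(chunk_len) {
            s.spawn(move || {
                for v in chunk {
                    *v += 1.0;
                }
            });
        }
    });
}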


That difference doesn't come close to explaining the roughly 30x gap that remains. The primary reason for it is that your per-task pool overhead is massive compared to the task itself.

In the single-threaded case, the overhead is negligible: beyond the floating-point addition, all it has is the loop itself.

In the multi-threaded case, each iteration has an Arc::clone (useless, by the way), which at minimum costs an atomic increment, plus an index bounds check (that the single-threaded version trivially avoids), and, looking at the inner loop of inner_task_batch, an additional atomic update, another Arc::clone, another if check, and possibly more on top of the loop itself. All that just to do one floating-point addition.

This kind of nano-task doesn't make sense in real-world scenarios. If you actually do more work per task, the cost melts away:

fn mt_addition(thread_pool: Arc<ThreadPool>, input: Arc<Vec<AtomicF64>>) {
    const TASK_SIZE: usize = 1_000;
    ThreadPool::task_batch_with_barrier_contextless(
        thread_pool.clone(),
        input.len() / TASK_SIZE,
        move |info: TaskBatchInfo| {
            let input = input.clone(); // still useless
            for j in 0..TASK_SIZE {
                let i = j + TASK_SIZE * info.task_index;
                input[i].fetch_add(1., Ordering::Relaxed);
            }
        },
    );
}
single thread                24.2 ms
single thread (atomic)       92.6 ms
makogan's thread pool     2,669.0 ms
makogan's thread pool (1000) 23.3 ms

Even this I would still say is too small for the real world: roughly 5us per task (1000 fetch_adds at the single-threaded rate) is still extremely small. However, even increasing it to what I would call more reasonable, a million, doesn't really change the numbers (rayon again for good measure):

single thread                   24.2 ms
single thread (atomic)          92.6 ms
makogan's thread pool        2,669.0 ms
rayon's thread pool         35,147.0 ms
makogan's thread pool (1000)    23.3 ms
makogan's thread pool (1000000) 23.5 ms
rayon's thread pool (1000000)   22.9 ms

The reason they all seem to converge on 24-ish ms is that...


The benchmark (now unleashed) is memory-constrained. The 24-ish ms is simply a reflection of how fast my system can shuffle 50 million f64s around; the performance of the threadpool is not going to make this any lower. For a synthetic benchmark like this, you'd want to do CPU-bound work instead (throw some fibonacci or collatz at it).
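
For instance, a per-task kernel along these lines (a hypothetical Collatz step counter, not code from the question) keeps the work on the CPU rather than on memory bandwidth:

// Hypothetical CPU-bound kernel: count Collatz steps for a seed derived from
// the task index. It touches almost no memory, so it exercises the pool and
// the CPU rather than memory bandwidth.
fn collatz_steps(mut n: u64) -> u64 {
    let mut steps = 0;
    while n != 1 {
        n = if n % 2 == 0 { n / 2 } else { 3 * n + 1 };
        steps += 1;
    }
    steps
}

// Used inside a batch task, e.g.:
//   ThreadPool::task_batch_with_barrier_contextless(pool, count, move |info| {
//       black_box(collatz_steps(info.task_index as u64 + 1));
//   });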

As a final note, I believe your nano-task implementation does much better than rayon's because you aren't using a global pool of tasks: task_batch_with_barrier_contextless pre-splits the tasks between the available threads, so there's no contention at all. This may or may not be desirable depending on your anticipated workloads and the variance between tasks.
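
For illustration, one plausible shape of such a global-queue benchmark is pushing one task per element through a shared rayon::ThreadPool; this is an assumption about what the rayon comparison looked like, not the answer's actual code:

use std::sync::{Arc, atomic::Ordering};

use atomic_float::AtomicF64;

// One task per element goes through rayon's shared queue, so every single
// fetch_add pays the full cost of task submission and scheduling.
fn rayon_addition(pool: &rayon::ThreadPool, input: Arc<Vec<AtomicF64>>) {
    pool.scope(|s| {
        for i in 0..input.len() {
            let input = input.clone();
            s.spawn(move |_| {
                input[i].fetch_add(1.0, Ordering::Relaxed);
            });
        }
    });
}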

That all being said, your goal was to measure the threadpool itself and not the code running on it, so your nano-task benchmark is actually a good way to measure the overhead (eyeballing from 2.7 s over 50 million tasks, about 50 ns per task). It was just that your comparison to the single-threaded version was way off the mark.


2 Comments

Question, regarding "task_batch_with_barrier_contextless pre-splits the tasks between the available threads so there's no contention at all": the intention behind that was to do what you are doing in your last code snippet, i.e. grouping tasks into larger units of work so that each thread has a meaningful amount of work to do with minimum overhead. What kind of change could I make to no longer need that explicitly?
@Makogan It's all about reducing the cost per iteration. I removed the inner clone of _current_task_counter (since you can pass by reference instead) and got down to 1,518 ms, then I removed the input.clone() (since it wasn't doing anything useful) and that brought it down to 689 ms. I would then recommend moving the termination check outside the loop.
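
For reference, a sketch of that last suggestion (moving the termination check out of the per-element loop) applied to the chunk closure in inner_task_batch; this is my reading of the comment, not the commenter's exact code:

// Inside inner_task_batch: decrement the batch counter once per chunk rather
// than once per element, and fire the termination callback only when this
// chunk is the last one to finish. Dropping the per-iteration Arc clone
// entirely would additionally require TaskBatchInfo to borrow the counter
// instead of owning an Arc, which is a larger signature change.
pool.add_task(move || {
    let start = i * task_chunk_size;
    let end = ((i + 1) * task_chunk_size).min(task_count);
    for j in start..end
    {
        _task(TaskBatchInfo {
            task_index: j,
            current_task_count: _current_task_counter.clone(),
        });
    }

    let remaining_before = _current_task_counter.fetch_sub(end - start, Ordering::Relaxed);
    if remaining_before == end - start
    {
        _termination()
    }

    None
});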
