I am trying to write my own thread pool. At the moment, running work through this custom-made thread pool is slower than doing the exact same work single-threaded, and I don't understand why.
This is the thread pool:
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU32, AtomicUsize, Ordering};
use std::thread;
use concurrent_queue::ConcurrentQueue;
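/// Returned by a task to tell the pool to shut down.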
pub struct TaskInfo
{
should_stop: bool,
}
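/// Handed to each task in a batch: the task's index plus the shared count of
/// tasks still pending.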
pub struct TaskBatchInfo
{
pub task_index: usize,
pub current_task_count: Arc<AtomicUsize>,
}
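/// A boxed, type-erased unit of work.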
pub struct Task
{
func: Box<dyn FnMut() -> Option<TaskInfo> + Send + Sync + 'static>,
}
impl Task
{
fn new(func: impl FnMut() -> Option<TaskInfo> + Send + Sync + Clone + 'static)
-> Self
{
Self {
func: Box::new(func),
}
}
fn run(mut self) -> Option<TaskInfo> { (self.func)() }
}
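/// Fixed-size pool of workers pulling tasks from a shared MPMC queue.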
pub struct ThreadPool
{
queue: Arc<ConcurrentQueue<Task>>,
pub should_stop: Arc<AtomicBool>,
concurrency: usize,
}
impl ThreadPool
{
pub fn new(size: usize) -> Self
{
Self {
queue: Arc::new(ConcurrentQueue::unbounded()),
should_stop: Arc::new(AtomicBool::new(false)),
concurrency: size,
}
}
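/// Spawn a detached supervisor thread that launches `concurrency` workers,
/// each polling the shared queue until `should_stop` is set.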
pub fn start(&self)
{
let queue = self.queue.clone();
let concurrency = self.concurrency;
let should_stop = self.should_stop.clone();
// Spawn the entire thread pool loop on a new thread
thread::spawn(move || {
let mut handles = Vec::new();
for _task_id in 0..concurrency
{
let _queue = queue.clone();
let _should_stop = should_stop.clone();
let handle = thread::spawn(move || {
loop
{
if _should_stop.load(Ordering::Relaxed)
{
break;
}
let task = _queue.pop();
if let Ok(task) = task
{
let info = task.run();
if let Some(info) = info
{
if info.should_stop
{
_should_stop.store(true, Ordering::Relaxed);
break;
}
}
} else {
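// Queue is empty: back off briefly instead of busy-spinning.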
thread::sleep(std::time::Duration::from_micros(500));
}
}
});
handles.push(handle);
}
for handle in handles
{
handle.join().unwrap();
}
});
}
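/// Enqueue a single task; an idle worker will pick it up on its next poll.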
pub fn add_task<F>(&self, task: F)
where
F: FnMut() -> Option<TaskInfo> + Send + Sync + Clone + 'static,
{
let _ = self.queue.push(Task::new(task));
}
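/// Run `task_count` indexed tasks on the pool and call `termination` once,
/// after the last task has finished.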
pub fn task_batch<F, T>(pool: Arc<Self>, task_count: usize, task: F, termination: T)
where
F: FnMut(TaskBatchInfo) -> Option<TaskInfo> + Send + Sync + Clone + 'static,
T: FnMut() + Send + Sync + Clone + 'static,
{
let task_counter = Arc::new(AtomicUsize::new(task_count));
Self::inner_task_batch(pool, task, task_count, termination, task_counter)
}
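/// Like `task_batch`, but blocks the calling thread (via `atomic_wait`, a
/// futex-style wait) until the whole batch has completed.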
pub fn task_batch_with_barrier<F, T, C>(
pool: Arc<Self>,
task_count: usize,
mut task: F,
mut termination: T,
context: C,
) where
F: FnMut(TaskBatchInfo, C) + Send + Sync + Clone + 'static,
T: FnMut() + Send + Sync + Clone + 'static,
C: Clone + Send + Sync + 'static,
{
let wait_flag = Arc::new(AtomicU32::new(0));
let _wait_flag = wait_flag.clone();
let context = Arc::new(context);
ThreadPool::task_batch(
pool.clone(),
task_count,
move |info| {
task(info, (*context).clone());
None
},
move || {
termination();
_wait_flag.store(1, Ordering::Relaxed);
atomic_wait::wake_all(_wait_flag.as_ref());
},
);
atomic_wait::wait(&wait_flag, 0);
}
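/// `task_batch_with_barrier` without a user-supplied context value.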
pub fn task_batch_with_barrier_contextless<F>(
pool: Arc<Self>,
task_count: usize,
mut task: F,
) where
F: FnMut(TaskBatchInfo) + Send + Sync + Clone + 'static,
{
let wait_flag = Arc::new(AtomicU32::new(0));
let _wait_flag = wait_flag.clone();
ThreadPool::task_batch(
pool.clone(),
task_count,
move |info| {
task(info);
None
},
move || {
_wait_flag.store(1, Ordering::Relaxed);
atomic_wait::wake_all(_wait_flag.as_ref());
},
);
atomic_wait::wait(&wait_flag, 0);
}
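/// Split the index range 0..task_count into contiguous chunks (about one per
/// worker) and enqueue one pool task per chunk; the shared counter is
/// decremented once per index, and `termination` fires when it reaches zero.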
fn inner_task_batch<F, T>(
pool: Arc<Self>,
task: F,
task_count: usize,
termination: T,
current_task_counter: Arc<AtomicUsize>,
) where
F: FnMut(TaskBatchInfo) -> Option<TaskInfo> + Send + Sync + Clone + 'static,
T: FnMut() + Send + Sync + Clone + 'static,
{
let task_chunk_size = (task_count / pool.as_ref().concurrency).max(1);
let quotient = task_count / task_chunk_size;
let remainder = task_count % task_chunk_size;
for i in 0..quotient + (remainder > 0usize) as usize
{
let mut _task = task.clone();
let mut _termination = termination.clone();
let _current_task_counter = current_task_counter.clone();
pool.add_task(move || {
for j in i * task_chunk_size..((i+1) * task_chunk_size).min(task_count)
{
_task(TaskBatchInfo {
task_index: j,
current_task_count: _current_task_counter.clone(),
});
let val = _current_task_counter.fetch_sub(1, Ordering::Relaxed);
if val == 1
{
_termination()
}
}
None
});
}
}
}
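For reference, this is the minimal pattern I use to drive the pool; it is the same pattern the benchmark below uses, just with arbitrary sizes:

use std::sync::Arc;
use thread_pool::*;

fn main()
{
    let pool = Arc::new(ThreadPool::new(4));
    pool.start();
    // Enqueue 1_000 indexed tasks and block here until all of them have run.
    ThreadPool::task_batch_with_barrier_contextless(pool.clone(), 1_000, |info: TaskBatchInfo| {
        println!("task {}", info.task_index);
    });
}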
This is the Criterion benchmark I wrote:
use std::hint::black_box;
use std::sync::{Arc, atomic::Ordering};
use atomic_float::AtomicF64;
use criterion::{Criterion, criterion_group, criterion_main};
use thread_pool::*;
fn st_addition(input: &mut [f64])
{
for v in input
{
*v += 1.0;
}
}
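// The same addition, fanned out over the pool as one logical task per element.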
fn mt_addition(thread_pool: Arc<ThreadPool>, input: Arc<Vec<AtomicF64>>)
{
ThreadPool::task_batch_with_barrier_contextless(
thread_pool.clone(),
input.len(),
move |info: TaskBatchInfo| {
let input = input.clone();
let i = info.task_index;
input[i].fetch_add(1., Ordering::Relaxed);
},
);
}
fn threadpool_benchmark(c: &mut Criterion)
{
let mut test_bed = vec![0.; 50_000_000];
c.bench_function("single thread", |b| {
b.iter(|| st_addition(black_box(&mut test_bed)))
});
let thread_num: usize = std::thread::available_parallelism().unwrap().into();
let threadpool = Arc::new(ThreadPool::new(thread_num));
threadpool.start();
let test_bed: Vec<AtomicF64> = (0..50_000_000).map(|_| AtomicF64::new(0.0)).collect();
let test_bed = Arc::new(test_bed);
let pool = threadpool.clone();
let data = test_bed.clone();
c.bench_function("multi thread", |b| {
b.iter({
let value = data.clone();
let pool = pool.clone();
move || mt_addition(pool.clone(), value.clone())
})
});
}
fn custom_criterion() -> Criterion
{
Criterion::default()
.measurement_time(std::time::Duration::from_secs(300))
.warm_up_time(std::time::Duration::from_secs(2))
}
criterion_group! {
name = benches;
config = custom_criterion();
targets = threadpool_benchmark
}
criterion_main!(benches);
Which currently reports:
single thread           time:   [33.355 ms 33.528 ms 33.728 ms]
                        change: [+0.6652% +1.7882% +2.8456%] (p = 0.00 < 0.05)
                        Change within noise threshold.
Found 13 outliers among 100 measurements (13.00%)
  6 (6.00%) high mild
  7 (7.00%) high severe

multi thread            time:   [2.7053 s 2.7124 s 2.7198 s]
                        change: [+9.0915% +10.740% +12.440%] (p = 0.00 < 0.05)
                        Performance has regressed.
So, somehow, the multithreaded version is about 80x slower than the single-threaded version (~2.71 s vs ~33.5 ms) for the same amount of work. What is making the pool so slow?