NAGASH 0.9.8
Next Generation Analysis System
Loading...
Searching...
No Matches
ThreadPool.h
Go to the documentation of this file.
1//***************************************************************************************
4//***************************************************************************************
5
6#pragma once
7
8#include <vector>
9#include <queue>
10#include <memory>
11#include <thread>
12#include <mutex>
13#include <condition_variable>
14#include <future>
15#include <functional>
16#include <stdexcept>
17
18namespace NAGASH
19{
25 {
26 public:
27 ThreadPool(size_t);
28 template <class F, class... Args>
29 auto enqueue(F &&f, Args &&...args)
30 -> std::future<typename std::invoke_result_t<F, Args...>>;
32
33 private:
34 // need to keep track of threads so we can join them
35 std::vector<std::thread> workers;
36 // the task queue
37 std::queue<std::function<void()>> tasks;
38
39 // synchronization
40 std::mutex queue_mutex;
41 std::condition_variable condition;
42 bool stop;
43 };
44
47 // the constructor just launches some amount of workers
48 inline ThreadPool::ThreadPool(size_t threads)
49 : stop(false)
50 {
51 for (size_t i = 0; i < threads; ++i)
52 workers.emplace_back(
53 [this]
54 {
55 for (;;)
56 {
57 std::function<void()> task;
58
59 {
60 std::unique_lock<std::mutex> lock(this->queue_mutex);
61 this->condition.wait(lock,
62 [this]
63 { return this->stop || !this->tasks.empty(); });
64 if (this->stop && this->tasks.empty())
65 return;
66 task = std::move(this->tasks.front());
67 this->tasks.pop();
68 }
69
70 task();
71 }
72 });
73 }
74
81 template <class F, class... Args>
82 auto ThreadPool::enqueue(F &&f, Args &&...args)
83 -> std::future<typename std::invoke_result_t<F, Args...>>
84 {
85 using return_type = typename std::invoke_result_t<F, Args...>;
86
87 auto task = std::make_shared<std::packaged_task<return_type()>>(
88 std::bind(std::forward<F>(f), std::forward<Args>(args)...));
89
90 std::future<return_type> res = task->get_future();
91 {
92 std::unique_lock<std::mutex> lock(queue_mutex);
93
94 // don't allow enqueueing after stopping the pool
95 if (stop)
96 throw std::runtime_error("enqueue on stopped ThreadPool");
97
98 tasks.emplace([task]()
99 { (*task)(); });
100 }
101 condition.notify_one();
102 return res;
103 }
104
106 inline ThreadPool::~ThreadPool()
107 {
108 {
109 std::unique_lock<std::mutex> lock(queue_mutex);
110 stop = true;
111 }
112 condition.notify_all();
113 for (std::thread &worker : workers)
114 worker.join();
115 }
116}
a lightweight thread pool Modified from https://github.com/progschj/ThreadPool
Definition ThreadPool.h:25
std::mutex queue_mutex
Definition ThreadPool.h:40
std::vector< std::thread > workers
Definition ThreadPool.h:35
std::queue< std::function< void()> > tasks
Definition ThreadPool.h:37
std::condition_variable condition
Definition ThreadPool.h:41
~ThreadPool()
Destruct the ThreadPool object, also joining all threads.
Definition ThreadPool.h:106
ThreadPool(size_t)
Construct the ThreadPool object with the given number of threads.
Definition ThreadPool.h:48
auto enqueue(F &&f, Args &&...args) -> std::future< typename std::invoke_result_t< F, Args... > >
Add a task to the thread pool.
Definition ThreadPool.h:82