5#ifndef BITCOIN_UTIL_THREADPOOL_H
6#define BITCOIN_UTIL_THREADPOOL_H
14#include <condition_variable>
51 std::condition_variable
m_cv;
62 std::packaged_task<void()> task;
65 if (!m_interrupt && m_work_queue.empty()) {
71 if (m_interrupt && m_work_queue.empty()) {
75 task = std::move(m_work_queue.front());
107 if (!m_workers.empty())
throw std::runtime_error(
"Thread pool already started");
111 m_workers.reserve(num_workers);
112 for (
int i = 0; i < num_workers; i++) {
128 std::vector<std::thread> threads_to_join;
133 auto id = std::this_thread::get_id();
134 for (
const auto& worker : m_workers)
assert(worker.get_id() !=
id);
137 threads_to_join.swap(m_workers);
140 for (
auto& worker : threads_to_join) worker.join();
157 std::packaged_task task{std::forward<F>(fn)};
158 auto future{task.get_future()};
161 if (m_interrupt || m_workers.empty()) {
162 throw std::runtime_error(
"No active workers; cannot accept new tasks");
164 m_work_queue.emplace(std::move(task));
176 std::packaged_task<void()> task;
179 if (m_work_queue.empty())
return;
182 task = std::move(m_work_queue.front());
#define Assume(val)
Assume is the identity function.
Fixed-size thread pool for running arbitrary tasks concurrently.
void Start(int num_workers) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Start worker threads.
void ProcessTask() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Execute a single queued task synchronously.
size_t WorkersCount() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
std::vector< std::thread > m_workers GUARDED_BY(m_mutex)
std::condition_variable m_cv
void Stop() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Stop all worker threads and wait for them to exit.
std::queue< std::packaged_task< void()> > m_work_queue GUARDED_BY(m_mutex)
ThreadPool(const std::string &name)
void Interrupt() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Stop accepting new tasks and begin asynchronous shutdown.
void WorkerThread() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) auto Submit(F &&fn)
Enqueues a new task for asynchronous execution.
size_t WorkQueueSize() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
bool m_interrupt GUARDED_BY(m_mutex)
void TraceThread(std::string_view thread_name, std::function< void()> thread_func)
A wrapper for do-something-once thread functions.
#define WAIT_LOCK(cs, name)
#define REVERSE_LOCK(g, cs)
#define WITH_LOCK(cs, code)
Run code while locking a mutex.