5#ifndef BITCOIN_UTIL_THREADPOOL_H
6#define BITCOIN_UTIL_THREADPOOL_H
15#include <condition_variable>
52 std::condition_variable
m_cv;
63 std::packaged_task<void()> task;
66 if (!m_interrupt && m_work_queue.empty()) {
72 if (m_interrupt && m_work_queue.empty()) {
76 task = std::move(m_work_queue.front());
108 if (m_interrupt)
throw std::runtime_error(
"Thread pool has been interrupted or is stopping");
109 if (!m_workers.empty())
throw std::runtime_error(
"Thread pool already started");
112 m_workers.reserve(num_workers);
113 for (
int i = 0; i < num_workers; i++) {
130 std::vector<std::thread> threads_to_join;
135 auto id = std::this_thread::get_id();
136 for (
const auto& worker : m_workers)
assert(worker.get_id() !=
id);
139 threads_to_join.swap(m_workers);
145 for (
auto& worker : threads_to_join) worker.join();
149 Assume(m_work_queue.empty());
176 std::packaged_task<std::invoke_result_t<F>()> task{std::forward<F>(fn)};
177 auto future{task.get_future()};
183 m_work_queue.emplace(std::move(task));
186 return {std::move(future)};
195 std::packaged_task<void()> task;
198 if (m_work_queue.empty())
return false;
201 task = std::move(m_work_queue.front());
238 return "No active workers";
240 return "Interrupted";
243 return "Unknown error";
#define Assume(val)
Assume is the identity function.
Fixed-size thread pool for running arbitrary tasks concurrently.
void Start(int num_workers) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Start worker threads.
size_t WorkersCount() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
std::vector< std::thread > m_workers GUARDED_BY(m_mutex)
bool ProcessTask() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Execute a single queued task synchronously.
std::condition_variable m_cv
void Stop() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Stop all worker threads and wait for them to exit.
std::queue< std::packaged_task< void()> > m_work_queue GUARDED_BY(m_mutex)
ThreadPool(const std::string &name)
void Interrupt() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Stop accepting new tasks and begin asynchronous shutdown.
void WorkerThread() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
size_t WorkQueueSize() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
bool m_interrupt GUARDED_BY(m_mutex)
util::Expected< std::future< std::invoke_result_t< F > >, SubmitError > Submit(F &&fn) noexcept EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Enqueues a new task for asynchronous execution.
The util::Expected class provides a standard way for low-level functions to return either error value...
The util::Unexpected class represents an unexpected value stored in util::Expected.
void TraceThread(std::string_view thread_name, std::function< void()> thread_func)
A wrapper for do-something-once thread functions.
#define WAIT_LOCK(cs, name)
#define REVERSE_LOCK(g, cs)
#define WITH_LOCK(cs, code)
Run code while locking a mutex.
constexpr std::string_view SubmitErrorString(const ThreadPool::SubmitError err) noexcept
#define EXCLUSIVE_LOCKS_REQUIRED(...)