5#ifndef BITCOIN_UTIL_THREADPOOL_H
6#define BITCOIN_UTIL_THREADPOOL_H
15#include <condition_variable>
53 std::condition_variable
m_cv;
64 std::packaged_task<void()> task;
67 if (!m_interrupt && m_work_queue.empty()) {
73 if (m_interrupt && m_work_queue.empty()) {
77 task = std::move(m_work_queue.front());
109 if (m_interrupt)
throw std::runtime_error(
"Thread pool has been interrupted or is stopping");
110 if (!m_workers.empty())
throw std::runtime_error(
"Thread pool already started");
113 m_workers.reserve(num_workers);
114 for (
int i = 0; i < num_workers; i++) {
131 std::vector<std::thread> threads_to_join;
136 auto id = std::this_thread::get_id();
137 for (
const auto& worker : m_workers)
assert(worker.get_id() !=
id);
140 threads_to_join.swap(m_workers);
146 for (
auto& worker : threads_to_join) worker.join();
150 Assume(m_work_queue.empty());
161 using Future = std::future<std::invoke_result_t<F>>;
187 auto future{task.get_future()};
193 m_work_queue.emplace(std::move(task));
196 return {std::move(future)};
218 template <std::ranges::sized_range R>
219 requires(!std::is_lvalue_reference_v<R>)
222 std::vector<RangeFuture<R>> futures;
223 futures.reserve(std::ranges::size(fns));
229 for (
auto&& fn : fns) {
231 futures.emplace_back(task.get_future());
232 m_work_queue.emplace(std::move(task));
236 return {std::move(futures)};
245 std::packaged_task<void()> task;
248 if (m_work_queue.empty())
return false;
251 task = std::move(m_work_queue.front());
288 return "No active workers";
290 return "Interrupted";
293 return "Unknown error";
#define Assume(val)
Assume is the identity function.
Fixed-size thread pool for running arbitrary tasks concurrently.
void Start(int num_workers) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Start worker threads.
size_t WorkersCount() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
std::vector< std::thread > m_workers GUARDED_BY(m_mutex)
bool ProcessTask() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Execute a single queued task synchronously.
std::condition_variable m_cv
util::Expected< std::vector< RangeFuture< R > >, SubmitError > Submit(R &&fns) noexcept EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Enqueues a range of tasks for asynchronous execution.
util::Expected< Future< F >, SubmitError > Submit(F &&fn) noexcept EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Enqueues a new task for asynchronous execution.
void Stop() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Stop all worker threads and wait for them to exit.
std::queue< std::packaged_task< void()> > m_work_queue GUARDED_BY(m_mutex)
std::packaged_task< std::invoke_result_t< F >()> PackagedTask
ThreadPool(const std::string &name)
void Interrupt() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Stop accepting new tasks and begin asynchronous shutdown.
void WorkerThread() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
std::future< std::invoke_result_t< F > > Future
size_t WorkQueueSize() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
bool m_interrupt GUARDED_BY(m_mutex)
Future< std::ranges::range_reference_t< R > > RangeFuture
The util::Expected class provides a standard way for low-level functions to return either error value...
The util::Unexpected class represents an unexpected value stored in util::Expected.
void TraceThread(std::string_view thread_name, std::function< void()> thread_func)
A wrapper for do-something-once thread functions.
#define WAIT_LOCK(cs, name)
#define REVERSE_LOCK(g, cs)
#define WITH_LOCK(cs, code)
Run code while locking a mutex.
constexpr std::string_view SubmitErrorString(const ThreadPool::SubmitError err) noexcept
#define EXCLUSIVE_LOCKS_REQUIRED(...)