Bitcoin Core 31.99.0
P2P Digital Currency
threadpool.h
Go to the documentation of this file.
1// Copyright (c) The Bitcoin Core developers
2// Distributed under the MIT software license, see the accompanying
3// file COPYING or https://www.opensource.org/licenses/mit-license.php.
4
5#ifndef BITCOIN_UTIL_THREADPOOL_H
6#define BITCOIN_UTIL_THREADPOOL_H
7
8#include <sync.h>
9#include <tinyformat.h>
10#include <util/check.h>
11#include <util/expected.h>
12#include <util/thread.h>
13
14#include <algorithm>
15#include <condition_variable>
16#include <functional>
17#include <future>
18#include <queue>
19#include <ranges>
20#include <thread>
21#include <type_traits>
22#include <utility>
23#include <vector>
24
48{
49private:
50 std::string m_name;
52 std::queue<std::packaged_task<void()>> m_work_queue GUARDED_BY(m_mutex);
53 std::condition_variable m_cv;
54 // Note: m_interrupt must be guarded by m_mutex, and cannot be replaced by an unguarded atomic bool.
55 // This ensures threads blocked on m_cv reliably observe the change and proceed correctly without missing signals.
56 // Ref: https://en.cppreference.com/w/cpp/thread/condition_variable
57 bool m_interrupt GUARDED_BY(m_mutex){false};
58 std::vector<std::thread> m_workers GUARDED_BY(m_mutex);
59
61 {
62 WAIT_LOCK(m_mutex, wait_lock);
63 for (;;) {
64 std::packaged_task<void()> task;
65 {
66 // Wait only if needed; avoid sleeping when a new task was submitted while we were processing another one.
67 if (!m_interrupt && m_work_queue.empty()) {
68 // Block until the pool is interrupted or a task is available.
69 m_cv.wait(wait_lock, [&]() EXCLUSIVE_LOCKS_REQUIRED(m_mutex) { return m_interrupt || !m_work_queue.empty(); });
70 }
71
72 // If stopped and no work left, exit worker
73 if (m_interrupt && m_work_queue.empty()) {
74 return;
75 }
76
77 task = std::move(m_work_queue.front());
78 m_work_queue.pop();
79 }
80
81 {
82 // Execute the task without the lock
83 REVERSE_LOCK(wait_lock, m_mutex);
84 task();
85 }
86 }
87 }
88
89public:
90 explicit ThreadPool(const std::string& name) : m_name(name) {}
91
93 {
94 Stop(); // In case it hasn't been stopped.
95 }
96
105 void Start(int num_workers) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
106 {
107 assert(num_workers > 0);
108 LOCK(m_mutex);
109 if (m_interrupt) throw std::runtime_error("Thread pool has been interrupted or is stopping");
110 if (!m_workers.empty()) throw std::runtime_error("Thread pool already started");
111
112 // Create workers
113 m_workers.reserve(num_workers);
114 for (int i = 0; i < num_workers; i++) {
115 m_workers.emplace_back(&util::TraceThread, strprintf("%s_pool_%d", m_name, i), [this] { WorkerThread(); });
116 }
117 }
118
129 {
130 // Notify workers and join them
131 std::vector<std::thread> threads_to_join;
132 {
133 LOCK(m_mutex);
134 // Ensure Stop() is not called from a worker thread while workers are still registered,
135 // otherwise a self-join deadlock would occur.
136 auto id = std::this_thread::get_id();
137 for (const auto& worker : m_workers) assert(worker.get_id() != id);
138 // Early shutdown to return right away on any concurrent Submit() call
139 m_interrupt = true;
140 threads_to_join.swap(m_workers);
141 }
142 m_cv.notify_all();
143 // Help draining queue
144 while (ProcessTask()) {}
145 // Free resources
146 for (auto& worker : threads_to_join) worker.join();
147
148 // Since we currently wait for tasks completion, sanity-check empty queue
149 LOCK(m_mutex);
150 Assume(m_work_queue.empty());
151 // Re-allow Start() now that all workers have exited
152 m_interrupt = false;
153 }
154
155 enum class SubmitError {
156 Inactive,
158 };
159
160 template <class F>
161 using Future = std::future<std::invoke_result_t<F>>;
162
163 template <class R>
165
166 template <class F>
167 using PackagedTask = std::packaged_task<std::invoke_result_t<F>()>;
168
183 template <class F>
185 {
186 PackagedTask<F> task{std::forward<F>(fn)};
187 auto future{task.get_future()};
188 {
189 LOCK(m_mutex);
190 if (m_workers.empty()) return util::Unexpected{SubmitError::Inactive};
191 if (m_interrupt) return util::Unexpected{SubmitError::Interrupted};
192
193 m_work_queue.emplace(std::move(task));
194 }
195 m_cv.notify_one();
196 return {std::move(future)};
197 }
198
218 template <std::ranges::sized_range R>
219 requires(!std::is_lvalue_reference_v<R>)
220 [[nodiscard]] util::Expected<std::vector<RangeFuture<R>>, SubmitError> Submit(R&& fns) noexcept EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
221 {
222 std::vector<RangeFuture<R>> futures;
223 futures.reserve(std::ranges::size(fns));
224
225 {
226 LOCK(m_mutex);
227 if (m_workers.empty()) return util::Unexpected{SubmitError::Inactive};
228 if (m_interrupt) return util::Unexpected{SubmitError::Interrupted};
229 for (auto&& fn : fns) {
231 futures.emplace_back(task.get_future());
232 m_work_queue.emplace(std::move(task));
233 }
234 }
235 m_cv.notify_all();
236 return {std::move(futures)};
237 }
238
244 {
245 std::packaged_task<void()> task;
246 {
247 LOCK(m_mutex);
248 if (m_work_queue.empty()) return false;
249
250 // Pop the task
251 task = std::move(m_work_queue.front());
252 m_work_queue.pop();
253 }
254 task();
255 return true;
256 }
257
269 {
270 WITH_LOCK(m_mutex, m_interrupt = true);
271 m_cv.notify_all();
272 }
273
275 {
276 return WITH_LOCK(m_mutex, return m_work_queue.size());
277 }
278
280 {
281 return WITH_LOCK(m_mutex, return m_workers.size());
282 }
283};
284
285constexpr std::string_view SubmitErrorString(const ThreadPool::SubmitError err) noexcept {
286 switch (err) {
288 return "No active workers";
290 return "Interrupted";
291 }
292 Assume(false); // Unreachable
293 return "Unknown error";
294}
295
296#endif // BITCOIN_UTIL_THREADPOOL_H
#define Assume(val)
Assume is the identity function.
Definition: check.h:125
Fixed-size thread pool for running arbitrary tasks concurrently.
Definition: threadpool.h:48
void Start(int num_workers) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Start worker threads.
Definition: threadpool.h:105
size_t WorkersCount() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Definition: threadpool.h:279
std::vector< std::thread > m_workers GUARDED_BY(m_mutex)
bool ProcessTask() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Execute a single queued task synchronously.
Definition: threadpool.h:243
std::condition_variable m_cv
Definition: threadpool.h:53
util::Expected< std::vector< RangeFuture< R > >, SubmitError > Submit(R &&fns) noexcept EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Enqueues a range of tasks for asynchronous execution.
Definition: threadpool.h:220
util::Expected< Future< F >, SubmitError > Submit(F &&fn) noexcept EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Enqueues a new task for asynchronous execution.
Definition: threadpool.h:184
void Stop() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Stop all worker threads and wait for them to exit.
Definition: threadpool.h:128
std::queue< std::packaged_task< void()> > m_work_queue GUARDED_BY(m_mutex)
std::packaged_task< std::invoke_result_t< F >()> PackagedTask
Definition: threadpool.h:167
Mutex m_mutex
Definition: threadpool.h:51
ThreadPool(const std::string &name)
Definition: threadpool.h:90
void Interrupt() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Stop accepting new tasks and begin asynchronous shutdown.
Definition: threadpool.h:268
void WorkerThread() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Definition: threadpool.h:60
std::future< std::invoke_result_t< F > > Future
Definition: threadpool.h:161
size_t WorkQueueSize() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Definition: threadpool.h:274
std::string m_name
Definition: threadpool.h:50
bool m_interrupt GUARDED_BY(m_mutex)
Definition: threadpool.h:57
Future< std::ranges::range_reference_t< R > > RangeFuture
Definition: threadpool.h:164
The util::Expected class provides a standard way for low-level functions to return either error value...
Definition: expected.h:45
The util::Unexpected class represents an unexpected value stored in util::Expected.
Definition: expected.h:22
void TraceThread(std::string_view thread_name, std::function< void()> thread_func)
A wrapper for do-something-once thread functions.
Definition: thread.cpp:16
const char * name
Definition: rest.cpp:49
#define WAIT_LOCK(cs, name)
Definition: sync.h:274
#define REVERSE_LOCK(g, cs)
Definition: sync.h:254
#define LOCK(cs)
Definition: sync.h:268
#define WITH_LOCK(cs, code)
Run code while locking a mutex.
Definition: sync.h:299
constexpr std::string_view SubmitErrorString(const ThreadPool::SubmitError err) noexcept
Definition: threadpool.h:285
#define EXCLUSIVE_LOCKS_REQUIRED(...)
Definition: threadsafety.h:49
#define strprintf
Format arguments and return the string or write to given std::ostream (see tinyformat::format doc for...
Definition: tinyformat.h:1172
assert(!tx.IsCoinBase())