Bitcoin Core 30.99.0
P2P Digital Currency
threadpool.h
Go to the documentation of this file.
1// Copyright (c) The Bitcoin Core developers
2// Distributed under the MIT software license, see the accompanying
3// file COPYING or https://www.opensource.org/licenses/mit-license.php.
4
5#ifndef BITCOIN_UTIL_THREADPOOL_H
6#define BITCOIN_UTIL_THREADPOOL_H
7
8#include <sync.h>
9#include <tinyformat.h>
10#include <util/expected.h>
11#include <util/check.h>
12#include <util/thread.h>
13
14#include <algorithm>
15#include <condition_variable>
16#include <functional>
17#include <future>
18#include <queue>
19#include <thread>
20#include <type_traits>
21#include <utility>
22#include <vector>
23
47{
48private:
// Base name of the pool; Start() derives each worker thread's name from it ("<m_name>_pool_<index>").
49 std::string m_name;
// FIFO of pending tasks: filled by Submit(), drained by the worker threads and by ProcessTask().
51 std::queue<std::packaged_task<void()>> m_work_queue GUARDED_BY(m_mutex);
// Notified when a task is queued or the pool is interrupted/stopped; idle workers wait on it.
52 std::condition_variable m_cv;
53 // Note: m_interrupt must be guarded by m_mutex, and cannot be replaced by an unguarded atomic bool.
54 // This ensures threads blocked on m_cv reliably observe the change and proceed correctly without missing signals.
55 // Ref: https://en.cppreference.com/w/cpp/thread/condition_variable
56 bool m_interrupt GUARDED_BY(m_mutex){false};
// Worker thread handles: created by Start(), moved out and joined by Stop().
57 std::vector<std::thread> m_workers GUARDED_BY(m_mutex);
58
// Body of WorkerThread(), the loop every pool thread runs: take the next queued task while
// holding m_mutex, run it with the mutex released, and return once the pool has been
// interrupted and the queue is empty.
60 {
61 WAIT_LOCK(m_mutex, wait_lock);
62 for (;;) {
63 std::packaged_task<void()> task;
64 {
65 // Wait only if needed; avoid sleeping when a new task was submitted while we were processing another one.
66 if (!m_interrupt && m_work_queue.empty()) {
67 // Block until the pool is interrupted or a task is available.
68 m_cv.wait(wait_lock, [&]() EXCLUSIVE_LOCKS_REQUIRED(m_mutex) { return m_interrupt || !m_work_queue.empty(); });
69 }
70
71 // If stopped and no work left, exit worker
72 if (m_interrupt && m_work_queue.empty()) {
73 return;
74 }
75
// Reaching this point means the queue is non-empty (either a task was pending, or the
// pool is interrupted but still has work to drain), so front() is valid.
76 task = std::move(m_work_queue.front());
77 m_work_queue.pop();
78 }
79
80 {
81 // Execute the task without the lock
82 REVERSE_LOCK(wait_lock, m_mutex);
// An exception thrown by the callable is stored in the task's shared state by
// std::packaged_task, so it surfaces through the future rather than escaping here.
83 task();
84 }
85 }
86 }
87
88public:
89 explicit ThreadPool(const std::string& name) : m_name(name) {}
90
// NOTE(review): the signature line is not visible in this listing; presumably this is the
// destructor body. It calls Stop(), which joins all worker threads before returning.
92 {
93 Stop(); // In case it hasn't been stopped.
94 }
95
104 void Start(int num_workers) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
105 {
106 assert(num_workers > 0);
107 LOCK(m_mutex);
108 if (m_interrupt) throw std::runtime_error("Thread pool has been interrupted or is stopping");
109 if (!m_workers.empty()) throw std::runtime_error("Thread pool already started");
110
111 // Create workers
112 m_workers.reserve(num_workers);
113 for (int i = 0; i < num_workers; i++) {
114 m_workers.emplace_back(&util::TraceThread, strprintf("%s_pool_%d", m_name, i), [this] { WorkerThread(); });
115 }
116 }
117
// Body of Stop(): marks the pool as interrupted, wakes every worker, helps run the tasks
// still queued, joins all worker threads, and finally clears the interrupt flag so that
// Start() may be called again. Must not be called from one of the pool's own worker
// threads (asserted below).
128 {
129 // Notify workers and join them
130 std::vector<std::thread> threads_to_join;
131 {
132 LOCK(m_mutex);
133 // Ensure Stop() is not called from a worker thread while workers are still registered,
134 // otherwise a self-join deadlock would occur.
135 auto id = std::this_thread::get_id();
136 for (const auto& worker : m_workers) assert(worker.get_id() != id);
137 // Early shutdown to return right away on any concurrent Submit() call
138 m_interrupt = true;
// Take the handles out of m_workers so they can be joined without holding m_mutex.
// With m_workers now empty, a concurrent Submit() reports SubmitError::Inactive.
139 threads_to_join.swap(m_workers);
140 }
141 m_cv.notify_all();
142 // Help drain the queue on the calling thread
143 while (ProcessTask()) {}
144 // Free resources
145 for (auto& worker : threads_to_join) worker.join();
146
147 // Since we currently wait for tasks completion, sanity-check empty queue
148 LOCK(m_mutex);
149 Assume(m_work_queue.empty());
150 // Re-allow Start() now that all workers have exited
151 m_interrupt = false;
152 }
153
/** Reasons for which Submit() refuses to enqueue a task. */
enum class SubmitError {
    /** The pool currently has no worker threads (not started, or already stopped). */
    Inactive,
    /** The pool has been interrupted and no longer accepts new tasks. */
    Interrupted,
};
158
// Submit(fn): enqueues a new task for asynchronous execution. The callable is wrapped in a
// std::packaged_task, pushed onto the work queue, and the matching future is returned.
// Returns SubmitError::Inactive when the pool has no workers and SubmitError::Interrupted
// when the interrupt flag is set; in both cases the task is discarded without being run.
// NOTE(review): the signature line is missing from this listing; the cross-reference index
// gives it as
//   util::Expected<std::future<std::invoke_result_t<F>>, SubmitError> Submit(F&& fn) noexcept
// Constructing the packaged_task and inserting into the queue may allocate, so an
// allocation failure would terminate under noexcept — confirm this is intended.
173 template <class F>
175 {
// The task's result type is whatever calling fn with no arguments yields.
176 std::packaged_task<std::invoke_result_t<F>()> task{std::forward<F>(fn)};
177 auto future{task.get_future()};
178 {
179 LOCK(m_mutex);
180 if (m_workers.empty()) return util::Unexpected{SubmitError::Inactive};
181 if (m_interrupt) return util::Unexpected{SubmitError::Interrupted};
182
// The typed task is moved into a type-erased packaged_task<void()> queue entry; its result
// still reaches `future` because that future is tied to the typed task's shared state.
183 m_work_queue.emplace(std::move(task));
184 }
// Wake one waiting worker; this happens after the lock scope above has ended.
185 m_cv.notify_one();
186 return {std::move(future)};
187 }
188
// Body of ProcessTask(): executes a single queued task synchronously on the calling thread.
// Returns false if the queue was empty, true after one task has been run.
194 {
195 std::packaged_task<void()> task;
196 {
197 LOCK(m_mutex);
198 if (m_work_queue.empty()) return false;
199
200 // Pop the task
201 task = std::move(m_work_queue.front());
202 m_work_queue.pop();
203 }
// Run the task after the lock scope has ended, so m_mutex is not held while it executes.
204 task();
205 return true;
206 }
207
// Body of Interrupt(): sets the interrupt flag under m_mutex and wakes every waiting worker.
// It does not join the threads: workers keep draining the queue and return on their own once
// it is empty, while Submit() rejects new tasks from this point on.
219 {
220 WITH_LOCK(m_mutex, m_interrupt = true);
221 m_cv.notify_all();
222 }
223
// Body of WorkQueueSize(): returns the number of tasks currently waiting in the queue,
// read while holding m_mutex. The value may be stale as soon as the lock is released.
225 {
226 return WITH_LOCK(m_mutex, return m_work_queue.size());
227 }
228
// Body of WorkersCount(): returns the number of worker threads currently registered in
// m_workers, read while holding m_mutex (zero before Start() and after Stop()).
230 {
231 return WITH_LOCK(m_mutex, return m_workers.size());
232 }
233};
234
235constexpr std::string_view SubmitErrorString(const ThreadPool::SubmitError err) noexcept {
236 switch (err) {
238 return "No active workers";
240 return "Interrupted";
241 }
242 Assume(false); // Unreachable
243 return "Unknown error";
244}
245
246#endif // BITCOIN_UTIL_THREADPOOL_H
#define Assume(val)
Assume is the identity function.
Definition: check.h:125
Fixed-size thread pool for running arbitrary tasks concurrently.
Definition: threadpool.h:47
void Start(int num_workers) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Start worker threads.
Definition: threadpool.h:104
size_t WorkersCount() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Definition: threadpool.h:229
std::vector< std::thread > m_workers GUARDED_BY(m_mutex)
bool ProcessTask() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Execute a single queued task synchronously.
Definition: threadpool.h:193
std::condition_variable m_cv
Definition: threadpool.h:52
void Stop() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Stop all worker threads and wait for them to exit.
Definition: threadpool.h:127
std::queue< std::packaged_task< void()> > m_work_queue GUARDED_BY(m_mutex)
Mutex m_mutex
Definition: threadpool.h:50
ThreadPool(const std::string &name)
Definition: threadpool.h:89
void Interrupt() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Stop accepting new tasks and begin asynchronous shutdown.
Definition: threadpool.h:218
void WorkerThread() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Definition: threadpool.h:59
size_t WorkQueueSize() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Definition: threadpool.h:224
std::string m_name
Definition: threadpool.h:49
bool m_interrupt GUARDED_BY(m_mutex)
Definition: threadpool.h:56
util::Expected< std::future< std::invoke_result_t< F > >, SubmitError > Submit(F &&fn) noexcept EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Enqueues a new task for asynchronous execution.
Definition: threadpool.h:174
The util::Expected class provides a standard way for low-level functions to return either error values or result values.
Definition: expected.h:45
The util::Unexpected class represents an unexpected value stored in util::Expected.
Definition: expected.h:22
void TraceThread(std::string_view thread_name, std::function< void()> thread_func)
A wrapper for do-something-once thread functions.
Definition: thread.cpp:16
const char * name
Definition: rest.cpp:48
#define WAIT_LOCK(cs, name)
Definition: sync.h:264
#define REVERSE_LOCK(g, cs)
Definition: sync.h:244
#define LOCK(cs)
Definition: sync.h:258
#define WITH_LOCK(cs, code)
Run code while locking a mutex.
Definition: sync.h:289
constexpr std::string_view SubmitErrorString(const ThreadPool::SubmitError err) noexcept
Definition: threadpool.h:235
#define EXCLUSIVE_LOCKS_REQUIRED(...)
Definition: threadsafety.h:51
#define strprintf
Format arguments and return the string or write to given std::ostream (see tinyformat::format doc for details).
Definition: tinyformat.h:1172
assert(!tx.IsCoinBase())