Bitcoin Core 30.99.0
P2P Digital Currency
threadpool.h
Go to the documentation of this file.
1// Copyright (c) The Bitcoin Core developers
2// Distributed under the MIT software license, see the accompanying
3// file COPYING or https://www.opensource.org/licenses/mit-license.php.
4
5#ifndef BITCOIN_UTIL_THREADPOOL_H
6#define BITCOIN_UTIL_THREADPOOL_H
7
8#include <sync.h>
9#include <tinyformat.h>
10#include <util/check.h>
11#include <util/thread.h>
12
13#include <algorithm>
14#include <condition_variable>
15#include <functional>
16#include <future>
17#include <queue>
18#include <stdexcept>
19#include <thread>
20#include <utility>
21#include <vector>
22
46{
47private:
 // Pool name; Start() uses it as the prefix of every worker thread's name.
48 std::string m_name;
 // FIFO of submitted tasks that have not been picked up yet by a worker or by ProcessTask().
50 std::queue<std::packaged_task<void()>> m_work_queue GUARDED_BY(m_mutex);
 // Notified once per queued task (Submit) and broadcast on Interrupt()/Stop().
51 std::condition_variable m_cv;
52 // Note: m_interrupt must be guarded by m_mutex, and cannot be replaced by an unguarded atomic bool.
53 // This ensures threads blocked on m_cv reliably observe the change and proceed correctly without missing signals.
54 // Ref: https://en.cppreference.com/w/cpp/thread/condition_variable
55 bool m_interrupt GUARDED_BY(m_mutex){false};
 // Worker threads created by Start(); Stop() moves them out of this vector and joins them.
56 std::vector<std::thread> m_workers GUARDED_BY(m_mutex);
57
59 {
 // Worker loop executed by every pool thread (spawned in Start()).
 // m_mutex is taken once up front; it is given up only while blocked in m_cv.wait()
 // and while a task is running.
60 WAIT_LOCK(m_mutex, wait_lock);
61 for (;;) {
62 std::packaged_task<void()> task;
63 {
64 // Wait only if needed; avoid sleeping when a new task was submitted while we were processing another one.
65 if (!m_interrupt && m_work_queue.empty()) {
66 // Block until the pool is interrupted or a task is available.
 // The predicate is re-evaluated on every wakeup, so a spurious wakeup just goes back to waiting.
67 m_cv.wait(wait_lock, [&]() EXCLUSIVE_LOCKS_REQUIRED(m_mutex) { return m_interrupt || !m_work_queue.empty(); });
68 }
69
70 // If stopped and no work left, exit worker
 // Tasks that are still queued when the interrupt arrives are executed before the worker exits.
71 if (m_interrupt && m_work_queue.empty()) {
72 return;
73 }
74
 // Getting here implies the queue is non-empty: take ownership of the oldest task.
75 task = std::move(m_work_queue.front());
76 m_work_queue.pop();
77 }
78
79 {
80 // Execute the task without the lock
 // NOTE(review): REVERSE_LOCK presumably re-acquires m_mutex when this scope ends - confirm in sync.h.
81 REVERSE_LOCK(wait_lock, m_mutex);
82 task();
83 }
84 }
85 }
86
87public:
88 explicit ThreadPool(const std::string& name) : m_name(name) {}
89
91 {
 // NOTE(review): presumably the destructor body (its signature line is not visible here) - confirm.
 // Stop() joins every remaining worker thread.
92 Stop(); // In case it hasn't been stopped.
93 }
94
103 void Start(int num_workers) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
104 {
105 assert(num_workers > 0);
106 LOCK(m_mutex);
107 if (!m_workers.empty()) throw std::runtime_error("Thread pool already started");
108 m_interrupt = false; // Reset
109
110 // Create workers
111 m_workers.reserve(num_workers);
112 for (int i = 0; i < num_workers; i++) {
113 m_workers.emplace_back(&util::TraceThread, strprintf("%s_pool_%d", m_name, i), [this] { WorkerThread(); });
114 }
115 }
116
126 {
 // Signals shutdown and blocks until every worker thread has exited.
127 // Notify workers and join them
128 std::vector<std::thread> threads_to_join;
129 {
130 LOCK(m_mutex);
131 // Ensure Stop() is not called from a worker thread while workers are still registered,
132 // otherwise a self-join deadlock would occur.
133 auto id = std::this_thread::get_id();
134 for (const auto& worker : m_workers) assert(worker.get_id() != id);
135 // Early shutdown to return right away on any concurrent Submit() call
136 m_interrupt = true;
 // Move the thread handles out so they can be joined after m_mutex is released:
 // workers need m_mutex to drain the queue and exit. This also leaves m_workers empty.
137 threads_to_join.swap(m_workers);
138 }
 // Wake every worker blocked in m_cv.wait() so it can observe m_interrupt.
139 m_cv.notify_all();
140 for (auto& worker : threads_to_join) worker.join();
141 // Since we currently wait for tasks completion, sanity-check empty queue
142 WITH_LOCK(m_mutex, Assume(m_work_queue.empty()));
143 // Note: m_interrupt is left true until next Start()
144 }
145
154 template <class F> [[nodiscard]] EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
155 auto Submit(F&& fn)
156 {
157 std::packaged_task task{std::forward<F>(fn)};
158 auto future{task.get_future()};
159 {
160 LOCK(m_mutex);
161 if (m_interrupt || m_workers.empty()) {
162 throw std::runtime_error("No active workers; cannot accept new tasks");
163 }
164 m_work_queue.emplace(std::move(task));
165 }
166 m_cv.notify_one();
167 return future;
168 }
169
175 {
 // Runs at most one pending task on the calling thread; returns immediately when the queue is empty.
176 std::packaged_task<void()> task;
177 {
178 LOCK(m_mutex);
179 if (m_work_queue.empty()) return;
180
181 // Pop the task
182 task = std::move(m_work_queue.front());
183 m_work_queue.pop();
184 }
 // Run the task after the lock scope has ended, so m_mutex is not held while it executes.
185 task();
186 }
187
195 {
 // Set the interrupt flag under the lock, then wake every waiting worker. Nothing is joined here.
 // From this point Submit() throws, and each worker exits once the queue is empty.
196 WITH_LOCK(m_mutex, m_interrupt = true);
197 m_cv.notify_all();
198 }
199
201 {
 // Number of tasks currently waiting in the queue (a snapshot read under m_mutex).
202 return WITH_LOCK(m_mutex, return m_work_queue.size());
203 }
204
206 {
 // Number of worker threads currently registered in m_workers (a snapshot read under m_mutex);
 // this is zero before Start() and again after Stop().
207 return WITH_LOCK(m_mutex, return m_workers.size());
208 }
209};
210
211#endif // BITCOIN_UTIL_THREADPOOL_H
#define Assume(val)
Assume is the identity function.
Definition: check.h:125
Fixed-size thread pool for running arbitrary tasks concurrently.
Definition: threadpool.h:46
void Start(int num_workers) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Start worker threads.
Definition: threadpool.h:103
void ProcessTask() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Execute a single queued task synchronously.
Definition: threadpool.h:174
size_t WorkersCount() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Definition: threadpool.h:205
std::vector< std::thread > m_workers GUARDED_BY(m_mutex)
std::condition_variable m_cv
Definition: threadpool.h:51
void Stop() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Stop all worker threads and wait for them to exit.
Definition: threadpool.h:125
std::queue< std::packaged_task< void()> > m_work_queue GUARDED_BY(m_mutex)
Mutex m_mutex
Definition: threadpool.h:49
ThreadPool(const std::string &name)
Definition: threadpool.h:88
void Interrupt() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Stop accepting new tasks and begin asynchronous shutdown.
Definition: threadpool.h:194
void WorkerThread() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Definition: threadpool.h:58
EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) auto Submit(F &&fn)
Enqueues a new task for asynchronous execution.
Definition: threadpool.h:154
size_t WorkQueueSize() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Definition: threadpool.h:200
std::string m_name
Definition: threadpool.h:48
bool m_interrupt GUARDED_BY(m_mutex)
Definition: threadpool.h:55
void TraceThread(std::string_view thread_name, std::function< void()> thread_func)
A wrapper for do-something-once thread functions.
Definition: thread.cpp:16
const char * name
Definition: rest.cpp:48
#define WAIT_LOCK(cs, name)
Definition: sync.h:264
#define REVERSE_LOCK(g, cs)
Definition: sync.h:244
#define LOCK(cs)
Definition: sync.h:258
#define WITH_LOCK(cs, code)
Run code while locking a mutex.
Definition: sync.h:289
#define strprintf
Format arguments and return the string or write to given std::ostream (see tinyformat::format doc for details).
Definition: tinyformat.h:1172
assert(!tx.IsCoinBase())