Bitcoin Core 28.99.0
P2P Digital Currency
checkqueue.h
Go to the documentation of this file.
1// Copyright (c) 2012-2022 The Bitcoin Core developers
2// Distributed under the MIT software license, see the accompanying
3// file COPYING or http://www.opensource.org/licenses/mit-license.php.
4
5#ifndef BITCOIN_CHECKQUEUE_H
6#define BITCOIN_CHECKQUEUE_H
7
8#include <logging.h>
9#include <sync.h>
10#include <tinyformat.h>
11#include <util/threadnames.h>
12
13#include <algorithm>
14#include <iterator>
15#include <optional>
16#include <vector>
17
// Queue for verifications that have to be performed.
//
// T is the check type: a callable invoked as check(). R defaults to the
// (cv/ref-stripped) type obtained from calling .value() on what a check returns.
// NOTE(review): the `class CCheckQueue` line (listing line 33) and the declaration
// of m_mutex (listing line 37, "Mutex to protect the inner state") are not present
// in this extract.
32template <typename T, typename R = std::remove_cvref_t<decltype(std::declval<T>()().value())>>
34{
35private:
38
// Worker threads block on this when out of work.
40 std::condition_variable m_worker_cv;
41
// Master thread blocks on this when out of work.
43 std::condition_variable m_master_cv;
44
// The queue of elements to be processed.
47 std::vector<T> queue GUARDED_BY(m_mutex);
48
// The number of workers (including the master) that are idle.
50 int nIdle GUARDED_BY(m_mutex){0};
51
// The total number of workers (including the master).
53 int nTotal GUARDED_BY(m_mutex){0};
54
// The temporary evaluation result.
56 std::optional<R> m_result GUARDED_BY(m_mutex);
57
// Number of verifications that haven't completed yet.
// Increased by Add() and decreased by Loop() after a batch has been handled.
63 unsigned int nTodo GUARDED_BY(m_mutex){0};
64
// The maximum number of elements to be processed in one batch.
66 const unsigned int nBatchSize;
67
// Worker threads started by the constructor and joined by the destructor.
68 std::vector<std::thread> m_worker_threads;
// Set to true by the destructor to make the threads running Loop() return.
69 bool m_request_stop GUARDED_BY(m_mutex){false};
70
// Internal function that does the bulk of the verification work.
//
// fMaster: true when called from Complete() (master thread), false when run by
//          one of the worker threads created in the constructor.
//
// Returns:
//  - master: once the queue is empty and nTodo is 0, the recorded m_result
//            (m_result is reset to std::nullopt before returning);
//  - any thread: std::nullopt once m_request_stop has been set.
// A worker thread never returns for any other reason.
72 std::optional<R> Loop(bool fMaster) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
73 {
// Master and workers sleep on separate condition variables.
74 std::condition_variable& cond = fMaster ? m_master_cv : m_worker_cv;
// Per-thread batch buffer; cleared (not reallocated) at the end of every pass.
75 std::vector<T> vChecks;
76 vChecks.reserve(nBatchSize);
// Size of the batch taken in the previous pass; 0 marks the first pass.
77 unsigned int nNow = 0;
// Result of the most recent check() call made by this thread.
78 std::optional<R> local_result;
79 bool do_work;
80 do {
81 {
82 WAIT_LOCK(m_mutex, lock);
83 // first do the clean-up of the previous loop run (allowing us to do it in the same critsect)
84 if (nNow) {
// Publish our result only if no result has been recorded yet, so the first
// recorded result is kept. Since m_result is empty here, the swap also
// leaves local_result empty.
85 if (local_result.has_value() && !m_result.has_value()) {
86 std::swap(local_result, m_result);
87 }
// The previous batch is now accounted for, whether it was executed or skipped.
88 nTodo -= nNow;
89 if (nTodo == 0 && !fMaster) {
90 // We processed the last element; inform the master it can exit and return the result
91 m_master_cv.notify_one();
92 }
93 } else {
94 // first iteration
// Register this thread as a participant.
95 nTotal++;
96 }
97 // logically, the do loop starts here
98 while (queue.empty() && !m_request_stop) {
99 if (fMaster && nTodo == 0) {
// Nothing queued and nothing in flight: the master leaves the pool
// and hands back the recorded result.
100 nTotal--;
101 std::optional<R> to_return = std::move(m_result);
102 // reset the status for new work later
103 m_result = std::nullopt;
104 // return the current status
105 return to_return;
106 }
// Count this thread as idle for the duration of the wait; the batch-size
// computation below reads nIdle.
107 nIdle++;
108 cond.wait(lock); // wait
109 nIdle--;
110 }
111 if (m_request_stop) {
112 // return value does not matter, because m_request_stop is only set in the destructor.
113 return std::nullopt;
114 }
115
116 // Decide how many work units to process now.
117 // * Do not try to do everything at once, but aim for increasingly smaller batches so
118 // all workers finish approximately simultaneously.
119 // * Try to account for idle jobs which will instantly start helping.
120 // * Don't do batches smaller than 1 (duh), or larger than nBatchSize.
121 nNow = std::max(1U, std::min(nBatchSize, (unsigned int)queue.size() / (nTotal + nIdle + 1)));
// Take the batch from the back of the queue: move the last nNow elements
// into vChecks, then erase the moved-from tail.
122 auto start_it = queue.end() - nNow;
123 vChecks.assign(std::make_move_iterator(start_it), std::make_move_iterator(queue.end()));
124 queue.erase(start_it, queue.end());
125 // Check whether we need to do work at all
// If a result has already been recorded, the batch is dropped without
// being executed (it is still subtracted from nTodo on the next pass).
126 do_work = !m_result.has_value();
127 }
128 // execute work
// Runs outside the lock. Stop at the first check that returns an engaged result.
129 if (do_work) {
130 for (T& check : vChecks) {
131 local_result = check();
132 if (local_result.has_value()) break;
133 }
134 }
135 vChecks.clear();
136 } while (true);
137 }
138
139public:
142
// Create a new check queue.
//
// batch_size:         stored in nBatchSize (the maximum number of elements
//                     processed in one batch).
// worker_threads_num: number of worker threads to start. Each one is renamed to
//                     "scriptch.<n>" and then runs Loop(false).
144 explicit CCheckQueue(unsigned int batch_size, int worker_threads_num)
145 : nBatchSize(batch_size)
146 {
147 LogInfo("Script verification uses %d additional threads", worker_threads_num);
148 m_worker_threads.reserve(worker_threads_num);
149 for (int n = 0; n < worker_threads_num; ++n) {
// The lambda captures `this`; the destructor joins these threads.
150 m_worker_threads.emplace_back([this, n]() {
151 util::ThreadRename(strprintf("scriptch.%i", n));
152 Loop(false /* worker thread */);
153 });
154 }
155 }
156
157 // Since this class manages its own resources, which is a thread
158 // pool `m_worker_threads`, copy and move operations are not appropriate.
// Copy construction is disabled here.
// NOTE(review): listing lines 160-162 are not present in this extract; the
// cross-reference index lists the copy assignment and both move operations as
// deleted too.
159 CCheckQueue(const CCheckQueue&) = delete;
163
// Join the execution until completion.
//
// The calling thread runs Loop() in the master role and returns whatever
// Loop(true) returns.
166 std::optional<R> Complete() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
167 {
168 return Loop(true /* master thread */);
169 }
170
// Add a batch of checks to the queue.
//
// vChecks: checks to enqueue; its elements are moved to the back of `queue`.
//          An empty vector is a no-op (no locking, no notification).
172 void Add(std::vector<T>&& vChecks) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
173 {
174 if (vChecks.empty()) {
175 return;
176 }
177
178 {
179 LOCK(m_mutex);
180 queue.insert(queue.end(), std::make_move_iterator(vChecks.begin()), std::make_move_iterator(vChecks.end()));
181 nTodo += vChecks.size();
182 }
183
// vChecks.size() is still the number of checks just enqueued: move iterators
// move the elements out but do not shrink the source vector.
// Only m_worker_cv is notified here: one waiter for a single check, all waiters otherwise.
184 if (vChecks.size() == 1) {
185 m_worker_cv.notify_one();
186 } else {
187 m_worker_cv.notify_all();
188 }
189 }
190
192 {
// NOTE(review): the signature line of this body (listing line 191) is not present
// in this extract; per the comments in Loop() this is the destructor.
// Request a stop under the lock, wake every thread waiting on m_worker_cv so it
// can observe the flag, then wait for all worker threads to finish.
193 WITH_LOCK(m_mutex, m_request_stop = true);
194 m_worker_cv.notify_all();
195 for (std::thread& t : m_worker_threads) {
196 t.join();
197 }
198 }
199
// Returns true if at least one worker thread was created by the constructor.
200 bool HasThreads() const { return !m_worker_threads.empty(); }
201};
202
// Controller object for a CCheckQueue: it forwards Add() and Complete() to the
// queue it was given (which may be nullptr), and the body at listing lines
// 242-248 calls Complete() if it has not been called yet.
// NOTE(review): the `class CCheckQueueControl` line (listing line 208) and the
// declaration of `pqueue` (listing line 211) are not present in this extract.
207template <typename T, typename R = std::remove_cvref_t<decltype(std::declval<T>()().value())>>
209{
210private:
// Set to true once Complete() has run on a non-null queue.
212 bool fDone;
213
214public:
// Create a controller for the given queue.
//
// pqueueIn: the queue to control, or nullptr; with nullptr, Add() does nothing
//           and Complete() returns std::nullopt.
// NOTE(review): the parameter type is CCheckQueue<T>* while the class also has
// the template parameter R (the index shows the member as CCheckQueue<T, R>*);
// with a non-default R these look like different types -- confirm.
218 explicit CCheckQueueControl(CCheckQueue<T> * const pqueueIn) : pqueue(pqueueIn), fDone(false)
219 {
220 // passed queue is supposed to be unused, or nullptr
221 if (pqueue != nullptr) {
// NOTE(review): listing line 222 is not present in this extract. The index lists
// ENTER_CRITICAL_SECTION and an m_control_mutex ("Mutex to ensure only one
// concurrent CCheckQueueControl"), so presumably that mutex is acquired here -- confirm.
223 }
224 }
225
// Finish all work on the controlled queue and return its result.
//
// Returns std::nullopt without touching fDone when no queue was given;
// otherwise returns pqueue->Complete() and records that completion happened.
226 std::optional<R> Complete()
227 {
228 if (pqueue == nullptr) return std::nullopt;
229 auto ret = pqueue->Complete();
230 fDone = true;
231 return ret;
232 }
233
// Forward a batch of checks to the controlled queue.
//
// vChecks: checks to enqueue; left untouched when no queue was given.
234 void Add(std::vector<T>&& vChecks)
235 {
236 if (pqueue != nullptr) {
237 pqueue->Add(std::move(vChecks));
238 }
239 }
240
242 {
// NOTE(review): the signature line of this body (listing line 241) is not present
// in this extract.
// Make sure the queue has been completed before this object goes away.
243 if (!fDone)
244 Complete();
245 if (pqueue != nullptr) {
// NOTE(review): listing line 246 is not present in this extract. The index lists
// LEAVE_CRITICAL_SECTION, so presumably the control mutex is released here -- confirm.
247 }
248 }
249};
250
251#endif // BITCOIN_CHECKQUEUE_H
int ret
RAII-style controller object for a CCheckQueue that guarantees the passed queue is finished before continuing.
Definition: checkqueue.h:209
CCheckQueueControl & operator=(const CCheckQueueControl &)=delete
CCheckQueueControl(const CCheckQueueControl &)=delete
CCheckQueueControl()=delete
CCheckQueue< T, R > *const pqueue
Definition: checkqueue.h:211
CCheckQueueControl(CCheckQueue< T > *const pqueueIn)
Definition: checkqueue.h:218
std::optional< R > Complete()
Definition: checkqueue.h:226
void Add(std::vector< T > &&vChecks)
Definition: checkqueue.h:234
Queue for verifications that have to be performed.
Definition: checkqueue.h:34
bool m_request_stop GUARDED_BY(m_mutex)
Definition: checkqueue.h:69
std::optional< R > Complete() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Join the execution until completion.
Definition: checkqueue.h:166
unsigned int nTodo GUARDED_BY(m_mutex)
Number of verifications that haven't completed yet.
Definition: checkqueue.h:63
Mutex m_control_mutex
Mutex to ensure only one concurrent CCheckQueueControl.
Definition: checkqueue.h:141
std::vector< T > queue GUARDED_BY(m_mutex)
The queue of elements to be processed.
CCheckQueue(const CCheckQueue &)=delete
void Add(std::vector< T > &&vChecks) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Add a batch of checks to the queue.
Definition: checkqueue.h:172
int nIdle GUARDED_BY(m_mutex)
The number of workers (including the master) that are idle.
Definition: checkqueue.h:50
CCheckQueue & operator=(CCheckQueue &&)=delete
int nTotal GUARDED_BY(m_mutex)
The total number of workers (including the master).
Definition: checkqueue.h:53
CCheckQueue(unsigned int batch_size, int worker_threads_num)
Create a new check queue.
Definition: checkqueue.h:144
Mutex m_mutex
Mutex to protect the inner state.
Definition: checkqueue.h:37
bool HasThreads() const
Definition: checkqueue.h:200
std::optional< R > Loop(bool fMaster) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Internal function that does bulk of the verification work.
Definition: checkqueue.h:72
std::condition_variable m_worker_cv
Worker threads block on this when out of work.
Definition: checkqueue.h:40
std::vector< std::thread > m_worker_threads
Definition: checkqueue.h:68
const unsigned int nBatchSize
The maximum number of elements to be processed in one batch.
Definition: checkqueue.h:66
std::optional< R > m_result GUARDED_BY(m_mutex)
The temporary evaluation result.
CCheckQueue(CCheckQueue &&)=delete
std::condition_variable m_master_cv
Master thread blocks on this when out of work.
Definition: checkqueue.h:43
CCheckQueue & operator=(const CCheckQueue &)=delete
#define T(expected, seed, data)
#define LogInfo(...)
Definition: logging.h:261
void ThreadRename(const std::string &)
Rename a thread both in terms of an internal (in-memory) name as well as its system thread name.
Definition: threadnames.cpp:57
#define WAIT_LOCK(cs, name)
Definition: sync.h:262
#define ENTER_CRITICAL_SECTION(cs)
Definition: sync.h:264
#define LEAVE_CRITICAL_SECTION(cs)
Definition: sync.h:270
#define LOCK(cs)
Definition: sync.h:257
#define WITH_LOCK(cs, code)
Run code while locking a mutex.
Definition: sync.h:301
#define EXCLUSIVE_LOCKS_REQUIRED(...)
Definition: threadsafety.h:49
#define strprintf
Format arguments and return the string or write to given std::ostream (see tinyformat::format doc for details).
Definition: tinyformat.h:1165