Bitcoin Core 31.99.0
P2P Digital Currency
checkqueue.h
Go to the documentation of this file.
1// Copyright (c) 2012-present The Bitcoin Core developers
2// Distributed under the MIT software license, see the accompanying
3// file COPYING or http://www.opensource.org/licenses/mit-license.php.
4
5#ifndef BITCOIN_CHECKQUEUE_H
6#define BITCOIN_CHECKQUEUE_H
7
8#include <sync.h>
9#include <tinyformat.h>
10#include <util/log.h>
11#include <util/threadnames.h>
12
13#include <algorithm>
14#include <iterator>
15#include <optional>
16#include <vector>
17
32template <typename T, typename R = std::remove_cvref_t<decltype(std::declval<T>()().value())>>
34{
35private:
38
40 std::condition_variable m_worker_cv;
41
43 std::condition_variable m_master_cv;
44
47 std::vector<T> queue GUARDED_BY(m_mutex);
48
50 int nIdle GUARDED_BY(m_mutex){0};
51
53 int nTotal GUARDED_BY(m_mutex){0};
54
56 std::optional<R> m_result GUARDED_BY(m_mutex);
57
63 unsigned int nTodo GUARDED_BY(m_mutex){0};
64
66 const unsigned int nBatchSize;
67
68 std::vector<std::thread> m_worker_threads;
69 bool m_request_stop GUARDED_BY(m_mutex){false};
70
72
73 std::optional<R> Loop(bool fMaster) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
74 {
75 std::condition_variable& cond = fMaster ? m_master_cv : m_worker_cv;
76 std::vector<T> vChecks;
77 vChecks.reserve(nBatchSize);
78 unsigned int nNow = 0;
79 std::optional<R> local_result;
80 bool do_work;
81 do {
82 {
83 WAIT_LOCK(m_mutex, lock);
84 // first do the clean-up of the previous loop run (allowing us to do it in the same critsect)
85 if (nNow) {
86 if (local_result.has_value() && !m_result.has_value()) {
87 std::swap(local_result, m_result);
88 }
89 nTodo -= nNow;
90 if (nTodo == 0 && !fMaster) {
91 // We processed the last element; inform the master it can exit and return the result
92 m_master_cv.notify_one();
93 }
94 } else {
95 // first iteration
96 nTotal++;
97 }
98 // logically, the do loop starts here
99 while (queue.empty() && !m_request_stop) {
100 if (fMaster && nTodo == 0) {
101 nTotal--;
102 std::optional<R> to_return = std::move(m_result);
103 // reset the status for new work later
104 m_result = std::nullopt;
105 // return the current status
106 return to_return;
107 }
108 nIdle++;
109 cond.wait(lock); // wait
110 nIdle--;
111 }
112 if (m_request_stop) {
113 // return value does not matter, because m_request_stop is only set in the destructor.
114 return std::nullopt;
115 }
116
117 // Decide how many work units to process now.
118 // * Do not try to do everything at once, but aim for increasingly smaller batches so
119 // all workers finish approximately simultaneously.
120 // * Try to account for idle jobs which will instantly start helping.
121 // * Don't do batches smaller than 1 (duh), or larger than nBatchSize.
122 nNow = std::max(1U, std::min(nBatchSize, (unsigned int)queue.size() / (nTotal + nIdle + 1)));
123 auto start_it = queue.end() - nNow;
124 vChecks.assign(std::make_move_iterator(start_it), std::make_move_iterator(queue.end()));
125 queue.erase(start_it, queue.end());
126 // Check whether we need to do work at all
127 do_work = !m_result.has_value();
128 }
129 // execute work
130 if (do_work) {
131 for (T& check : vChecks) {
132 local_result = check();
133 if (local_result.has_value()) break;
134 }
135 }
136 vChecks.clear();
137 } while (true);
138 }
139
140public:
143
145 explicit CCheckQueue(unsigned int batch_size, int worker_threads_num)
146 : nBatchSize(batch_size)
147 {
148 LogInfo("Script verification uses %d additional threads", worker_threads_num);
149 m_worker_threads.reserve(worker_threads_num);
150 for (int n = 0; n < worker_threads_num; ++n) {
151 m_worker_threads.emplace_back([this, n]() {
152 util::ThreadRename(strprintf("scriptch.%i", n));
153 Loop(false /* worker thread */);
154 });
155 }
156 }
157
158 // Since this class manages its own resources, which is a thread
159 // pool `m_worker_threads`, copy and move operations are not appropriate.
160 CCheckQueue(const CCheckQueue&) = delete;
164
167 std::optional<R> Complete() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
168 {
169 return Loop(true /* master thread */);
170 }
171
173 void Add(std::vector<T>&& vChecks) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
174 {
175 if (vChecks.empty()) {
176 return;
177 }
178
179 {
180 LOCK(m_mutex);
181 queue.insert(queue.end(), std::make_move_iterator(vChecks.begin()), std::make_move_iterator(vChecks.end()));
182 nTodo += vChecks.size();
183 }
184
185 if (vChecks.size() == 1) {
186 m_worker_cv.notify_one();
187 } else {
188 m_worker_cv.notify_all();
189 }
190 }
191
193 {
194 WITH_LOCK(m_mutex, m_request_stop = true);
195 m_worker_cv.notify_all();
196 for (std::thread& t : m_worker_threads) {
197 t.join();
198 }
199 }
200
201 bool HasThreads() const { return !m_worker_threads.empty(); }
202};
203
208template <typename T, typename R = std::remove_cvref_t<decltype(std::declval<T>()().value())>>
210{
211private:
214 bool fDone;
215
216public:
220 explicit CCheckQueueControl(CCheckQueue<T>& queueIn) EXCLUSIVE_LOCK_FUNCTION(queueIn.m_control_mutex) : m_queue(queueIn), m_lock(LOCK_ARGS(queueIn.m_control_mutex)), fDone(false) {}
221
222 std::optional<R> Complete()
223 {
224 auto ret = m_queue.Complete();
225 fDone = true;
226 return ret;
227 }
228
229 void Add(std::vector<T>&& vChecks)
230 {
231 m_queue.Add(std::move(vChecks));
232 }
233
235 {
236 if (!fDone)
237 Complete();
238 }
239};
240
241#endif // BITCOIN_CHECKQUEUE_H
int ret
RAII-style controller object for a CCheckQueue that guarantees the passed queue is finished before co...
Definition: checkqueue.h:210
CCheckQueueControl & operator=(const CCheckQueueControl &)=delete
CCheckQueueControl(const CCheckQueueControl &)=delete
CCheckQueueControl()=delete
~CCheckQueueControl() UNLOCK_FUNCTION()
Definition: checkqueue.h:234
UniqueLock< Mutex > m_lock
Definition: checkqueue.h:213
CCheckQueue< T, R > & m_queue
Definition: checkqueue.h:212
CCheckQueueControl(CCheckQueue< T > &queueIn) EXCLUSIVE_LOCK_FUNCTION(queueIn.m_control_mutex)
Definition: checkqueue.h:220
std::optional< R > Complete()
Definition: checkqueue.h:222
void Add(std::vector< T > &&vChecks)
Definition: checkqueue.h:229
Queue for verifications that have to be performed.
Definition: checkqueue.h:34
bool m_request_stop GUARDED_BY(m_mutex)
Definition: checkqueue.h:69
std::optional< R > Complete() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Join the execution until completion.
Definition: checkqueue.h:167
unsigned int nTodo GUARDED_BY(m_mutex)
Number of verifications that haven't completed yet.
Definition: checkqueue.h:63
Mutex m_control_mutex
Mutex to ensure only one concurrent CCheckQueueControl.
Definition: checkqueue.h:142
std::vector< T > queue GUARDED_BY(m_mutex)
The queue of elements to be processed.
CCheckQueue(const CCheckQueue &)=delete
void Add(std::vector< T > &&vChecks) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Add a batch of checks to the queue.
Definition: checkqueue.h:173
int nIdle GUARDED_BY(m_mutex)
The number of workers (including the master) that are idle.
Definition: checkqueue.h:50
CCheckQueue & operator=(CCheckQueue &&)=delete
int nTotal GUARDED_BY(m_mutex)
The total number of workers (including the master).
Definition: checkqueue.h:53
CCheckQueue(unsigned int batch_size, int worker_threads_num)
Create a new check queue.
Definition: checkqueue.h:145
Mutex m_mutex
Mutex to protect the inner state.
Definition: checkqueue.h:37
bool HasThreads() const
Definition: checkqueue.h:201
std::optional< R > Loop(bool fMaster) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Definition: checkqueue.h:73
std::condition_variable m_worker_cv
Worker threads block on this when out of work.
Definition: checkqueue.h:40
std::vector< std::thread > m_worker_threads
Definition: checkqueue.h:68
const unsigned int nBatchSize
The maximum number of elements to be processed in one batch.
Definition: checkqueue.h:66
std::optional< R > m_result GUARDED_BY(m_mutex)
The temporary evaluation result.
CCheckQueue(CCheckQueue &&)=delete
std::condition_variable m_master_cv
Master thread blocks on this when out of work.
Definition: checkqueue.h:43
CCheckQueue & operator=(const CCheckQueue &)=delete
Wrapper around std::unique_lock style lock for MutexType.
Definition: sync.h:154
#define T(expected, seed, data)
#define LogInfo(...)
Definition: log.h:125
T check(T ptr)
void ThreadRename(const std::string &)
Rename a thread both in terms of an internal (in-memory) name as well as its system thread name.
Definition: threadnames.cpp:54
#define WAIT_LOCK(cs, name)
Definition: sync.h:274
#define LOCK_ARGS(cs)
Definition: sync.h:272
#define LOCK(cs)
Definition: sync.h:268
#define WITH_LOCK(cs, code)
Run code while locking a mutex.
Definition: sync.h:299
#define EXCLUSIVE_LOCKS_REQUIRED(...)
Definition: threadsafety.h:49
#define EXCLUSIVE_LOCK_FUNCTION(...)
Definition: threadsafety.h:41
#define SCOPED_LOCKABLE
Definition: threadsafety.h:36
#define UNLOCK_FUNCTION(...)
Definition: threadsafety.h:45
#define strprintf
Format arguments and return the string or write to given std::ostream (see tinyformat::format doc for...
Definition: tinyformat.h:1172