Bitcoin Core  22.99.0
P2P Digital Currency
checkqueue.h
Go to the documentation of this file.
1 // Copyright (c) 2012-2020 The Bitcoin Core developers
2 // Distributed under the MIT software license, see the accompanying
3 // file COPYING or http://www.opensource.org/licenses/mit-license.php.
4 
5 #ifndef BITCOIN_CHECKQUEUE_H
6 #define BITCOIN_CHECKQUEUE_H
7 
8 #include <sync.h>
9 #include <tinyformat.h>
10 #include <util/threadnames.h>
11 
12 #include <algorithm>
13 #include <vector>
14 
// Forward declaration: the controller class is defined after CCheckQueue.
15 template <typename T>
16 class CCheckQueueControl;
17 
28 template <typename T>
30 {
31 private:
34 
36  std::condition_variable m_worker_cv;
37 
39  std::condition_variable m_master_cv;
40 
43  std::vector<T> queue GUARDED_BY(m_mutex);
44 
46  int nIdle GUARDED_BY(m_mutex){0};
47 
49  int nTotal GUARDED_BY(m_mutex){0};
50 
52  bool fAllOk GUARDED_BY(m_mutex){true};
53 
59  unsigned int nTodo GUARDED_BY(m_mutex){0};
60 
62  const unsigned int nBatchSize;
63 
64  std::vector<std::thread> m_worker_threads;
65  bool m_request_stop GUARDED_BY(m_mutex){false};
66 
68  bool Loop(bool fMaster)
69  {
70  std::condition_variable& cond = fMaster ? m_master_cv : m_worker_cv;
71  std::vector<T> vChecks;
72  vChecks.reserve(nBatchSize);
73  unsigned int nNow = 0;
74  bool fOk = true;
75  do {
76  {
77  WAIT_LOCK(m_mutex, lock);
78  // first do the clean-up of the previous loop run (allowing us to do it in the same critsect)
79  if (nNow) {
80  fAllOk &= fOk;
81  nTodo -= nNow;
82  if (nTodo == 0 && !fMaster)
83  // We processed the last element; inform the master it can exit and return the result
84  m_master_cv.notify_one();
85  } else {
86  // first iteration
87  nTotal++;
88  }
89  // logically, the do loop starts here
90  while (queue.empty() && !m_request_stop) {
91  if (fMaster && nTodo == 0) {
92  nTotal--;
93  bool fRet = fAllOk;
94  // reset the status for new work later
95  fAllOk = true;
96  // return the current status
97  return fRet;
98  }
99  nIdle++;
100  cond.wait(lock); // wait
101  nIdle--;
102  }
103  if (m_request_stop) {
104  return false;
105  }
106 
107  // Decide how many work units to process now.
108  // * Do not try to do everything at once, but aim for increasingly smaller batches so
109  // all workers finish approximately simultaneously.
110  // * Try to account for idle jobs which will instantly start helping.
111  // * Don't do batches smaller than 1 (duh), or larger than nBatchSize.
112  nNow = std::max(1U, std::min(nBatchSize, (unsigned int)queue.size() / (nTotal + nIdle + 1)));
113  vChecks.resize(nNow);
114  for (unsigned int i = 0; i < nNow; i++) {
115  // We want the lock on the m_mutex to be as short as possible, so swap jobs from the global
116  // queue to the local batch vector instead of copying.
117  vChecks[i].swap(queue.back());
118  queue.pop_back();
119  }
120  // Check whether we need to do work at all
121  fOk = fAllOk;
122  }
123  // execute work
124  for (T& check : vChecks)
125  if (fOk)
126  fOk = check();
127  vChecks.clear();
128  } while (true);
129  }
130 
131 public:
134 
136  explicit CCheckQueue(unsigned int nBatchSizeIn)
137  : nBatchSize(nBatchSizeIn)
138  {
139  }
140 
142  void StartWorkerThreads(const int threads_num)
143  {
144  {
145  LOCK(m_mutex);
146  nIdle = 0;
147  nTotal = 0;
148  fAllOk = true;
149  }
150  assert(m_worker_threads.empty());
151  for (int n = 0; n < threads_num; ++n) {
152  m_worker_threads.emplace_back([this, n]() {
153  util::ThreadRename(strprintf("scriptch.%i", n));
154  Loop(false /* worker thread */);
155  });
156  }
157  }
158 
160  bool Wait()
161  {
162  return Loop(true /* master thread */);
163  }
164 
166  void Add(std::vector<T>& vChecks)
167  {
168  LOCK(m_mutex);
169  for (T& check : vChecks) {
170  queue.push_back(T());
171  check.swap(queue.back());
172  }
173  nTodo += vChecks.size();
174  if (vChecks.size() == 1)
175  m_worker_cv.notify_one();
176  else if (vChecks.size() > 1)
177  m_worker_cv.notify_all();
178  }
179 
182  {
183  WITH_LOCK(m_mutex, m_request_stop = true);
184  m_worker_cv.notify_all();
185  for (std::thread& t : m_worker_threads) {
186  t.join();
187  }
188  m_worker_threads.clear();
189  WITH_LOCK(m_mutex, m_request_stop = false);
190  }
191 
193  {
194  assert(m_worker_threads.empty());
195  }
196 
197 };
198 
// RAII-style controller object for a CCheckQueue.
//
// The constructor enters the queue's m_control_mutex and the destructor leaves
// it again; before leaving, the destructor calls Wait() unless Wait() has
// already been called on this object. A controller constructed with nullptr
// does nothing: Add() is a no-op and Wait() returns true.
203 template <typename T>
204 class CCheckQueueControl
205 {
206 private:
    // The controlled queue, or nullptr.
207  CCheckQueue<T> * const pqueue;
    // Set once Wait() has been run on the queue, so the destructor does not wait again.
208  bool fDone;
209 
210 public:
211  CCheckQueueControl() = delete;
212  CCheckQueueControl(const CCheckQueueControl&) = delete;
213  CCheckQueueControl& operator=(const CCheckQueueControl&) = delete;
    // pqueueIn: queue to control, or nullptr for a controller that does nothing.
214  explicit CCheckQueueControl(CCheckQueue<T> * const pqueueIn) : pqueue(pqueueIn), fDone(false)
215  {
216  // passed queue is supposed to be unused, or nullptr
217  if (pqueue != nullptr) {
218  ENTER_CRITICAL_SECTION(pqueue->m_control_mutex);
219  }
220  }
221 
    // Returns the result of the queue's Wait(), or true when there is no queue.
222  bool Wait()
223  {
224  if (pqueue == nullptr)
225  return true;
226  bool fRet = pqueue->Wait();
227  fDone = true;
228  return fRet;
229  }
230 
    // Forwards vChecks to the queue's Add(); does nothing when there is no queue.
231  void Add(std::vector<T>& vChecks)
232  {
233  if (pqueue != nullptr)
234  pqueue->Add(vChecks);
235  }
236 
    // Waits for the queue if that has not happened yet, then leaves m_control_mutex.
237  ~CCheckQueueControl()
238  {
239  if (!fDone)
240  Wait();
241  if (pqueue != nullptr) {
242  LEAVE_CRITICAL_SECTION(pqueue->m_control_mutex);
243  }
244  }
245 };
246 
247 #endif // BITCOIN_CHECKQUEUE_H
assert
assert(!tx.IsCoinBase())
CCheckQueue::StopWorkerThreads
void StopWorkerThreads()
Stop all of the worker threads.
Definition: checkqueue.h:181
CCheckQueue::nBatchSize
const unsigned int nBatchSize
The maximum number of elements to be processed in one batch.
Definition: checkqueue.h:59
CCheckQueue::~CCheckQueue
~CCheckQueue()
Definition: checkqueue.h:192
CCheckQueue::GUARDED_BY
int nTotal GUARDED_BY(m_mutex)
The total number of workers (including the master).
Definition: checkqueue.h:49
sync.h
CCheckQueue::Loop
bool Loop(bool fMaster)
Internal function that does bulk of the verification work.
Definition: checkqueue.h:68
CCheckQueue::StartWorkerThreads
void StartWorkerThreads(const int threads_num)
Create a pool of new worker threads.
Definition: checkqueue.h:142
CCheckQueue::GUARDED_BY
std::vector< T > queue GUARDED_BY(m_mutex)
The queue of elements to be processed.
CCheckQueueControl::operator=
CCheckQueueControl & operator=(const CCheckQueueControl &)=delete
WITH_LOCK
#define WITH_LOCK(cs, code)
Run code while locking a mutex.
Definition: sync.h:276
CCheckQueue::CCheckQueue
CCheckQueue(unsigned int nBatchSizeIn)
Create a new check queue.
Definition: checkqueue.h:136
AnnotatedMixin< std::mutex >
tinyformat.h
CCheckQueueControl::Add
void Add(std::vector< T > &vChecks)
Definition: checkqueue.h:231
ENTER_CRITICAL_SECTION
#define ENTER_CRITICAL_SECTION(cs)
Definition: sync.h:239
CCheckQueue::m_mutex
Mutex m_mutex
Mutex to protect the inner state.
Definition: checkqueue.h:33
CCheckQueueControl::pqueue
CCheckQueue< T > *const pqueue
Definition: checkqueue.h:207
CCheckQueue::GUARDED_BY
bool fAllOk GUARDED_BY(m_mutex)
The temporary evaluation result.
Definition: checkqueue.h:52
CCheckQueue
Queue for verifications that have to be performed.
Definition: checkqueue.h:29
CCheckQueue::GUARDED_BY
int nIdle GUARDED_BY(m_mutex)
The number of workers (including the master) that are idle.
Definition: checkqueue.h:46
util::ThreadRename
void ThreadRename(std::string &&)
Rename a thread both in terms of an internal (in-memory) name as well as its system thread name.
Definition: threadnames.cpp:57
CCheckQueueControl
RAII-style controller object for a CCheckQueue that guarantees the passed queue is finished before continuing.
Definition: checkqueue.h:16
CCheckQueue::GUARDED_BY
bool m_request_stop GUARDED_BY(m_mutex)
Definition: checkqueue.h:65
CCheckQueue::m_master_cv
std::condition_variable m_master_cv
Master thread blocks on this when out of work.
Definition: checkqueue.h:39
CCheckQueueControl::CCheckQueueControl
CCheckQueueControl()=delete
strprintf
#define strprintf
Format arguments and return the string or write to given std::ostream (see tinyformat::format doc for details).
Definition: tinyformat.h:1164
CCheckQueue::m_control_mutex
Mutex m_control_mutex
Mutex to ensure only one concurrent CCheckQueueControl.
Definition: checkqueue.h:133
CCheckQueueControl::Wait
bool Wait()
Definition: checkqueue.h:222
LOCK
#define LOCK(cs)
Definition: sync.h:232
CCheckQueue::m_worker_threads
std::vector< std::thread > m_worker_threads
Definition: checkqueue.h:64
LEAVE_CRITICAL_SECTION
#define LEAVE_CRITICAL_SECTION(cs)
Definition: sync.h:245
CCheckQueueControl::fDone
bool fDone
Definition: checkqueue.h:208
CCheckQueueControl::CCheckQueueControl
CCheckQueueControl(CCheckQueue< T > *const pqueueIn)
Definition: checkqueue.h:214
CCheckQueue::Add
void Add(std::vector< T > &vChecks)
Add a batch of checks to the queue.
Definition: checkqueue.h:166
threadnames.h
WAIT_LOCK
#define WAIT_LOCK(cs, name)
Definition: sync.h:237
CCheckQueue::m_worker_cv
std::condition_variable m_worker_cv
Worker threads block on this when out of work.
Definition: checkqueue.h:36
CCheckQueueControl::~CCheckQueueControl
~CCheckQueueControl()
Definition: checkqueue.h:237
CCheckQueue::Wait
bool Wait()
Wait until execution finishes, and return whether all evaluations were successful.
Definition: checkqueue.h:160