Bitcoin Core  22.99.0
P2P Digital Currency
checkqueue.h
Go to the documentation of this file.
1 // Copyright (c) 2012-2020 The Bitcoin Core developers
2 // Distributed under the MIT software license, see the accompanying
3 // file COPYING or http://www.opensource.org/licenses/mit-license.php.
4 
5 #ifndef BITCOIN_CHECKQUEUE_H
6 #define BITCOIN_CHECKQUEUE_H
7 
8 #include <sync.h>
9 #include <tinyformat.h>
10 #include <util/syscall_sandbox.h>
11 #include <util/threadnames.h>
12 
13 #include <algorithm>
14 #include <vector>
15 
16 template <typename T>
18 
29 template <typename T>
31 {
32 private:
35 
37  std::condition_variable m_worker_cv;
38 
40  std::condition_variable m_master_cv;
41 
44  std::vector<T> queue GUARDED_BY(m_mutex);
45 
47  int nIdle GUARDED_BY(m_mutex){0};
48 
50  int nTotal GUARDED_BY(m_mutex){0};
51 
53  bool fAllOk GUARDED_BY(m_mutex){true};
54 
60  unsigned int nTodo GUARDED_BY(m_mutex){0};
61 
63  const unsigned int nBatchSize;
64 
65  std::vector<std::thread> m_worker_threads;
66  bool m_request_stop GUARDED_BY(m_mutex){false};
67 
69  bool Loop(bool fMaster)
70  {
71  std::condition_variable& cond = fMaster ? m_master_cv : m_worker_cv;
72  std::vector<T> vChecks;
73  vChecks.reserve(nBatchSize);
74  unsigned int nNow = 0;
75  bool fOk = true;
76  do {
77  {
78  WAIT_LOCK(m_mutex, lock);
79  // first do the clean-up of the previous loop run (allowing us to do it in the same critsect)
80  if (nNow) {
81  fAllOk &= fOk;
82  nTodo -= nNow;
83  if (nTodo == 0 && !fMaster)
84  // We processed the last element; inform the master it can exit and return the result
85  m_master_cv.notify_one();
86  } else {
87  // first iteration
88  nTotal++;
89  }
90  // logically, the do loop starts here
91  while (queue.empty() && !m_request_stop) {
92  if (fMaster && nTodo == 0) {
93  nTotal--;
94  bool fRet = fAllOk;
95  // reset the status for new work later
96  fAllOk = true;
97  // return the current status
98  return fRet;
99  }
100  nIdle++;
101  cond.wait(lock); // wait
102  nIdle--;
103  }
104  if (m_request_stop) {
105  return false;
106  }
107 
108  // Decide how many work units to process now.
109  // * Do not try to do everything at once, but aim for increasingly smaller batches so
110  // all workers finish approximately simultaneously.
111  // * Try to account for idle jobs which will instantly start helping.
112  // * Don't do batches smaller than 1 (duh), or larger than nBatchSize.
113  nNow = std::max(1U, std::min(nBatchSize, (unsigned int)queue.size() / (nTotal + nIdle + 1)));
114  vChecks.resize(nNow);
115  for (unsigned int i = 0; i < nNow; i++) {
116  // We want the lock on the m_mutex to be as short as possible, so swap jobs from the global
117  // queue to the local batch vector instead of copying.
118  vChecks[i].swap(queue.back());
119  queue.pop_back();
120  }
121  // Check whether we need to do work at all
122  fOk = fAllOk;
123  }
124  // execute work
125  for (T& check : vChecks)
126  if (fOk)
127  fOk = check();
128  vChecks.clear();
129  } while (true);
130  }
131 
132 public:
135 
137  explicit CCheckQueue(unsigned int nBatchSizeIn)
138  : nBatchSize(nBatchSizeIn)
139  {
140  }
141 
143  void StartWorkerThreads(const int threads_num)
144  {
145  {
146  LOCK(m_mutex);
147  nIdle = 0;
148  nTotal = 0;
149  fAllOk = true;
150  }
151  assert(m_worker_threads.empty());
152  for (int n = 0; n < threads_num; ++n) {
153  m_worker_threads.emplace_back([this, n]() {
154  util::ThreadRename(strprintf("scriptch.%i", n));
156  Loop(false /* worker thread */);
157  });
158  }
159  }
160 
162  bool Wait()
163  {
164  return Loop(true /* master thread */);
165  }
166 
168  void Add(std::vector<T>& vChecks)
169  {
170  LOCK(m_mutex);
171  for (T& check : vChecks) {
172  queue.push_back(T());
173  check.swap(queue.back());
174  }
175  nTodo += vChecks.size();
176  if (vChecks.size() == 1)
177  m_worker_cv.notify_one();
178  else if (vChecks.size() > 1)
179  m_worker_cv.notify_all();
180  }
181 
184  {
185  WITH_LOCK(m_mutex, m_request_stop = true);
186  m_worker_cv.notify_all();
187  for (std::thread& t : m_worker_threads) {
188  t.join();
189  }
190  m_worker_threads.clear();
191  WITH_LOCK(m_mutex, m_request_stop = false);
192  }
193 
195  {
196  assert(m_worker_threads.empty());
197  }
198 
199 };
200 
205 template <typename T>
206 class CCheckQueueControl
207 {
208 private:
210  bool fDone;
211 
212 public:
213  CCheckQueueControl() = delete;
214  CCheckQueueControl(const CCheckQueueControl&) = delete;
216  explicit CCheckQueueControl(CCheckQueue<T> * const pqueueIn) : pqueue(pqueueIn), fDone(false)
217  {
218  // passed queue is supposed to be unused, or nullptr
219  if (pqueue != nullptr) {
220  ENTER_CRITICAL_SECTION(pqueue->m_control_mutex);
221  }
222  }
223 
224  bool Wait()
225  {
226  if (pqueue == nullptr)
227  return true;
228  bool fRet = pqueue->Wait();
229  fDone = true;
230  return fRet;
231  }
232 
233  void Add(std::vector<T>& vChecks)
234  {
235  if (pqueue != nullptr)
236  pqueue->Add(vChecks);
237  }
238 
240  {
241  if (!fDone)
242  Wait();
243  if (pqueue != nullptr) {
244  LEAVE_CRITICAL_SECTION(pqueue->m_control_mutex);
245  }
246  }
247 };
248 
249 #endif // BITCOIN_CHECKQUEUE_H
assert
assert(!tx.IsCoinBase())
CCheckQueue::StopWorkerThreads
void StopWorkerThreads()
Stop all of the worker threads.
Definition: checkqueue.h:183
CCheckQueue::nBatchSize
const unsigned int nBatchSize
The maximum number of elements to be processed in one batch.
Definition: checkqueue.h:60
CCheckQueue::~CCheckQueue
~CCheckQueue()
Definition: checkqueue.h:194
CCheckQueue::GUARDED_BY
int nTotal GUARDED_BY(m_mutex)
The total number of workers (including the master).
Definition: checkqueue.h:50
sync.h
CCheckQueue::Loop
bool Loop(bool fMaster)
Internal function that does bulk of the verification work.
Definition: checkqueue.h:69
CCheckQueue::StartWorkerThreads
void StartWorkerThreads(const int threads_num)
Create a pool of new worker threads.
Definition: checkqueue.h:143
CCheckQueue::GUARDED_BY
std::vector< T > queue GUARDED_BY(m_mutex)
The queue of elements to be processed.
CCheckQueueControl::operator=
CCheckQueueControl & operator=(const CCheckQueueControl &)=delete
WITH_LOCK
#define WITH_LOCK(cs, code)
Run code while locking a mutex.
Definition: sync.h:270
CCheckQueue::CCheckQueue
CCheckQueue(unsigned int nBatchSizeIn)
Create a new check queue.
Definition: checkqueue.h:137
AnnotatedMixin< std::mutex >
SyscallSandboxPolicy::VALIDATION_SCRIPT_CHECK
@ VALIDATION_SCRIPT_CHECK
tinyformat.h
CCheckQueueControl::Add
void Add(std::vector< T > &vChecks)
Definition: checkqueue.h:233
ENTER_CRITICAL_SECTION
#define ENTER_CRITICAL_SECTION(cs)
Definition: sync.h:233
syscall_sandbox.h
CCheckQueue::m_mutex
Mutex m_mutex
Mutex to protect the inner state.
Definition: checkqueue.h:34
CCheckQueueControl::pqueue
CCheckQueue< T > *const pqueue
Definition: checkqueue.h:209
CCheckQueue::GUARDED_BY
bool fAllOk GUARDED_BY(m_mutex)
The temporary evaluation result.
Definition: checkqueue.h:53
CCheckQueue
Queue for verifications that have to be performed.
Definition: checkqueue.h:30
CCheckQueue::GUARDED_BY
int nIdle GUARDED_BY(m_mutex)
The number of workers (including the master) that are idle.
Definition: checkqueue.h:47
util::ThreadRename
void ThreadRename(std::string &&)
Rename a thread both in terms of an internal (in-memory) name as well as its system thread name.
Definition: threadnames.cpp:57
SetSyscallSandboxPolicy
void SetSyscallSandboxPolicy(SyscallSandboxPolicy syscall_policy)
Force the current thread (and threads created from the current thread) into a restricted-service oper...
Definition: syscall_sandbox.cpp:826
CCheckQueueControl
RAII-style controller object for a CCheckQueue that guarantees the passed queue is finished before co...
Definition: checkqueue.h:17
CCheckQueue::GUARDED_BY
bool m_request_stop GUARDED_BY(m_mutex)
Definition: checkqueue.h:66
CCheckQueue::m_master_cv
std::condition_variable m_master_cv
Master thread blocks on this when out of work.
Definition: checkqueue.h:40
CCheckQueueControl::CCheckQueueControl
CCheckQueueControl()=delete
strprintf
#define strprintf
Format arguments and return the string or write to given std::ostream (see tinyformat::format doc for...
Definition: tinyformat.h:1164
CCheckQueue::m_control_mutex
Mutex m_control_mutex
Mutex to ensure only one concurrent CCheckQueueControl.
Definition: checkqueue.h:134
CCheckQueueControl::Wait
bool Wait()
Definition: checkqueue.h:224
LOCK
#define LOCK(cs)
Definition: sync.h:226
CCheckQueue::m_worker_threads
std::vector< std::thread > m_worker_threads
Definition: checkqueue.h:65
LEAVE_CRITICAL_SECTION
#define LEAVE_CRITICAL_SECTION(cs)
Definition: sync.h:239
CCheckQueueControl::fDone
bool fDone
Definition: checkqueue.h:210
CCheckQueueControl::CCheckQueueControl
CCheckQueueControl(CCheckQueue< T > *const pqueueIn)
Definition: checkqueue.h:216
T
#define T(expected, seed, data)
CCheckQueue::Add
void Add(std::vector< T > &vChecks)
Add a batch of checks to the queue.
Definition: checkqueue.h:168
threadnames.h
WAIT_LOCK
#define WAIT_LOCK(cs, name)
Definition: sync.h:231
CCheckQueue::m_worker_cv
std::condition_variable m_worker_cv
Worker threads block on this when out of work.
Definition: checkqueue.h:37
CCheckQueueControl::~CCheckQueueControl
~CCheckQueueControl()
Definition: checkqueue.h:239
CCheckQueue::Wait
bool Wait()
Wait until execution finishes, and return whether all evaluations were successful.
Definition: checkqueue.h:162