Bitcoin Core  0.19.99
P2P Digital Currency
checkqueue.h
Go to the documentation of this file.
1 // Copyright (c) 2012-2018 The Bitcoin Core developers
2 // Distributed under the MIT software license, see the accompanying
3 // file COPYING or http://www.opensource.org/licenses/mit-license.php.
4 
5 #ifndef BITCOIN_CHECKQUEUE_H
6 #define BITCOIN_CHECKQUEUE_H
7 
8 #include <sync.h>
9 
10 #include <algorithm>
11 #include <vector>
12 
13 #include <boost/thread/condition_variable.hpp>
14 #include <boost/thread/mutex.hpp>
15 
16 template <typename T>
18 
29 template <typename T>
31 {
32 private:
// Mutex to protect the inner state.
34  boost::mutex mutex;
35 
// Worker threads block on this when out of work.
37  boost::condition_variable condWorker;
38 
// Master thread blocks on this when out of work.
40  boost::condition_variable condMaster;
41 
// The queue of elements to be processed.
// Add() appends at the back and Loop() removes batches from the back.
44  std::vector<T> queue;
45 
// The number of workers (including the master) that are idle,
// i.e. currently blocked in the condition-variable wait inside Loop().
47  int nIdle;
48 
// The total number of workers (including the master).
// Incremented on a thread's first pass through Loop(); decremented when the master returns.
50  int nTotal;
51 
// The temporary evaluation result.
// Loop() folds each batch result into it with &= and resets it to true when the master returns.
53  bool fAllOk;
54 
// Number of verifications that haven't completed yet.
// Add() increases it; Loop() decreases it only after a batch has been executed,
// so it also counts checks that have left the queue but are still running.
60  unsigned int nTodo;
61 
// The maximum number of elements to be processed in one batch.
63  unsigned int nBatchSize;
64 
// Internal function that does the bulk of the verification work. It is shared by
// Thread() (worker, fMaster == false) and Wait() (master, fMaster == true).
//
// Each pass takes a batch of checks from the back of `queue` while holding `mutex`,
// then runs the batch without the lock. The result of a batch is folded into fAllOk
// at the start of the next pass, inside the same lock acquisition that fetches the
// next batch.
//
// fMaster: when true, the caller waits on condMaster and the function returns once
//          the queue is empty and nTodo == 0. When false, the caller waits on
//          condWorker; the only return statement is guarded by fMaster, so a worker
//          call has no normal return path.
// Returns: the value fAllOk had when the master finished. fAllOk is reset to true
//          before returning.
66  bool Loop(bool fMaster = false)
67  {
68  boost::condition_variable& cond = fMaster ? condMaster : condWorker;
    // Thread-local batch buffer, reused across passes.
69  std::vector<T> vChecks;
70  vChecks.reserve(nBatchSize);
    // Size of the batch executed in the previous pass; 0 identifies the very first pass.
71  unsigned int nNow = 0;
    // Result of the batch executed in the previous pass.
72  bool fOk = true;
73  do {
74  {
75  boost::unique_lock<boost::mutex> lock(mutex);
76  // first do the clean-up of the previous loop run (allowing us to do it in the same critsect)
77  if (nNow) {
78  fAllOk &= fOk;
79  nTodo -= nNow;
80  if (nTodo == 0 && !fMaster)
81  // We processed the last element; inform the master it can exit and return the result
82  condMaster.notify_one();
83  } else {
84  // first iteration
85  nTotal++;
86  }
87  // logically, the do loop starts here
88  while (queue.empty()) {
    // Only the master may leave, and only when no verification is outstanding.
89  if (fMaster && nTodo == 0) {
90  nTotal--;
91  bool fRet = fAllOk;
92  // reset the status for new work later
93  fAllOk = true;
94  // return the current status
95  return fRet;
96  }
    // nIdle brackets the wait so the batch-size formula below can account for idle threads.
97  nIdle++;
98  cond.wait(lock); // wait
99  nIdle--;
100  }
101  // Decide how many work units to process now.
102  // * Do not try to do everything at once, but aim for increasingly smaller batches so
103  // all workers finish approximately simultaneously.
104  // * Try to account for idle jobs which will instantly start helping.
105  // * Don't do batches smaller than 1 (duh), or larger than nBatchSize.
106  nNow = std::max(1U, std::min(nBatchSize, (unsigned int)queue.size() / (nTotal + nIdle + 1)));
107  vChecks.resize(nNow);
108  for (unsigned int i = 0; i < nNow; i++) {
109  // We want the lock on the mutex to be as short as possible, so swap jobs from the global
110  // queue to the local batch vector instead of copying.
111  vChecks[i].swap(queue.back());
112  queue.pop_back();
113  }
114  // Check whether we need to do work at all
    // If an earlier batch already failed, fOk starts out false and no check below is invoked.
115  fOk = fAllOk;
116  }
117  // execute work
    // Runs outside the lock; once one check returns false the rest of the batch is skipped.
118  for (T& check : vChecks)
119  if (fOk)
120  fOk = check();
121  vChecks.clear();
122  } while (true);
123  }
124 
125 public:
// Mutex to ensure only one concurrent CCheckQueueControl.
// Public and separate from the private `mutex` that guards the queue state.
127  boost::mutex ControlMutex;
128 
// Create a new check queue.
// nBatchSizeIn: stored as nBatchSize, the maximum number of elements processed in one batch.
// The counters nIdle, nTotal and nTodo start at 0 and fAllOk starts as true.
130  explicit CCheckQueue(unsigned int nBatchSizeIn) : nIdle(0), nTotal(0), fAllOk(true), nTodo(0), nBatchSize(nBatchSizeIn) {}
131 
// Worker thread: runs Loop() with the default fMaster == false.
// NOTE(review): Loop()'s only return statement is on the master path, so this call is not
// expected to return normally - confirm how worker threads are stopped (not visible here).
133  void Thread()
134  {
135  Loop();
136  }
137 
// Wait until execution finishes, and return whether all evaluations were successful.
// Runs Loop() as the master, so the calling thread also processes queued checks.
139  bool Wait()
140  {
141  return Loop(true);
142  }
143 
// Add a batch of checks to the queue.
// vChecks: checks to enqueue. Each element is swapped with a default-constructed T that was
//          pushed onto the queue, so afterwards the vector keeps its size but its elements
//          hold default-constructed values.
// Holds `mutex` for the whole call.
145  void Add(std::vector<T>& vChecks)
146  {
147  boost::unique_lock<boost::mutex> lock(mutex);
148  for (T& check : vChecks) {
    // Append an empty slot, then swap the caller's check into it instead of copying.
149  queue.push_back(T());
150  check.swap(queue.back());
151  }
152  nTodo += vChecks.size();
    // Notify a single worker for one new check, all workers for several; nobody for an empty batch.
153  if (vChecks.size() == 1)
154  condWorker.notify_one();
155  else if (vChecks.size() > 1)
156  condWorker.notify_all();
157  }
158 
160  {
161  }
162 
163 };
164 
// RAII-style controller object for a CCheckQueue.
// It forwards Add() and Wait() to the queue it was constructed with and tolerates a null
// queue pointer: Add() then does nothing and Wait() returns true.
// NOTE(review): this listing skips several numbered lines (173, 184, 203, 208). The symbol
// index for this file lists a `pqueue` member at line 173 and the ENTER_CRITICAL_SECTION /
// LEAVE_CRITICAL_SECTION macros - confirm the missing lines against the original header.
169 template <typename T>
170 class CCheckQueueControl
171 {
172 private:
    // Set to true after Wait() has returned from pqueue->Wait().
174  bool fDone;
175 
176 public:
    // No default construction and no copying.
177  CCheckQueueControl() = delete;
178  CCheckQueueControl(const CCheckQueueControl&) = delete;
179  CCheckQueueControl& operator=(const CCheckQueueControl&) = delete;
    // pqueueIn: queue to control, or nullptr; stored in pqueue. fDone starts as false.
180  explicit CCheckQueueControl(CCheckQueue<T> * const pqueueIn) : pqueue(pqueueIn), fDone(false)
181  {
182  // passed queue is supposed to be unused, or nullptr
    // NOTE(review): the body of this `if` is empty here (line 184 is missing from the listing).
183  if (pqueue != nullptr) {
185  }
186  }
187 
    // Returns true immediately when there is no queue (fDone is left unchanged in that case).
    // Otherwise returns the result of pqueue->Wait() and records completion in fDone.
188  bool Wait()
189  {
190  if (pqueue == nullptr)
191  return true;
192  bool fRet = pqueue->Wait();
193  fDone = true;
194  return fRet;
195  }
196 
    // Forwards vChecks to pqueue->Add(); does nothing when there is no queue.
197  void Add(std::vector<T>& vChecks)
198  {
199  if (pqueue != nullptr)
200  pqueue->Add(vChecks);
201  }
202 
    // NOTE(review): the header of this block (line 203) is missing from the listing; it is
    // presumably the destructor - confirm. It calls Wait() if no Wait() has completed yet.
204  {
205  if (!fDone)
206  Wait();
    // NOTE(review): the body of this `if` is empty here (line 208 is missing from the listing).
207  if (pqueue != nullptr) {
209  }
210  }
211 };
212 
213 #endif // BITCOIN_CHECKQUEUE_H
void Add(std::vector< T > &vChecks)
Definition: checkqueue.h:197
boost::condition_variable condWorker
Worker threads block on this when out of work.
Definition: checkqueue.h:37
boost::mutex mutex
Mutex to protect the inner state.
Definition: checkqueue.h:34
boost::condition_variable condMaster
Master thread blocks on this when out of work.
Definition: checkqueue.h:40
bool Loop(bool fMaster=false)
Internal function that does bulk of the verification work.
Definition: checkqueue.h:66
CCheckQueueControl(CCheckQueue< T > *const pqueueIn)
Definition: checkqueue.h:180
void Thread()
Worker thread.
Definition: checkqueue.h:133
RAII-style controller object for a CCheckQueue that guarantees the passed queue is finished before continuing.
Definition: checkqueue.h:17
CCheckQueue(unsigned int nBatchSizeIn)
Create a new check queue.
Definition: checkqueue.h:130
std::vector< T > queue
The queue of elements to be processed.
Definition: checkqueue.h:44
bool fAllOk
The temporary evaluation result.
Definition: checkqueue.h:53
#define LEAVE_CRITICAL_SECTION(cs)
Definition: sync.h:192
int nTotal
The total number of workers (including the master).
Definition: checkqueue.h:50
Queue for verifications that have to be performed.
Definition: checkqueue.h:30
CCheckQueue< T > *const pqueue
Definition: checkqueue.h:173
#define ENTER_CRITICAL_SECTION(cs)
Definition: sync.h:186
bool Wait()
Wait until execution finishes, and return whether all evaluations were successful.
Definition: checkqueue.h:139
int nIdle
The number of workers (including the master) that are idle.
Definition: checkqueue.h:47
unsigned int nTodo
Number of verifications that haven't completed yet.
Definition: checkqueue.h:60
void Add(std::vector< T > &vChecks)
Add a batch of checks to the queue.
Definition: checkqueue.h:145
unsigned int nBatchSize
The maximum number of elements to be processed in one batch.
Definition: checkqueue.h:63
boost::mutex ControlMutex
Mutex to ensure only one concurrent CCheckQueueControl.
Definition: checkqueue.h:127