Bitcoin Core  0.19.99
P2P Digital Currency
scheduler.cpp
Go to the documentation of this file.
1 // Copyright (c) 2015-2019 The Bitcoin Core developers
2 // Distributed under the MIT software license, see the accompanying
3 // file COPYING or http://www.opensource.org/licenses/mit-license.php.
4 
5 #include <scheduler.h>
6 
7 #include <random.h>
8 
9 #include <assert.h>
10 #include <utility>
11 
13 {
14 }
15 
17 {
18  assert(nThreadsServicingQueue == 0);
19  if (stopWhenEmpty) assert(taskQueue.empty());
20 }
21 
22 
24 {
25  WAIT_LOCK(newTaskMutex, lock);
26  ++nThreadsServicingQueue;
27 
28  // newTaskMutex is locked throughout this loop EXCEPT
29  // when the thread is waiting or when the user's function
30  // is called.
31  while (!shouldStop()) {
32  try {
33  if (!shouldStop() && taskQueue.empty()) {
34  REVERSE_LOCK(lock);
35  }
36  while (!shouldStop() && taskQueue.empty()) {
37  // Wait until there is something to do.
38  newTaskScheduled.wait(lock);
39  }
40 
41  // Wait until either there is a new task, or until
42  // the time of the first item on the queue:
43 
44  while (!shouldStop() && !taskQueue.empty()) {
45  std::chrono::system_clock::time_point timeToWaitFor = taskQueue.begin()->first;
46  if (newTaskScheduled.wait_until(lock, timeToWaitFor) == std::cv_status::timeout) {
47  break; // Exit loop after timeout, it means we reached the time of the event
48  }
49  }
50 
51  // If there are multiple threads, the queue can empty while we're waiting (another
52  // thread may service the task we were waiting on).
53  if (shouldStop() || taskQueue.empty())
54  continue;
55 
56  Function f = taskQueue.begin()->second;
57  taskQueue.erase(taskQueue.begin());
58 
59  {
60  // Unlock before calling f, so it can reschedule itself or another task
61  // without deadlocking:
62  REVERSE_LOCK(lock);
63  f();
64  }
65  } catch (...) {
66  --nThreadsServicingQueue;
67  throw;
68  }
69  }
70  --nThreadsServicingQueue;
71  newTaskScheduled.notify_one();
72 }
73 
74 void CScheduler::stop(bool drain)
75 {
76  {
78  if (drain)
79  stopWhenEmpty = true;
80  else
81  stopRequested = true;
82  }
83  newTaskScheduled.notify_all();
84 }
85 
87 {
88  {
90  taskQueue.insert(std::make_pair(t, f));
91  }
92  newTaskScheduled.notify_one();
93 }
94 
95 void CScheduler::MockForward(std::chrono::seconds delta_seconds)
96 {
97  assert(delta_seconds.count() > 0 && delta_seconds < std::chrono::hours{1});
98 
99  {
101 
102  // use temp_queue to maintain updated schedule
103  std::multimap<std::chrono::system_clock::time_point, Function> temp_queue;
104 
105  for (const auto& element : taskQueue) {
106  temp_queue.emplace_hint(temp_queue.cend(), element.first - delta_seconds, element.second);
107  }
108 
109  // point taskQueue to temp_queue
110  taskQueue = std::move(temp_queue);
111  }
112 
113  // notify that the taskQueue needs to be processed
114  newTaskScheduled.notify_one();
115 }
116 
117 static void Repeat(CScheduler& s, CScheduler::Function f, std::chrono::milliseconds delta)
118 {
119  f();
120  s.scheduleFromNow([=, &s] { Repeat(s, f, delta); }, delta);
121 }
122 
123 void CScheduler::scheduleEvery(CScheduler::Function f, std::chrono::milliseconds delta)
124 {
125  scheduleFromNow([=] { Repeat(*this, f, delta); }, delta);
126 }
127 
130 {
132  size_t result = taskQueue.size();
133  if (!taskQueue.empty()) {
134  first = taskQueue.begin()->first;
135  last = taskQueue.rbegin()->first;
136  }
137  return result;
138 }
139 
142  return nThreadsServicingQueue;
143 }
144 
145 
147  {
148  LOCK(m_cs_callbacks_pending);
149  // Try to avoid scheduling too many copies here, but if we
150  // accidentally have two ProcessQueue's scheduled at once its
151  // not a big deal.
152  if (m_are_callbacks_running) return;
153  if (m_callbacks_pending.empty()) return;
154  }
155  m_pscheduler->schedule(std::bind(&SingleThreadedSchedulerClient::ProcessQueue, this), std::chrono::system_clock::now());
156 }
157 
159  std::function<void ()> callback;
160  {
161  LOCK(m_cs_callbacks_pending);
162  if (m_are_callbacks_running) return;
163  if (m_callbacks_pending.empty()) return;
164  m_are_callbacks_running = true;
165 
166  callback = std::move(m_callbacks_pending.front());
167  m_callbacks_pending.pop_front();
168  }
169 
170  // RAII the setting of fCallbacksRunning and calling MaybeScheduleProcessQueue
171  // to ensure both happen safely even if callback() throws.
172  struct RAIICallbacksRunning {
174  explicit RAIICallbacksRunning(SingleThreadedSchedulerClient* _instance) : instance(_instance) {}
175  ~RAIICallbacksRunning() {
176  {
177  LOCK(instance->m_cs_callbacks_pending);
178  instance->m_are_callbacks_running = false;
179  }
180  instance->MaybeScheduleProcessQueue();
181  }
182  } raiicallbacksrunning(this);
183 
184  callback();
185 }
186 
187 void SingleThreadedSchedulerClient::AddToProcessQueue(std::function<void ()> func) {
188  assert(m_pscheduler);
189 
190  {
191  LOCK(m_cs_callbacks_pending);
192  m_callbacks_pending.emplace_back(std::move(func));
193  }
194  MaybeScheduleProcessQueue();
195 }
196 
198  assert(!m_pscheduler->AreThreadsServicingQueue());
199  bool should_continue = true;
200  while (should_continue) {
201  ProcessQueue();
202  LOCK(m_cs_callbacks_pending);
203  should_continue = !m_callbacks_pending.empty();
204  }
205 }
206 
208  LOCK(m_cs_callbacks_pending);
209  return m_callbacks_pending.size();
210 }
Class used by CScheduler clients which may schedule multiple jobs which are required to be run serial...
Definition: scheduler.h:109
void scheduleEvery(Function f, std::chrono::milliseconds delta)
Repeat f until the scheduler is stopped.
Definition: scheduler.cpp:123
static void Repeat(CScheduler &s, CScheduler::Function f, std::chrono::milliseconds delta)
Definition: scheduler.cpp:117
void MockForward(std::chrono::seconds delta_seconds)
Mock the scheduler to fast forward in time.
Definition: scheduler.cpp:95
std::function< void()> Function
Definition: scheduler.h:44
Mutex newTaskMutex
Definition: scheduler.h:90
#define REVERSE_LOCK(g)
Definition: sync.h:213
void AddToProcessQueue(std::function< void()> func)
Add a callback to be executed.
Definition: scheduler.cpp:187
std::condition_variable newTaskScheduled
Definition: scheduler.h:91
void stop(bool drain=false)
Definition: scheduler.cpp:74
#define LOCK(cs)
Definition: sync.h:218
void schedule(Function f, std::chrono::system_clock::time_point t)
Definition: scheduler.cpp:86
RecursiveMutex m_cs_callbacks_pending
Definition: scheduler.h:113
void serviceQueue()
Definition: scheduler.cpp:23
#define WAIT_LOCK(cs, name)
Definition: sync.h:223
bool AreThreadsServicingQueue() const
Definition: scheduler.cpp:140
clock::time_point time_point
Definition: bench.h:51
size_t getQueueInfo(std::chrono::system_clock::time_point &first, std::chrono::system_clock::time_point &last) const
Definition: scheduler.cpp:128
void scheduleFromNow(Function f, std::chrono::milliseconds delta)
Call f once after the delta has passed.
Definition: scheduler.h:50
bool shouldStop() const EXCLUSIVE_LOCKS_REQUIRED(newTaskMutex)
Definition: scheduler.h:96