Bitcoin Core  0.20.99
P2P Digital Currency
scheduler.cpp
Go to the documentation of this file.
1 // Copyright (c) 2015-2020 The Bitcoin Core developers
2 // Distributed under the MIT software license, see the accompanying
3 // file COPYING or http://www.opensource.org/licenses/mit-license.php.
4 
5 #include <scheduler.h>
6 
7 #include <random.h>
8 
9 #include <assert.h>
10 #include <utility>
11 
// NOTE(review): the function header for this body (listing line 12) is missing
// from this extract; only an empty body is visible. Presumably a constructor —
// confirm against the real scheduler.cpp.
13 {
14 }
15 
// NOTE(review): the function header for this body (listing line 16) is missing
// from this extract; presumably the CScheduler destructor — confirm against the
// real scheduler.cpp. The body only performs sanity checks.
17 {
// No thread may still be counted as servicing the queue at this point.
18  assert(nThreadsServicingQueue == 0);
// When stopWhenEmpty is set, the task queue must already be empty.
19  if (stopWhenEmpty) assert(taskQueue.empty());
20 }
21 
22 
// Body of serviceQueue() (the header, listing line 23, is missing from this
// extract). Repeatedly takes the earliest entry of taskQueue and runs it, until
// shouldStop() returns true. nThreadsServicingQueue is incremented on entry and
// decremented on every exit path, including when an exception propagates.
24 {
25  WAIT_LOCK(newTaskMutex, lock);
// Count this thread as servicing the queue; undone on both exit paths below.
26  ++nThreadsServicingQueue;
27 
28  // newTaskMutex is locked throughout this loop EXCEPT
29  // when the thread is waiting or when the user's function
30  // is called.
31  while (!shouldStop()) {
32  try {
33  while (!shouldStop() && taskQueue.empty()) {
34  // Wait until there is something to do.
35  newTaskScheduled.wait(lock);
36  }
37 
38  // Wait until either there is a new task, or until
39  // the time of the first item on the queue:
40 
// Each wake-up before the deadline re-reads the head of the queue, because a
// notification (e.g. from schedule() or MockForward()) may have changed which
// entry is first or when it is due.
41  while (!shouldStop() && !taskQueue.empty()) {
42  std::chrono::system_clock::time_point timeToWaitFor = taskQueue.begin()->first;
43  if (newTaskScheduled.wait_until(lock, timeToWaitFor) == std::cv_status::timeout) {
44  break; // Exit loop after timeout, it means we reached the time of the event
45  }
46  }
47 
48  // If there are multiple threads, the queue can empty while we're waiting (another
49  // thread may service the task we were waiting on).
50  if (shouldStop() || taskQueue.empty())
51  continue;
52 
// Copy the earliest task out of the queue and remove its entry before running it.
53  Function f = taskQueue.begin()->second;
54  taskQueue.erase(taskQueue.begin());
55 
56  {
57  // Unlock before calling f, so it can reschedule itself or another task
58  // without deadlocking:
59  REVERSE_LOCK(lock);
60  f();
61  }
62  } catch (...) {
// Keep the thread count accurate if anything in the loop body throws, then
// propagate the exception to the caller.
63  --nThreadsServicingQueue;
64  throw;
65  }
66  }
// Normal exit: drop this thread's count and signal newTaskScheduled once.
67  --nThreadsServicingQueue;
68  newTaskScheduled.notify_one();
69 }
70 
71 void CScheduler::schedule(CScheduler::Function f, std::chrono::system_clock::time_point t)
72 {
73  {
75  taskQueue.insert(std::make_pair(t, f));
76  }
77  newTaskScheduled.notify_one();
78 }
79 
80 void CScheduler::MockForward(std::chrono::seconds delta_seconds)
81 {
82  assert(delta_seconds.count() > 0 && delta_seconds < std::chrono::hours{1});
83 
84  {
86 
87  // use temp_queue to maintain updated schedule
88  std::multimap<std::chrono::system_clock::time_point, Function> temp_queue;
89 
90  for (const auto& element : taskQueue) {
91  temp_queue.emplace_hint(temp_queue.cend(), element.first - delta_seconds, element.second);
92  }
93 
94  // point taskQueue to temp_queue
95  taskQueue = std::move(temp_queue);
96  }
97 
98  // notify that the taskQueue needs to be processed
99  newTaskScheduled.notify_one();
100 }
101 
102 static void Repeat(CScheduler& s, CScheduler::Function f, std::chrono::milliseconds delta)
103 {
104  f();
105  s.scheduleFromNow([=, &s] { Repeat(s, f, delta); }, delta);
106 }
107 
108 void CScheduler::scheduleEvery(CScheduler::Function f, std::chrono::milliseconds delta)
109 {
110  scheduleFromNow([=] { Repeat(*this, f, delta); }, delta);
111 }
112 
113 size_t CScheduler::getQueueInfo(std::chrono::system_clock::time_point& first,
114  std::chrono::system_clock::time_point& last) const
115 {
117  size_t result = taskQueue.size();
118  if (!taskQueue.empty()) {
119  first = taskQueue.begin()->first;
120  last = taskQueue.rbegin()->first;
121  }
122  return result;
123 }
124 
// Body of AreThreadsServicingQueue() (the header, listing line 125, is missing
// from this extract). Returns nThreadsServicingQueue, which converts to true
// when at least one thread is currently counted inside serviceQueue().
// NOTE(review): listing line 127 is also absent here; confirm whether the real
// source takes newTaskMutex before reading the counter.
126 {
128  return nThreadsServicingQueue;
129 }
130 
131 
// NOTE(review): the function header for this body (listing line 132) is missing
// from this extract; presumably SingleThreadedSchedulerClient::MaybeScheduleProcessQueue
// — confirm against the real scheduler.cpp.
// Schedules one ProcessQueue() call on m_pscheduler for "now", unless a callback
// is already running or there is nothing pending.
133 {
134  {
135  LOCK(m_cs_callbacks_pending);
136  // Try to avoid scheduling too many copies here, but if we
137  // accidentally have two ProcessQueue's scheduled at once its
138  // not a big deal.
139  if (m_are_callbacks_running) return;
140  if (m_callbacks_pending.empty()) return;
141  }
// m_cs_callbacks_pending is no longer held here: the scoped block above has ended.
142  m_pscheduler->schedule(std::bind(&SingleThreadedSchedulerClient::ProcessQueue, this), std::chrono::system_clock::now());
143 }
144 
// NOTE(review): the function header for this body (listing line 145) is missing
// from this extract; presumably SingleThreadedSchedulerClient::ProcessQueue —
// confirm against the real scheduler.cpp.
// Runs at most one pending callback: takes the front of m_callbacks_pending,
// marks callbacks as running while it executes, then clears the flag and asks
// for another ProcessQueue to be scheduled if needed.
146 {
147  std::function<void()> callback;
148  {
149  LOCK(m_cs_callbacks_pending);
// Bail out if another callback is in flight or there is nothing to run.
150  if (m_are_callbacks_running) return;
151  if (m_callbacks_pending.empty()) return;
152  m_are_callbacks_running = true;
153 
// Move the callback out and remove it from the queue while the lock is held;
// the callback itself is invoked after this scope has ended.
154  callback = std::move(m_callbacks_pending.front());
155  m_callbacks_pending.pop_front();
156  }
157 
158  // RAII the resetting of m_are_callbacks_running and calling MaybeScheduleProcessQueue
159  // to ensure both happen safely even if callback() throws.
// NOTE(review): listing line 161 (presumably the declaration of the `instance`
// member used below) is absent from this extract.
160  struct RAIICallbacksRunning {
162  explicit RAIICallbacksRunning(SingleThreadedSchedulerClient* _instance) : instance(_instance) {}
163  ~RAIICallbacksRunning()
164  {
165  {
166  LOCK(instance->m_cs_callbacks_pending);
167  instance->m_are_callbacks_running = false;
168  }
// Called after the inner lock scope has ended.
169  instance->MaybeScheduleProcessQueue();
170  }
171  } raiicallbacksrunning(this);
172 
173  callback();
174 }
175 
176 void SingleThreadedSchedulerClient::AddToProcessQueue(std::function<void()> func)
177 {
178  assert(m_pscheduler);
179 
180  {
181  LOCK(m_cs_callbacks_pending);
182  m_callbacks_pending.emplace_back(std::move(func));
183  }
184  MaybeScheduleProcessQueue();
185 }
186 
// Body of EmptyQueue() (the header, listing line 187, is missing from this
// extract). Processes pending callbacks on the calling thread until
// m_callbacks_pending is empty. Asserts first that the scheduler reports no
// threads servicing its queue.
188 {
189  assert(!m_pscheduler->AreThreadsServicingQueue());
190  bool should_continue = true;
191  while (should_continue) {
// ProcessQueue() is called before m_cs_callbacks_pending is taken below.
192  ProcessQueue();
// Re-check under the lock whether any callbacks are still pending.
193  LOCK(m_cs_callbacks_pending);
194  should_continue = !m_callbacks_pending.empty();
195  }
196 }
197 
// NOTE(review): the function header for this body (listing line 198) is missing
// from this extract — confirm the name and signature against the real scheduler.cpp.
// Returns the number of entries in m_callbacks_pending, read while holding
// m_cs_callbacks_pending.
199 {
200  LOCK(m_cs_callbacks_pending);
201  return m_callbacks_pending.size();
202 }
Class used by CScheduler clients which may schedule multiple jobs which are required to be run serial...
Definition: scheduler.h:113
const std::chrono::seconds now
void scheduleEvery(Function f, std::chrono::milliseconds delta)
Repeat f until the scheduler is stopped.
Definition: scheduler.cpp:108
static void Repeat(CScheduler &s, CScheduler::Function f, std::chrono::milliseconds delta)
Definition: scheduler.cpp:102
void MockForward(std::chrono::seconds delta_seconds)
Mock the scheduler to fast forward in time.
Definition: scheduler.cpp:80
std::function< void()> Function
Definition: scheduler.h:38
Mutex newTaskMutex
Definition: scheduler.h:94
#define REVERSE_LOCK(g)
Definition: sync.h:225
void AddToProcessQueue(std::function< void()> func)
Add a callback to be executed.
Definition: scheduler.cpp:176
std::condition_variable newTaskScheduled
Definition: scheduler.h:95
#define LOCK(cs)
Definition: sync.h:230
void schedule(Function f, std::chrono::system_clock::time_point t)
Call func at/after time t.
Definition: scheduler.cpp:71
RecursiveMutex m_cs_callbacks_pending
Definition: scheduler.h:118
void serviceQueue()
Services the queue 'forever'.
Definition: scheduler.cpp:23
#define WAIT_LOCK(cs, name)
Definition: sync.h:235
bool AreThreadsServicingQueue() const
Returns true if there are threads actively running in serviceQueue()
Definition: scheduler.cpp:125
Simple class for background tasks that should be run periodically or once "after a while"...
Definition: scheduler.h:32
size_t getQueueInfo(std::chrono::system_clock::time_point &first, std::chrono::system_clock::time_point &last) const
Returns number of tasks waiting to be serviced, and first and last task times.
Definition: scheduler.cpp:113
void EmptyQueue()
Processes all remaining queue members on the calling thread, blocking until queue is empty Must be ca...
Definition: scheduler.cpp:187
void scheduleFromNow(Function f, std::chrono::milliseconds delta)
Call f once after the delta has passed.
Definition: scheduler.h:44
bool shouldStop() const EXCLUSIVE_LOCKS_REQUIRED(newTaskMutex)
Definition: scheduler.h:100