Bitcoin Core  21.99.0
P2P Digital Currency
scheduler.cpp
Go to the documentation of this file.
1 // Copyright (c) 2015-2020 The Bitcoin Core developers
2 // Distributed under the MIT software license, see the accompanying
3 // file COPYING or http://www.opensource.org/licenses/mit-license.php.
4 
5 #include <scheduler.h>
6 
7 #include <random.h>
8 
9 #include <assert.h>
10 #include <functional>
11 #include <utility>
12 
// Body of the CScheduler constructor; it performs no work of its own.
// NOTE(review): the signature line (inner line 13) is missing from this listing. The
// cross-reference index at the end of the page gives it as `CScheduler::CScheduler()`,
// defined at scheduler.cpp:13.
14 {
15 }
16 
// Body of the CScheduler destructor.
// NOTE(review): the signature line (inner line 17) is missing from this listing. The
// cross-reference index at the end of the page gives it as `CScheduler::~CScheduler()`.
18 {
// No thread may still be counted as running serviceQueue() when the scheduler is destroyed.
19  assert(nThreadsServicingQueue == 0);
// If the scheduler was asked to stop once drained, the queue must actually be drained.
20  if (stopWhenEmpty) assert(taskQueue.empty());
21 }
22 
23 
// Body of the queue-servicing loop: repeatedly waits for the earliest task in taskQueue
// to become due, removes it from the queue and runs it, until shouldStop() is true.
// NOTE(review): the signature line (inner line 24) is missing from this listing. The
// cross-reference index at the end of the page gives it as `void CScheduler::serviceQueue()`.
25 {
// `lock` is the handle passed to the condition-variable waits and to REVERSE_LOCK below.
26  WAIT_LOCK(newTaskMutex, lock);
// Count this thread as servicing the queue; the count is decremented on every exit path.
27  ++nThreadsServicingQueue;
28 
29  // newTaskMutex is locked throughout this loop EXCEPT
30  // when the thread is waiting or when the user's function
31  // is called.
32  while (!shouldStop()) {
33  try {
34  while (!shouldStop() && taskQueue.empty()) {
35  // Wait until there is something to do.
36  newTaskScheduled.wait(lock);
37  }
38 
39  // Wait until either there is a new task, or until
40  // the time of the first item on the queue:
41 
42  while (!shouldStop() && !taskQueue.empty()) {
// Re-read the front of the queue on every pass: a notification may mean that a task
// with an earlier time was inserted (or that the front task was taken by another thread).
43  std::chrono::system_clock::time_point timeToWaitFor = taskQueue.begin()->first;
44  if (newTaskScheduled.wait_until(lock, timeToWaitFor) == std::cv_status::timeout) {
45  break; // Exit loop after timeout, it means we reached the time of the event
46  }
47  }
48 
49  // If there are multiple threads, the queue can empty while we're waiting (another
50  // thread may service the task we were waiting on).
51  if (shouldStop() || taskQueue.empty())
52  continue;
53 
// Copy the task out before erasing its queue entry, so it stays valid while it runs.
54  Function f = taskQueue.begin()->second;
55  taskQueue.erase(taskQueue.begin());
56 
57  {
58  // Unlock before calling f, so it can reschedule itself or another task
59  // without deadlocking:
60  REVERSE_LOCK(lock);
61  f();
62  }
63  } catch (...) {
// Keep the servicing-thread count accurate before propagating the exception.
// NOTE(review): presumably `lock` is held again here because the REVERSE_LOCK scope has
// already been left - confirm against sync.h.
64  --nThreadsServicingQueue;
65  throw;
66  }
67  }
// Normal exit (shouldStop() became true): this thread no longer services the queue.
68  --nThreadsServicingQueue;
// Wake one other waiter on the condition variable so it can re-check shouldStop().
69  newTaskScheduled.notify_one();
70 }
71 
72 void CScheduler::schedule(CScheduler::Function f, std::chrono::system_clock::time_point t)
73 {
74  {
76  taskQueue.insert(std::make_pair(t, f));
77  }
78  newTaskScheduled.notify_one();
79 }
80 
81 void CScheduler::MockForward(std::chrono::seconds delta_seconds)
82 {
83  assert(delta_seconds.count() > 0 && delta_seconds < std::chrono::hours{1});
84 
85  {
87 
88  // use temp_queue to maintain updated schedule
89  std::multimap<std::chrono::system_clock::time_point, Function> temp_queue;
90 
91  for (const auto& element : taskQueue) {
92  temp_queue.emplace_hint(temp_queue.cend(), element.first - delta_seconds, element.second);
93  }
94 
95  // point taskQueue to temp_queue
96  taskQueue = std::move(temp_queue);
97  }
98 
99  // notify that the taskQueue needs to be processed
100  newTaskScheduled.notify_one();
101 }
102 
103 static void Repeat(CScheduler& s, CScheduler::Function f, std::chrono::milliseconds delta)
104 {
105  f();
106  s.scheduleFromNow([=, &s] { Repeat(s, f, delta); }, delta);
107 }
108 
109 void CScheduler::scheduleEvery(CScheduler::Function f, std::chrono::milliseconds delta)
110 {
111  scheduleFromNow([=] { Repeat(*this, f, delta); }, delta);
112 }
113 
114 size_t CScheduler::getQueueInfo(std::chrono::system_clock::time_point& first,
115  std::chrono::system_clock::time_point& last) const
116 {
118  size_t result = taskQueue.size();
119  if (!taskQueue.empty()) {
120  first = taskQueue.begin()->first;
121  last = taskQueue.rbegin()->first;
122  }
123  return result;
124 }
125 
// Body of AreThreadsServicingQueue(): reports the count of threads currently inside
// serviceQueue(), which the caller sees as a bool per the signature in the index.
// NOTE(review): the signature line (inner line 126) and inner line 128 are missing from
// this listing. The cross-reference index at the end of the page gives the signature as
// `bool CScheduler::AreThreadsServicingQueue() const`. The dropped line 128 presumably
// takes newTaskMutex - confirm against the real source.
127 {
129  return nThreadsServicingQueue;
130 }
131 
132 
// Body of MaybeScheduleProcessQueue(): queues one ProcessQueue() run on the underlying
// scheduler for "now", unless a callback is already running or nothing is pending.
// NOTE(review): the signature line (inner line 133) and inner line 136 are missing from
// this listing. The cross-reference index at the end of the page gives the signature as
// `void SingleThreadedSchedulerClient::MaybeScheduleProcessQueue()`. The dropped line 136
// presumably takes m_cs_callbacks_pending for the scope below - confirm.
134 {
135  {
137  // Try to avoid scheduling too many copies here, but if we
138  // accidentally have two ProcessQueue's scheduled at once it's
139  // not a big deal.
140  if (m_are_callbacks_running) return;
141  if (m_callbacks_pending.empty()) return;
142  }
// Scheduled outside the scope above; ProcessQueue() re-checks both conditions itself.
143  m_pscheduler->schedule(std::bind(&SingleThreadedSchedulerClient::ProcessQueue, this), std::chrono::system_clock::now());
144 }
145 
// Body of ProcessQueue(): takes at most one pending callback off the front of
// m_callbacks_pending and runs it, with m_are_callbacks_running set for the duration.
// NOTE(review): the signature line (inner line 146) and inner lines 150 and 162 are missing
// from this listing. The cross-reference index at the end of the page gives the signature as
// `void SingleThreadedSchedulerClient::ProcessQueue()`. Line 150 presumably takes
// m_cs_callbacks_pending; line 162 presumably declares the `instance` member used below - confirm.
147 {
148  std::function<void()> callback;
149  {
// Bail out if another run is in progress or there is nothing to do; otherwise claim the
// "running" flag and take ownership of the front callback.
151  if (m_are_callbacks_running) return;
152  if (m_callbacks_pending.empty()) return;
153  m_are_callbacks_running = true;
154 
155  callback = std::move(m_callbacks_pending.front());
156  m_callbacks_pending.pop_front();
157  }
158 
159  // RAII the resetting of m_are_callbacks_running and calling MaybeScheduleProcessQueue
160  // to ensure both happen safely even if callback() throws.
161  struct RAIICallbacksRunning {
163  explicit RAIICallbacksRunning(SingleThreadedSchedulerClient* _instance) : instance(_instance) {}
164  ~RAIICallbacksRunning()
165  {
166  {
167  LOCK(instance->m_cs_callbacks_pending);
168  instance->m_are_callbacks_running = false;
169  }
// Called after the lock scope above; queues the next run if callbacks are still pending.
170  instance->MaybeScheduleProcessQueue();
171  }
172  } raiicallbacksrunning(this);
173 
// The callback is invoked outside the scoped section above.
174  callback();
175 }
176 
177 void SingleThreadedSchedulerClient::AddToProcessQueue(std::function<void()> func)
178 {
180 
181  {
183  m_callbacks_pending.emplace_back(std::move(func));
184  }
186 }
187 
// Body of EmptyQueue(): runs ProcessQueue() on the calling thread over and over until
// m_callbacks_pending is empty.
// NOTE(review): the signature line (inner line 188) and inner lines 190 and 194 are missing
// from this listing. The cross-reference index at the end of the page gives the signature as
// `void SingleThreadedSchedulerClient::EmptyQueue()`. Line 194 presumably takes
// m_cs_callbacks_pending before the emptiness check - confirm against the real source.
189 {
191  bool should_continue = true;
192  while (should_continue) {
// Each call handles at most one pending callback.
193  ProcessQueue();
195  should_continue = !m_callbacks_pending.empty();
196  }
197 }
198 
// Body of CallbacksPending(): returns the number of callbacks still waiting in
// m_callbacks_pending.
// NOTE(review): the signature line (inner line 199) and inner line 201 are missing from
// this listing. The cross-reference index at the end of the page gives the signature as
// `size_t SingleThreadedSchedulerClient::CallbacksPending()`. Line 201 presumably takes
// m_cs_callbacks_pending - confirm against the real source.
200 {
202  return m_callbacks_pending.size();
203 }
CScheduler::newTaskScheduled
std::condition_variable newTaskScheduled
Definition: scheduler.h:99
CScheduler
Simple class for background tasks that should be run periodically or once "after a while".
Definition: scheduler.h:33
CScheduler::MockForward
void MockForward(std::chrono::seconds delta_seconds)
Mock the scheduler to fast forward in time.
Definition: scheduler.cpp:81
SingleThreadedSchedulerClient::m_pscheduler
CScheduler * m_pscheduler
Definition: scheduler.h:120
CScheduler::newTaskMutex
Mutex newTaskMutex
Definition: scheduler.h:98
REVERSE_LOCK
#define REVERSE_LOCK(g)
Definition: sync.h:227
CScheduler::CScheduler
CScheduler()
Definition: scheduler.cpp:13
SingleThreadedSchedulerClient::CallbacksPending
size_t CallbacksPending()
Definition: scheduler.cpp:199
SingleThreadedSchedulerClient::ProcessQueue
void ProcessQueue()
Definition: scheduler.cpp:146
CScheduler::~CScheduler
~CScheduler()
Definition: scheduler.cpp:17
scheduler.h
CScheduler::serviceQueue
void serviceQueue()
Services the queue 'forever'.
Definition: scheduler.cpp:24
CScheduler::getQueueInfo
size_t getQueueInfo(std::chrono::system_clock::time_point &first, std::chrono::system_clock::time_point &last) const
Returns number of tasks waiting to be serviced, and first and last task times.
Definition: scheduler.cpp:114
random.h
CScheduler::Function
std::function< void()> Function
Definition: scheduler.h:41
CScheduler::scheduleEvery
void scheduleEvery(Function f, std::chrono::milliseconds delta)
Repeat f until the scheduler is stopped.
Definition: scheduler.cpp:109
Repeat
static void Repeat(CScheduler &s, CScheduler::Function f, std::chrono::milliseconds delta)
Definition: scheduler.cpp:103
SingleThreadedSchedulerClient::m_cs_callbacks_pending
RecursiveMutex m_cs_callbacks_pending
Definition: scheduler.h:122
SingleThreadedSchedulerClient::EmptyQueue
void EmptyQueue()
Processes all remaining queue members on the calling thread, blocking until the queue is empty. Must be ca...
Definition: scheduler.cpp:188
SingleThreadedSchedulerClient::AddToProcessQueue
void AddToProcessQueue(std::function< void()> func)
Add a callback to be executed.
Definition: scheduler.cpp:177
LOCK
#define LOCK(cs)
Definition: sync.h:232
CScheduler::shouldStop
bool shouldStop() const EXCLUSIVE_LOCKS_REQUIRED(newTaskMutex)
Definition: scheduler.h:104
SingleThreadedSchedulerClient::MaybeScheduleProcessQueue
void MaybeScheduleProcessQueue()
Definition: scheduler.cpp:133
CScheduler::scheduleFromNow
void scheduleFromNow(Function f, std::chrono::milliseconds delta)
Call f once after the delta has passed.
Definition: scheduler.h:47
SingleThreadedSchedulerClient
Class used by CScheduler clients which may schedule multiple jobs which are required to be run serial...
Definition: scheduler.h:117
CScheduler::AreThreadsServicingQueue
bool AreThreadsServicingQueue() const
Returns true if there are threads actively running in serviceQueue()
Definition: scheduler.cpp:126
CScheduler::schedule
void schedule(Function f, std::chrono::system_clock::time_point t)
Call func at/after time t.
Definition: scheduler.cpp:72
assert
assert(std::addressof(::ChainstateActive().CoinsTip())==std::addressof(coins_cache))
WAIT_LOCK
#define WAIT_LOCK(cs, name)
Definition: sync.h:237