Bitcoin Core  22.99.0
P2P Digital Currency
scheduler.cpp
Go to the documentation of this file.
1 // Copyright (c) 2015-2021 The Bitcoin Core developers
2 // Distributed under the MIT software license, see the accompanying
3 // file COPYING or http://www.opensource.org/licenses/mit-license.php.
4 
5 #include <scheduler.h>
6 
7 #include <random.h>
8 #include <util/syscall_sandbox.h>
9 #include <util/time.h>
10 
11 #include <assert.h>
12 #include <functional>
13 #include <utility>
14 
// NOTE(review): the signature line for this body (listing line 15) is missing
// from this listing. The cross-reference index at the end of the page places
// CScheduler::CScheduler() at scheduler.cpp:15, so this is presumably the
// constructor body — confirm against the real file.
// The body is empty: nothing is done here beyond member initialisation.
16 {
17 }
18 
// NOTE(review): the signature line for this body (listing line 19) is missing
// from this listing. The cross-reference index places CScheduler::~CScheduler()
// at scheduler.cpp:19, so this is presumably the destructor body — confirm.
//
// Sanity checks only: no thread may still be counted in nThreadsServicingQueue,
// and if stopWhenEmpty is set the task queue must already be empty.
20 {
21  assert(nThreadsServicingQueue == 0);
22  if (stopWhenEmpty) assert(taskQueue.empty());
23 }
24 
25 
// NOTE(review): the signature line for this body (listing line 26) and the
// first statement (listing line 28) are missing from this listing. The
// cross-reference index places CScheduler::serviceQueue() at scheduler.cpp:26,
// so this is presumably that function's body — confirm against the real file.
//
// Worker loop: repeatedly waits for the earliest entry in taskQueue to become
// due, removes it from the queue and runs it, until shouldStop() returns true.
27 {
29  WAIT_LOCK(newTaskMutex, lock);
 // Count this thread as servicing the queue. The counter is decremented again
 // on both exits below: after the loop, and in the catch block.
30  ++nThreadsServicingQueue;
31 
32  // newTaskMutex is locked throughout this loop EXCEPT
33  // when the thread is waiting or when the user's function
34  // is called.
35  while (!shouldStop()) {
36  try {
37  while (!shouldStop() && taskQueue.empty()) {
38  // Wait until there is something to do.
39  newTaskScheduled.wait(lock);
40  }
41 
42  // Wait until either there is a new task, or until
43  // the time of the first item on the queue:
44 
45  while (!shouldStop() && !taskQueue.empty()) {
 // The deadline is re-read from the head of the queue on every iteration,
 // so a wake-up that is not a timeout picks up whatever entry is first now.
46  std::chrono::system_clock::time_point timeToWaitFor = taskQueue.begin()->first;
47  if (newTaskScheduled.wait_until(lock, timeToWaitFor) == std::cv_status::timeout) {
48  break; // Exit loop after timeout, it means we reached the time of the event
49  }
50  }
51 
52  // If there are multiple threads, the queue can empty while we're waiting (another
53  // thread may service the task we were waiting on).
54  if (shouldStop() || taskQueue.empty())
55  continue;
56 
 // Copy the head task out and erase its queue entry before running it.
57  Function f = taskQueue.begin()->second;
58  taskQueue.erase(taskQueue.begin());
59 
60  {
61  // Unlock before calling f, so it can reschedule itself or another task
62  // without deadlocking:
63  REVERSE_LOCK(lock);
64  f();
65  }
66  } catch (...) {
 // Undo the increment made on entry, then let the exception propagate
 // out of this function.
67  --nThreadsServicingQueue;
68  throw;
69  }
70  }
 // Normal exit: this thread no longer services the queue; signal one waiter
 // on newTaskScheduled.
71  --nThreadsServicingQueue;
72  newTaskScheduled.notify_one();
73 }
74 
75 void CScheduler::schedule(CScheduler::Function f, std::chrono::system_clock::time_point t)
76 {
77  {
79  taskQueue.insert(std::make_pair(t, f));
80  }
81  newTaskScheduled.notify_one();
82 }
83 
84 void CScheduler::MockForward(std::chrono::seconds delta_seconds)
85 {
86  assert(delta_seconds > 0s && delta_seconds <= 1h);
87 
88  {
90 
91  // use temp_queue to maintain updated schedule
92  std::multimap<std::chrono::system_clock::time_point, Function> temp_queue;
93 
94  for (const auto& element : taskQueue) {
95  temp_queue.emplace_hint(temp_queue.cend(), element.first - delta_seconds, element.second);
96  }
97 
98  // point taskQueue to temp_queue
99  taskQueue = std::move(temp_queue);
100  }
101 
102  // notify that the taskQueue needs to be processed
103  newTaskScheduled.notify_one();
104 }
105 
106 static void Repeat(CScheduler& s, CScheduler::Function f, std::chrono::milliseconds delta)
107 {
108  f();
109  s.scheduleFromNow([=, &s] { Repeat(s, f, delta); }, delta);
110 }
111 
112 void CScheduler::scheduleEvery(CScheduler::Function f, std::chrono::milliseconds delta)
113 {
114  scheduleFromNow([=] { Repeat(*this, f, delta); }, delta);
115 }
116 
117 size_t CScheduler::getQueueInfo(std::chrono::system_clock::time_point& first,
118  std::chrono::system_clock::time_point& last) const
119 {
121  size_t result = taskQueue.size();
122  if (!taskQueue.empty()) {
123  first = taskQueue.begin()->first;
124  last = taskQueue.rbegin()->first;
125  }
126  return result;
127 }
128 
// NOTE(review): the signature line for this body (listing line 129) is missing
// from this listing. The cross-reference index lists
// `bool CScheduler::AreThreadsServicingQueue() const` at scheduler.cpp:129, so
// this is presumably that function's body — confirm against the real file.
// Listing line 131 is also absent; a statement may have been dropped there.
//
// Returns the nThreadsServicingQueue counter.
130 {
132  return nThreadsServicingQueue;
133 }
134 
135 
// NOTE(review): the signature line for this body (listing line 136) is missing
// from this listing. The cross-reference index places
// SingleThreadedSchedulerClient::MaybeScheduleProcessQueue() at
// scheduler.cpp:136, so this is presumably that function's body — confirm.
// Listing line 139 (first line of the inner scope) is also absent; a statement
// may have been dropped there.
//
// Queues one ProcessQueue() call on the scheduler for "now", unless a callback
// is already running or there is nothing pending.
137 {
138  {
140  // Try to avoid scheduling too many copies here, but if we
141  // accidentally have two ProcessQueue's scheduled at once its
142  // not a big deal.
143  if (m_are_callbacks_running) return;
144  if (m_callbacks_pending.empty()) return;
145  }
 // The bound callable stores `this`.
146  m_pscheduler->schedule(std::bind(&SingleThreadedSchedulerClient::ProcessQueue, this), std::chrono::system_clock::now());
147 }
148 
// NOTE(review): the signature line for this body (listing line 149) is missing
// from this listing. The cross-reference index places
// SingleThreadedSchedulerClient::ProcessQueue() at scheduler.cpp:149, so this
// is presumably that function's body — confirm against the real file.
// Listing lines 153 and 165 are also absent; in particular the struct below
// uses a member `instance` whose declaration is not visible here.
//
// Takes at most one callback off the front of m_callbacks_pending and runs it.
// Returns without doing anything if a callback is already marked as running or
// nothing is pending.
150 {
151  std::function<void()> callback;
152  {
154  if (m_are_callbacks_running) return;
155  if (m_callbacks_pending.empty()) return;
 // Mark a callback as running before it is taken off the list.
156  m_are_callbacks_running = true;
157 
158  callback = std::move(m_callbacks_pending.front());
159  m_callbacks_pending.pop_front();
160  }
161 
162  // RAII the resetting of m_are_callbacks_running and calling MaybeScheduleProcessQueue
163  // to ensure both happen safely even if callback() throws.
164  struct RAIICallbacksRunning {
166  explicit RAIICallbacksRunning(SingleThreadedSchedulerClient* _instance) : instance(_instance) {}
167  ~RAIICallbacksRunning()
168  {
169  {
 // Clear the flag under the mutex; the scope ends before
 // MaybeScheduleProcessQueue() is called below.
170  LOCK(instance->m_callbacks_mutex);
171  instance->m_are_callbacks_running = false;
172  }
173  instance->MaybeScheduleProcessQueue();
174  }
175  } raiicallbacksrunning(this);
176 
 // Runs after the inner scope above has been closed.
177  callback();
178 }
179 
180 void SingleThreadedSchedulerClient::AddToProcessQueue(std::function<void()> func)
181 {
183 
184  {
186  m_callbacks_pending.emplace_back(std::move(func));
187  }
189 }
190 
// NOTE(review): the signature line for this body (listing line 191) is missing
// from this listing. The cross-reference index places
// SingleThreadedSchedulerClient::EmptyQueue() at scheduler.cpp:191, so this is
// presumably that function's body — confirm against the real file.
// Listing lines 193 and 197 are also absent; statements may have been dropped
// there (e.g. m_callbacks_pending is read below with no lock visible).
//
// Calls ProcessQueue() on the calling thread until m_callbacks_pending is
// observed to be empty.
192 {
194  bool should_continue = true;
195  while (should_continue) {
196  ProcessQueue();
198  should_continue = !m_callbacks_pending.empty();
199  }
200 }
201 
// NOTE(review): the signature line for this body (listing line 202) is missing
// from this listing. The cross-reference index lists
// `size_t SingleThreadedSchedulerClient::CallbacksPending()` at
// scheduler.cpp:202, so this is presumably that function's body — confirm.
// Listing line 204 is also absent; a statement may have been dropped there.
//
// Returns the number of entries currently in m_callbacks_pending.
203 {
205  return m_callbacks_pending.size();
206 }
CScheduler::newTaskScheduled
std::condition_variable newTaskScheduled
Definition: scheduler.h:99
CScheduler
Simple class for background tasks that should be run periodically or once "after a while".
Definition: scheduler.h:33
CScheduler::MockForward
void MockForward(std::chrono::seconds delta_seconds)
Mock the scheduler to fast forward in time.
Definition: scheduler.cpp:84
assert
assert(!tx.IsCoinBase())
SingleThreadedSchedulerClient::m_pscheduler
CScheduler * m_pscheduler
Definition: scheduler.h:120
CScheduler::newTaskMutex
Mutex newTaskMutex
Definition: scheduler.h:98
REVERSE_LOCK
#define REVERSE_LOCK(g)
Definition: sync.h:221
CScheduler::CScheduler
CScheduler()
Definition: scheduler.cpp:15
SingleThreadedSchedulerClient::CallbacksPending
size_t CallbacksPending()
Definition: scheduler.cpp:202
SingleThreadedSchedulerClient::ProcessQueue
void ProcessQueue()
Definition: scheduler.cpp:149
CScheduler::~CScheduler
~CScheduler()
Definition: scheduler.cpp:19
scheduler.h
CScheduler::serviceQueue
void serviceQueue()
Services the queue 'forever'.
Definition: scheduler.cpp:26
CScheduler::getQueueInfo
size_t getQueueInfo(std::chrono::system_clock::time_point &first, std::chrono::system_clock::time_point &last) const
Returns number of tasks waiting to be serviced, and first and last task times.
Definition: scheduler.cpp:117
syscall_sandbox.h
random.h
CScheduler::Function
std::function< void()> Function
Definition: scheduler.h:41
time.h
CScheduler::scheduleEvery
void scheduleEvery(Function f, std::chrono::milliseconds delta)
Repeat f until the scheduler is stopped.
Definition: scheduler.cpp:112
Repeat
static void Repeat(CScheduler &s, CScheduler::Function f, std::chrono::milliseconds delta)
Definition: scheduler.cpp:106
SetSyscallSandboxPolicy
void SetSyscallSandboxPolicy(SyscallSandboxPolicy syscall_policy)
Force the current thread (and threads created from the current thread) into a restricted-service operating mode.
Definition: syscall_sandbox.cpp:830
SingleThreadedSchedulerClient::EmptyQueue
void EmptyQueue()
Processes all remaining queue members on the calling thread, blocking until queue is empty. Must be called after the CScheduler has no remaining processing threads!
Definition: scheduler.cpp:191
SingleThreadedSchedulerClient::AddToProcessQueue
void AddToProcessQueue(std::function< void()> func)
Add a callback to be executed.
Definition: scheduler.cpp:180
LOCK
#define LOCK(cs)
Definition: sync.h:226
SingleThreadedSchedulerClient::m_callbacks_mutex
Mutex m_callbacks_mutex
Definition: scheduler.h:122
SyscallSandboxPolicy::SCHEDULER
@ SCHEDULER
CScheduler::shouldStop
bool shouldStop() const EXCLUSIVE_LOCKS_REQUIRED(newTaskMutex)
Definition: scheduler.h:104
SingleThreadedSchedulerClient::MaybeScheduleProcessQueue
void MaybeScheduleProcessQueue()
Definition: scheduler.cpp:136
CScheduler::scheduleFromNow
void scheduleFromNow(Function f, std::chrono::milliseconds delta)
Call f once after the delta has passed.
Definition: scheduler.h:47
SingleThreadedSchedulerClient
Class used by CScheduler clients which may schedule multiple jobs which are required to be run serially.
Definition: scheduler.h:117
CScheduler::AreThreadsServicingQueue
bool AreThreadsServicingQueue() const
Returns true if there are threads actively running in serviceQueue()
Definition: scheduler.cpp:129
CScheduler::schedule
void schedule(Function f, std::chrono::system_clock::time_point t)
Call func at/after time t.
Definition: scheduler.cpp:75
WAIT_LOCK
#define WAIT_LOCK(cs, name)
Definition: sync.h:231
ByteUnit::t
@ t