Bitcoin Core 28.99.0
P2P Digital Currency
scheduler.cpp
Go to the documentation of this file.
1// Copyright (c) 2015-2022 The Bitcoin Core developers
2// Distributed under the MIT software license, see the accompanying
3// file COPYING or http://www.opensource.org/licenses/mit-license.php.
4
5#include <scheduler.h>
6
7#include <sync.h>
8#include <util/time.h>
9
10#include <cassert>
11#include <functional>
12#include <utility>
13
14CScheduler::CScheduler() = default;
15
17{
18 assert(nThreadsServicingQueue == 0);
19 if (stopWhenEmpty) assert(taskQueue.empty());
20}
21
22
24{
26 ++nThreadsServicingQueue;
27
28 // newTaskMutex is locked throughout this loop EXCEPT
29 // when the thread is waiting or when the user's function
30 // is called.
31 while (!shouldStop()) {
32 try {
33 while (!shouldStop() && taskQueue.empty()) {
34 // Wait until there is something to do.
35 newTaskScheduled.wait(lock);
36 }
37
38 // Wait until either there is a new task, or until
39 // the time of the first item on the queue:
40
41 while (!shouldStop() && !taskQueue.empty()) {
42 std::chrono::steady_clock::time_point timeToWaitFor = taskQueue.begin()->first;
43 if (newTaskScheduled.wait_until(lock, timeToWaitFor) == std::cv_status::timeout) {
44 break; // Exit loop after timeout, it means we reached the time of the event
45 }
46 }
47
48 // If there are multiple threads, the queue can empty while we're waiting (another
49 // thread may service the task we were waiting on).
50 if (shouldStop() || taskQueue.empty())
51 continue;
52
53 Function f = taskQueue.begin()->second;
54 taskQueue.erase(taskQueue.begin());
55
56 {
57 // Unlock before calling f, so it can reschedule itself or another task
58 // without deadlocking:
59 REVERSE_LOCK(lock);
60 f();
61 }
62 } catch (...) {
63 --nThreadsServicingQueue;
64 throw;
65 }
66 }
67 --nThreadsServicingQueue;
68 newTaskScheduled.notify_one();
69}
70
71void CScheduler::schedule(CScheduler::Function f, std::chrono::steady_clock::time_point t)
72{
73 {
75 taskQueue.insert(std::make_pair(t, f));
76 }
77 newTaskScheduled.notify_one();
78}
79
80void CScheduler::MockForward(std::chrono::seconds delta_seconds)
81{
82 assert(delta_seconds > 0s && delta_seconds <= 1h);
83
84 {
86
87 // use temp_queue to maintain updated schedule
88 std::multimap<std::chrono::steady_clock::time_point, Function> temp_queue;
89
90 for (const auto& element : taskQueue) {
91 temp_queue.emplace_hint(temp_queue.cend(), element.first - delta_seconds, element.second);
92 }
93
94 // point taskQueue to temp_queue
95 taskQueue = std::move(temp_queue);
96 }
97
98 // notify that the taskQueue needs to be processed
99 newTaskScheduled.notify_one();
100}
101
102static void Repeat(CScheduler& s, CScheduler::Function f, std::chrono::milliseconds delta)
103{
104 f();
105 s.scheduleFromNow([=, &s] { Repeat(s, f, delta); }, delta);
106}
107
108void CScheduler::scheduleEvery(CScheduler::Function f, std::chrono::milliseconds delta)
109{
110 scheduleFromNow([this, f, delta] { Repeat(*this, f, delta); }, delta);
111}
112
113size_t CScheduler::getQueueInfo(std::chrono::steady_clock::time_point& first,
114 std::chrono::steady_clock::time_point& last) const
115{
117 size_t result = taskQueue.size();
118 if (!taskQueue.empty()) {
119 first = taskQueue.begin()->first;
120 last = taskQueue.rbegin()->first;
121 }
122 return result;
123}
124
126{
128 return nThreadsServicingQueue;
129}
130
131
133{
134 {
136 // Try to avoid scheduling too many copies here, but if we
137 // accidentally have two ProcessQueue's scheduled at once its
138 // not a big deal.
139 if (m_are_callbacks_running) return;
140 if (m_callbacks_pending.empty()) return;
141 }
142 m_scheduler.schedule([this] { this->ProcessQueue(); }, std::chrono::steady_clock::now());
143}
144
146{
147 std::function<void()> callback;
148 {
150 if (m_are_callbacks_running) return;
151 if (m_callbacks_pending.empty()) return;
152 m_are_callbacks_running = true;
153
154 callback = std::move(m_callbacks_pending.front());
155 m_callbacks_pending.pop_front();
156 }
157
158 // RAII the setting of fCallbacksRunning and calling MaybeScheduleProcessQueue
159 // to ensure both happen safely even if callback() throws.
160 struct RAIICallbacksRunning {
161 SerialTaskRunner* instance;
162 explicit RAIICallbacksRunning(SerialTaskRunner* _instance) : instance(_instance) {}
163 ~RAIICallbacksRunning()
164 {
165 {
166 LOCK(instance->m_callbacks_mutex);
167 instance->m_are_callbacks_running = false;
168 }
169 instance->MaybeScheduleProcessQueue();
170 }
171 } raiicallbacksrunning(this);
172
173 callback();
174}
175
176void SerialTaskRunner::insert(std::function<void()> func)
177{
178 {
180 m_callbacks_pending.emplace_back(std::move(func));
181 }
183}
184
186{
188 bool should_continue = true;
189 while (should_continue) {
190 ProcessQueue();
192 should_continue = !m_callbacks_pending.empty();
193 }
194}
195
197{
199 return m_callbacks_pending.size();
200}
Simple class for background tasks that should be run periodically or once "after a while".
Definition: scheduler.h:40
void MockForward(std::chrono::seconds delta_seconds) EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex)
Mock the scheduler to fast forward in time.
Definition: scheduler.cpp:80
void serviceQueue() EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex)
Services the queue 'forever'.
Definition: scheduler.cpp:23
void scheduleEvery(Function f, std::chrono::milliseconds delta) EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex)
Repeat f until the scheduler is stopped.
Definition: scheduler.cpp:108
size_t getQueueInfo(std::chrono::steady_clock::time_point &first, std::chrono::steady_clock::time_point &last) const EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex)
Returns number of tasks waiting to be serviced, and first and last task times.
Definition: scheduler.cpp:113
std::function< void()> Function
Definition: scheduler.h:47
bool AreThreadsServicingQueue() const EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex)
Returns true if there are threads actively running in serviceQueue()
Definition: scheduler.cpp:125
bool shouldStop() const EXCLUSIVE_LOCKS_REQUIRED(newTaskMutex)
Definition: scheduler.h:111
std::condition_variable newTaskScheduled
Definition: scheduler.h:106
Mutex newTaskMutex
Definition: scheduler.h:105
void scheduleFromNow(Function f, std::chrono::milliseconds delta) EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex)
Call f once after the delta has passed.
Definition: scheduler.h:53
void schedule(Function f, std::chrono::steady_clock::time_point t) EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex)
Call func at/after time t.
Definition: scheduler.cpp:71
Class used by CScheduler clients which may schedule multiple jobs which are required to be run serially.
Definition: scheduler.h:125
void MaybeScheduleProcessQueue() EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex)
Definition: scheduler.cpp:132
void ProcessQueue() EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex)
Definition: scheduler.cpp:145
size_t size() override EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex)
Returns the number of currently pending events.
Definition: scheduler.cpp:196
void insert(std::function< void()> func) override EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex)
Add a callback to be executed.
Definition: scheduler.cpp:176
Mutex m_callbacks_mutex
Definition: scheduler.h:129
CScheduler & m_scheduler
Definition: scheduler.h:127
void flush() override EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex)
Processes all remaining queue members on the calling thread, blocking until queue is empty. Must be called after the CScheduler has no remaining processing threads!
Definition: scheduler.cpp:185
static void Repeat(CScheduler &s, CScheduler::Function f, std::chrono::milliseconds delta)
Definition: scheduler.cpp:102
#define WAIT_LOCK(cs, name)
Definition: sync.h:262
#define LOCK(cs)
Definition: sync.h:257
#define REVERSE_LOCK(g)
Definition: sync.h:243
assert(!tx.IsCoinBase())