Bitcoin Core 30.99.0
P2P Digital Currency
threadpool_tests.cpp
Go to the documentation of this file.
1// Copyright (c) The Bitcoin Core developers
2// Distributed under the MIT software license, see the accompanying
3// file COPYING or http://www.opensource.org/licenses/mit-license.php.
4
5#include <common/system.h>
6#include <logging.h>
7#include <random.h>
8#include <test/util/common.h>
9#include <util/string.h>
10#include <util/threadpool.h>
11#include <util/time.h>
12
13#include <boost/test/unit_test.hpp>
14#include <latch>
15#include <semaphore>
16
17// General test values
19constexpr char POOL_NAME[] = "test";
20constexpr auto WAIT_TIMEOUT = 120s;
21
25 LogInfo("thread pool workers count: %d", NUM_WORKERS_DEFAULT);
26 }
27};
28
29// Test Cases Overview
30// 0) Submit task to a non-started, interrupted, or stopped pool.
31// 1) Submit tasks and verify completion.
32// 2) Keep all threads busy except one.
33// 3) Wait for work to finish.
34// 4) Wait for result object.
35// 5) The task throws an exception, catch must be done in the consumer side.
36// 6) Busy workers, help them by processing tasks externally.
37// 7) Recursive submission of tasks.
38// 8) Submit task when all threads are busy, stop pool and verify task gets executed.
39// 9) Congestion test; create more workers than available cores.
40// 10) Ensure Interrupt() prevents further submissions.
41// 11) Start() must not cause a deadlock when called during Stop().
42// 12) Ensure queued tasks complete after Interrupt().
43// 13) Ensure the Stop() calling thread helps drain the queue.
45
// Wait on each future in `futures`, allowing at most WAIT_TIMEOUT per future,
// and require (via BOOST_REQUIRE) that each one reports ready.
#define WAIT_FOR(futures)                                                         \
    do {                                                                          \
        for (const auto& f : futures) {                                           \
            BOOST_REQUIRE(f.wait_for(WAIT_TIMEOUT) == std::future_status::ready); \
        }                                                                         \
    } while (false)
52
53// Helper to unwrap a valid pool submission
54template <typename F>
55[[nodiscard]] auto Submit(ThreadPool& pool, F&& fn)
56{
57 return std::move(*Assert(pool.Submit(std::forward<F>(fn))));
58}
59
60// Block a number of worker threads by submitting tasks that wait on `release_sem`.
61// Returns the futures of the blocking tasks, ensuring all have started and are waiting.
62std::vector<std::future<void>> BlockWorkers(ThreadPool& threadPool, std::counting_semaphore<>& release_sem, size_t num_of_threads_to_block)
63{
64 assert(threadPool.WorkersCount() >= num_of_threads_to_block);
65 std::latch ready{static_cast<std::ptrdiff_t>(num_of_threads_to_block)};
66 std::vector<std::future<void>> blocking_tasks(num_of_threads_to_block);
67 for (auto& f : blocking_tasks) f = Submit(threadPool, [&] {
68 ready.count_down();
69 release_sem.acquire();
70 });
71 ready.wait();
72 return blocking_tasks;
73}
74
75// Test 0, submit task to a non-started, interrupted, or stopped pool
76BOOST_AUTO_TEST_CASE(submit_fails_with_correct_error)
77{
78 ThreadPool threadPool(POOL_NAME);
79 const auto fn_empty = [&] {};
80
81 // Never started: Inactive
82 auto res = threadPool.Submit(fn_empty);
83 BOOST_CHECK(!res);
84 BOOST_CHECK_EQUAL(SubmitErrorString(res.error()), "No active workers");
85
86 // Interrupted (workers still alive): Interrupted, and Start() must be rejected too
87 std::counting_semaphore<> blocker(0);
88 threadPool.Start(NUM_WORKERS_DEFAULT);
89 const auto blocking_tasks = BlockWorkers(threadPool, blocker, NUM_WORKERS_DEFAULT);
90 threadPool.Interrupt();
91 res = threadPool.Submit(fn_empty);
92 BOOST_CHECK(!res);
93 BOOST_CHECK_EQUAL(SubmitErrorString(res.error()), "Interrupted");
94 BOOST_CHECK_EXCEPTION(threadPool.Start(NUM_WORKERS_DEFAULT), std::runtime_error, HasReason("Thread pool has been interrupted or is stopping"));
95 blocker.release(NUM_WORKERS_DEFAULT);
96 WAIT_FOR(blocking_tasks);
97
98 // Interrupted then stopped: Inactive
99 threadPool.Stop();
100 res = threadPool.Submit(fn_empty);
101 BOOST_CHECK(!res);
102 BOOST_CHECK_EQUAL(SubmitErrorString(res.error()), "No active workers");
103
104 // Started then stopped: Inactive
105 threadPool.Start(NUM_WORKERS_DEFAULT);
106 threadPool.Stop();
107 res = threadPool.Submit(fn_empty);
108 BOOST_CHECK(!res);
109 BOOST_CHECK_EQUAL(SubmitErrorString(res.error()), "No active workers");
110}
111
112// Test 1, submit tasks and verify completion
113BOOST_AUTO_TEST_CASE(submit_tasks_complete_successfully)
114{
115 int num_tasks = 50;
116
117 ThreadPool threadPool(POOL_NAME);
118 threadPool.Start(NUM_WORKERS_DEFAULT);
119 std::atomic<int> counter = 0;
120
121 // Store futures to ensure completion before checking counter.
122 std::vector<std::future<void>> futures;
123 futures.reserve(num_tasks);
124 for (int i = 1; i <= num_tasks; i++) {
125 futures.emplace_back(Submit(threadPool, [&counter, i]() {
126 counter.fetch_add(i, std::memory_order_relaxed);
127 }));
128 }
129
130 // Wait for all tasks to finish
131 WAIT_FOR(futures);
132 int expected_value = (num_tasks * (num_tasks + 1)) / 2; // Gauss sum.
133 BOOST_CHECK_EQUAL(counter.load(), expected_value);
134 BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 0);
135}
136
137// Test 2, maintain all threads busy except one
138BOOST_AUTO_TEST_CASE(single_available_worker_executes_all_tasks)
139{
140 ThreadPool threadPool(POOL_NAME);
141 threadPool.Start(NUM_WORKERS_DEFAULT);
142 std::counting_semaphore<> blocker(0);
143 const auto blocking_tasks = BlockWorkers(threadPool, blocker, NUM_WORKERS_DEFAULT - 1);
144
145 // Now execute tasks on the single available worker
146 // and check that all the tasks are executed.
147 int num_tasks = 15;
148 int counter = 0;
149
150 // Store futures to wait on
151 std::vector<std::future<void>> futures(num_tasks);
152 for (auto& f : futures) f = Submit(threadPool, [&counter]{ counter++; });
153
154 WAIT_FOR(futures);
155 BOOST_CHECK_EQUAL(counter, num_tasks);
156
157 blocker.release(NUM_WORKERS_DEFAULT - 1);
158 WAIT_FOR(blocking_tasks);
159 threadPool.Stop();
160 BOOST_CHECK_EQUAL(threadPool.WorkersCount(), 0);
161}
162
163// Test 3, wait for work to finish
164BOOST_AUTO_TEST_CASE(wait_for_task_to_finish)
165{
166 ThreadPool threadPool(POOL_NAME);
167 threadPool.Start(NUM_WORKERS_DEFAULT);
168 std::atomic<bool> flag = false;
169 std::future<void> future = Submit(threadPool, [&flag]() {
171 flag.store(true, std::memory_order_release);
172 });
173 BOOST_CHECK(future.wait_for(WAIT_TIMEOUT) == std::future_status::ready);
174 BOOST_CHECK(flag.load(std::memory_order_acquire));
175}
176
177// Test 4, obtain result object
178BOOST_AUTO_TEST_CASE(get_result_from_completed_task)
179{
180 ThreadPool threadPool(POOL_NAME);
181 threadPool.Start(NUM_WORKERS_DEFAULT);
182 std::future<bool> future_bool = Submit(threadPool, []() { return true; });
183 BOOST_CHECK(future_bool.get());
184
185 std::future<std::string> future_str = Submit(threadPool, []() { return std::string("true"); });
186 std::string result = future_str.get();
187 BOOST_CHECK_EQUAL(result, "true");
188}
189
190// Test 5, throw exception and catch it on the consumer side
191BOOST_AUTO_TEST_CASE(task_exception_propagates_to_future)
192{
193 ThreadPool threadPool(POOL_NAME);
194 threadPool.Start(NUM_WORKERS_DEFAULT);
195
196 const auto make_err{[&](size_t n) { return strprintf("error on thread #%s", n); }};
197
198 const int num_tasks = 5;
199 std::vector<std::future<void>> futures;
200 futures.reserve(num_tasks);
201 for (int i = 0; i < num_tasks; i++) {
202 futures.emplace_back(Submit(threadPool, [&make_err, i] { throw std::runtime_error(make_err(i)); }));
203 }
204
205 for (int i = 0; i < num_tasks; i++) {
206 BOOST_CHECK_EXCEPTION(futures[i].get(), std::runtime_error, HasReason{make_err(i)});
207 }
208}
209
210// Test 6, all workers are busy, help them by processing tasks from outside
211BOOST_AUTO_TEST_CASE(process_tasks_manually_when_workers_busy)
212{
213 ThreadPool threadPool(POOL_NAME);
214 threadPool.Start(NUM_WORKERS_DEFAULT);
215
216 std::counting_semaphore<> blocker(0);
217 const auto& blocking_tasks = BlockWorkers(threadPool, blocker, NUM_WORKERS_DEFAULT);
218
219 // Now submit tasks and check that none of them are executed.
220 int num_tasks = 20;
221 std::atomic<int> counter = 0;
222 for (int i = 0; i < num_tasks; i++) {
223 (void)Submit(threadPool, [&counter]() {
224 counter.fetch_add(1, std::memory_order_relaxed);
225 });
226 }
228 BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), num_tasks);
229
230 // Now process manually
231 for (int i = 0; i < num_tasks; i++) {
232 threadPool.ProcessTask();
233 }
234 BOOST_CHECK_EQUAL(counter.load(), num_tasks);
235 BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 0);
236 blocker.release(NUM_WORKERS_DEFAULT);
237 threadPool.Stop();
238 WAIT_FOR(blocking_tasks);
239}
240
241// Test 7, submit tasks from other tasks
242BOOST_AUTO_TEST_CASE(recursive_task_submission)
243{
244 ThreadPool threadPool(POOL_NAME);
245 threadPool.Start(NUM_WORKERS_DEFAULT);
246
247 std::promise<void> signal;
248 (void)Submit(threadPool, [&]() {
249 (void)Submit(threadPool, [&]() {
250 signal.set_value();
251 });
252 });
253
254 signal.get_future().wait();
255 threadPool.Stop();
256}
257
258// Test 8, submit task when all threads are busy and then stop the pool
259BOOST_AUTO_TEST_CASE(task_submitted_while_busy_completes)
260{
261 ThreadPool threadPool(POOL_NAME);
262 threadPool.Start(NUM_WORKERS_DEFAULT);
263
264 std::counting_semaphore<> blocker(0);
265 const auto& blocking_tasks = BlockWorkers(threadPool, blocker, NUM_WORKERS_DEFAULT);
266
267 // Submit an extra task that should execute once a worker is free
268 std::future<bool> future = Submit(threadPool, []() { return true; });
269
270 // At this point, all workers are blocked, and the extra task is queued
271 BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 1);
272
273 // Wait a short moment before unblocking the threads to mimic a concurrent shutdown
274 std::thread thread_unblocker([&blocker]() {
276 blocker.release(NUM_WORKERS_DEFAULT);
277 });
278
279 // Stop the pool while the workers are still blocked
280 threadPool.Stop();
281
282 // Expect the submitted task to complete
283 BOOST_CHECK(future.get());
284 thread_unblocker.join();
285
286 // Obviously all the previously blocking tasks should be completed at this point too
287 WAIT_FOR(blocking_tasks);
288
289 // Pool should be stopped and no workers remaining
290 BOOST_CHECK_EQUAL(threadPool.WorkersCount(), 0);
291}
292
293// Test 9, more workers than available cores (congestion test)
294BOOST_AUTO_TEST_CASE(congestion_more_workers_than_cores)
295{
296 ThreadPool threadPool(POOL_NAME);
297 threadPool.Start(std::max(1, GetNumCores() * 2)); // Oversubscribe by 2×
298
299 int num_tasks = 200;
300 std::atomic<int> counter{0};
301
302 std::vector<std::future<void>> futures;
303 futures.reserve(num_tasks);
304 for (int i = 0; i < num_tasks; i++) {
305 futures.emplace_back(Submit(threadPool, [&counter] {
306 counter.fetch_add(1, std::memory_order_relaxed);
307 }));
308 }
309
310 WAIT_FOR(futures);
311 BOOST_CHECK_EQUAL(counter.load(), num_tasks);
312}
313
314// Test 10, Interrupt() prevents further submissions
315BOOST_AUTO_TEST_CASE(interrupt_blocks_new_submissions)
316{
317 // 1) Interrupt from main thread
318 ThreadPool threadPool(POOL_NAME);
319 threadPool.Start(NUM_WORKERS_DEFAULT);
320 threadPool.Interrupt();
321
322 auto res = threadPool.Submit([]{});
323 BOOST_CHECK(!res);
324 BOOST_CHECK_EQUAL(SubmitErrorString(res.error()), "Interrupted");
325
326 // Reset pool
327 threadPool.Stop();
328
329 // 2) Interrupt() from a worker thread
330 // One worker is blocked, another calls Interrupt(), and the remaining one waits for tasks.
331 threadPool.Start(/*num_workers=*/3);
332 std::atomic<int> counter{0};
333 std::counting_semaphore<> blocker(0);
334 const auto blocking_tasks = BlockWorkers(threadPool, blocker, 1);
335 Submit(threadPool, [&threadPool, &counter]{
336 threadPool.Interrupt();
337 counter.fetch_add(1, std::memory_order_relaxed);
338 }).get();
339 blocker.release(1); // unblock worker
340
341 BOOST_CHECK_EQUAL(counter.load(), 1);
342 threadPool.Stop();
343 WAIT_FOR(blocking_tasks);
344 BOOST_CHECK_EQUAL(threadPool.WorkersCount(), 0);
345}
346
347// Test 11, Start() must not cause a deadlock when called during Stop()
348BOOST_AUTO_TEST_CASE(start_mid_stop_does_not_deadlock)
349{
350 ThreadPool threadPool(POOL_NAME);
351 threadPool.Start(NUM_WORKERS_DEFAULT);
352
353 // Keep all workers busy so Stop() gets stuck waiting for them to finish during join()
354 std::counting_semaphore<> workers_blocker(0);
355 const auto blocking_tasks = BlockWorkers(threadPool, workers_blocker, NUM_WORKERS_DEFAULT);
356
357 std::thread stopper_thread([&threadPool] { threadPool.Stop(); });
358
359 // Stop() takes ownership of the workers before joining them, so WorkersCount()
360 // hits 0 the moment Stop() is waiting for them to join. That is our signal
361 // to call Start() right into the middle of the joining phase.
362 while (threadPool.WorkersCount() != 0) {
363 std::this_thread::yield(); // let the OS breathe so it can switch context
364 }
365 // Now we know for sure the stopper thread is hanging while workers are still alive.
366 // Restart the pool and resume workers so the stopper thread can proceed.
367 // This will throw an exception only if the pool handles Start-Stop race properly,
368 // otherwise it will proceed and hang the stopper_thread.
369 try {
370 threadPool.Start(NUM_WORKERS_DEFAULT);
371 } catch (std::exception& e) {
372 BOOST_CHECK_EQUAL(e.what(), "Thread pool has been interrupted or is stopping");
373 }
374 workers_blocker.release(NUM_WORKERS_DEFAULT);
375 WAIT_FOR(blocking_tasks);
376
377 // If Stop() is stuck, joining the stopper thread will deadlock
378 stopper_thread.join();
379}
380
381// Test 12, queued tasks complete after Interrupt()
382BOOST_AUTO_TEST_CASE(queued_tasks_complete_after_interrupt)
383{
384 ThreadPool threadPool(POOL_NAME);
385 threadPool.Start(NUM_WORKERS_DEFAULT);
386
387 std::counting_semaphore<> blocker(0);
388 const auto blocking_tasks = BlockWorkers(threadPool, blocker, NUM_WORKERS_DEFAULT);
389
390 // Queue tasks while all workers are busy, then interrupt
391 std::atomic<int> counter{0};
392 const int num_tasks = 10;
393 std::vector<std::future<void>> futures;
394 futures.reserve(num_tasks);
395 for (int i = 0; i < num_tasks; i++) {
396 futures.emplace_back(Submit(threadPool, [&counter]{ counter.fetch_add(1, std::memory_order_relaxed); }));
397 }
398 threadPool.Interrupt();
399
400 // Queued tasks must still complete despite the interrupt
401 blocker.release(NUM_WORKERS_DEFAULT);
402 WAIT_FOR(futures);
403 BOOST_CHECK_EQUAL(counter.load(), num_tasks);
404
405 threadPool.Stop();
406 WAIT_FOR(blocking_tasks);
407}
408
409// Test 13, ensure the Stop() calling thread helps drain the queue
410BOOST_AUTO_TEST_CASE(stop_active_wait_drains_queue)
411{
412 ThreadPool threadPool(POOL_NAME);
413 threadPool.Start(NUM_WORKERS_DEFAULT);
414
415 std::counting_semaphore<> blocker(0);
416 const auto blocking_tasks = BlockWorkers(threadPool, blocker, NUM_WORKERS_DEFAULT);
417
418 auto main_thread_id = std::this_thread::get_id();
419 std::atomic<int> main_thread_tasks{0};
420 const size_t num_tasks = 20;
421 for (size_t i = 0; i < num_tasks; i++) {
422 (void)Submit(threadPool, [&main_thread_tasks, main_thread_id]() {
423 if (std::this_thread::get_id() == main_thread_id)
424 main_thread_tasks.fetch_add(1, std::memory_order_relaxed);
425 });
426 }
427 BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), num_tasks);
428
429 // Delay release so Stop() drains all tasks from the calling thread
430 std::thread unblocker([&blocker, &threadPool]() {
431 while (threadPool.WorkQueueSize() > 0) {
432 std::this_thread::yield();
433 }
434 blocker.release(NUM_WORKERS_DEFAULT);
435 });
436
437 threadPool.Stop();
438 unblocker.join();
439
440 // Check the main thread processed all tasks
441 BOOST_CHECK_EQUAL(main_thread_tasks.load(), num_tasks);
442 WAIT_FOR(blocking_tasks);
443}
444
#define Assert(val)
Identity function.
Definition: check.h:113
Fast randomness source.
Definition: random.h:386
BOOST_CHECK_EXCEPTION predicates to check the specific validation error.
Definition: common.h:18
I randrange(I range) noexcept
Generate a random integer in the range [0..range), with range > 0.
Definition: random.h:254
Fixed-size thread pool for running arbitrary tasks concurrently.
Definition: threadpool.h:47
void Start(int num_workers) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Start worker threads.
Definition: threadpool.h:104
size_t WorkersCount() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Definition: threadpool.h:229
bool ProcessTask() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Execute a single queued task synchronously.
Definition: threadpool.h:193
void Stop() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Stop all worker threads and wait for them to exit.
Definition: threadpool.h:127
void Interrupt() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Stop accepting new tasks and begin asynchronous shutdown.
Definition: threadpool.h:218
size_t WorkQueueSize() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Definition: threadpool.h:224
util::Expected< std::future< std::invoke_result_t< F > >, SubmitError > Submit(F &&fn) noexcept EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Enqueues a new task for asynchronous execution.
Definition: threadpool.h:174
int GetNumCores()
Return the number of cores available on the current system.
Definition: system.cpp:109
BOOST_FIXTURE_TEST_SUITE(cuckoocache_tests, BasicTestingSetup)
Test Suite for CuckooCache.
BOOST_AUTO_TEST_SUITE_END()
#define LogInfo(...)
Definition: log.h:95
#define BOOST_CHECK_EQUAL(v1, v2)
Definition: object.cpp:17
#define BOOST_CHECK(expr)
Definition: object.cpp:16
constexpr std::string_view SubmitErrorString(const ThreadPool::SubmitError err) noexcept
Definition: threadpool.h:235
#define WAIT_FOR(futures)
BOOST_AUTO_TEST_CASE(submit_fails_with_correct_error)
constexpr auto WAIT_TIMEOUT
constexpr char POOL_NAME[]
auto Submit(ThreadPool &pool, F &&fn)
std::vector< std::future< void > > BlockWorkers(ThreadPool &threadPool, std::counting_semaphore<> &release_sem, size_t num_of_threads_to_block)
int NUM_WORKERS_DEFAULT
#define strprintf
Format arguments and return the string or write to given std::ostream (see tinyformat::format doc for...
Definition: tinyformat.h:1172
void UninterruptibleSleep(const std::chrono::microseconds &n)
Definition: time.cpp:24
assert(!tx.IsCoinBase())