Bitcoin Core 31.99.0
P2P Digital Currency
threadpool_tests.cpp
Go to the documentation of this file.
1// Copyright (c) The Bitcoin Core developers
2// Distributed under the MIT software license, see the accompanying
3// file COPYING or http://www.opensource.org/licenses/mit-license.php.
4
5#include <common/system.h>
6#include <logging.h>
7#include <random.h>
8#include <test/util/common.h>
9#include <util/string.h>
10#include <util/threadpool.h>
11#include <util/time.h>
12
13#include <boost/test/unit_test.hpp>
14
15#include <array>
16#include <functional>
17#include <latch>
18#include <ranges>
19#include <semaphore>
20
21// General test values
23constexpr char POOL_NAME[] = "test";
24constexpr auto TEST_WAIT_TIMEOUT = 120s;
25
29 LogInfo("thread pool workers count: %d", NUM_WORKERS_DEFAULT);
30 }
31};
32
33// Test Cases Overview
34// 0) Submit task to a non-started pool.
35// 1) Submit tasks and verify completion.
36// 2) Maintain all threads busy except one.
37// 3) Wait for work to finish.
38// 4) Wait for result object.
39// 5) The task throws an exception, catch must be done in the consumer side.
40// 6) Busy workers, help them by processing tasks externally.
41// 7) Recursive submission of tasks.
42// 8) Submit task when all threads are busy, stop pool and verify task gets executed.
43// 9) Congestion test; create more workers than available cores.
44// 10) Ensure Interrupt() prevents further submissions.
45// 11) Start() must not cause a deadlock when called during Stop().
46// 12) Ensure queued tasks complete after Interrupt().
47// 13) Ensure the Stop() calling thread helps drain the queue.
48// 14) Submit range of tasks in one lock acquisition.
50
// Require that every future in `futures` becomes ready within TEST_WAIT_TIMEOUT.
// The check uses BOOST_REQUIRE, so a future that is not ready in time fails the
// enclosing test case immediately instead of letting it continue.
// `futures` is parenthesized in the expansion so that any range expression can
// be passed without changing how the for-statement parses.
#define WAIT_FOR(futures) \
    do { \
        for (const auto& f : (futures)) { \
            BOOST_REQUIRE(f.wait_for(TEST_WAIT_TIMEOUT) == std::future_status::ready); \
        } \
    } while (0)
57
58// Helper to unwrap a valid pool submission
59template <typename F>
60[[nodiscard]] auto Submit(ThreadPool& pool, F&& fn)
61{
62 return std::move(*Assert(pool.Submit(std::forward<F>(fn))));
63}
64
65// Block a number of worker threads by submitting tasks that wait on `release_sem`.
66// Returns the futures of the blocking tasks, ensuring all have started and are waiting.
67std::vector<std::future<void>> BlockWorkers(ThreadPool& threadPool, std::counting_semaphore<>& release_sem, size_t num_of_threads_to_block)
68{
69 assert(threadPool.WorkersCount() >= num_of_threads_to_block);
70 std::latch ready{static_cast<std::ptrdiff_t>(num_of_threads_to_block)};
71 std::vector<std::future<void>> blocking_tasks(num_of_threads_to_block);
72 for (auto& f : blocking_tasks) f = Submit(threadPool, [&] {
73 ready.count_down();
74 release_sem.acquire();
75 });
76 ready.wait();
77 return blocking_tasks;
78}
79
80// Test 0, submit task to a non-started, interrupted, or stopped pool
81BOOST_AUTO_TEST_CASE(submit_fails_with_correct_error)
82{
83 ThreadPool threadPool(POOL_NAME);
84 const auto fn_empty = [&] {};
85
86 // Never started: Inactive
87 auto res = threadPool.Submit(fn_empty);
88 BOOST_CHECK(!res);
89 BOOST_CHECK_EQUAL(SubmitErrorString(res.error()), "No active workers");
90
91 // Interrupted (workers still alive): Interrupted, and Start() must be rejected too
92 std::counting_semaphore<> blocker(0);
93 threadPool.Start(NUM_WORKERS_DEFAULT);
94 const auto blocking_tasks = BlockWorkers(threadPool, blocker, NUM_WORKERS_DEFAULT);
95 threadPool.Interrupt();
96 res = threadPool.Submit(fn_empty);
97 BOOST_CHECK(!res);
98 BOOST_CHECK_EQUAL(SubmitErrorString(res.error()), "Interrupted");
99 BOOST_CHECK_EXCEPTION(threadPool.Start(NUM_WORKERS_DEFAULT), std::runtime_error, HasReason("Thread pool has been interrupted or is stopping"));
100 blocker.release(NUM_WORKERS_DEFAULT);
101 WAIT_FOR(blocking_tasks);
102
103 // Interrupted then stopped: Inactive
104 threadPool.Stop();
105 res = threadPool.Submit(fn_empty);
106 BOOST_CHECK(!res);
107 BOOST_CHECK_EQUAL(SubmitErrorString(res.error()), "No active workers");
108
109 // Started then stopped: Inactive
110 threadPool.Start(NUM_WORKERS_DEFAULT);
111 threadPool.Stop();
112 res = threadPool.Submit(fn_empty);
113 BOOST_CHECK(!res);
114 BOOST_CHECK_EQUAL(SubmitErrorString(res.error()), "No active workers");
115
116 std::vector<std::function<void()>> tasks;
117 const auto range_res{threadPool.Submit(std::move(tasks))};
118 BOOST_CHECK(!range_res);
119 BOOST_CHECK_EQUAL(SubmitErrorString(range_res.error()), "No active workers");
120}
121
122// Test 1, submit tasks and verify completion
123BOOST_AUTO_TEST_CASE(submit_tasks_complete_successfully)
124{
125 int num_tasks = 50;
126
127 ThreadPool threadPool(POOL_NAME);
128 threadPool.Start(NUM_WORKERS_DEFAULT);
129 std::atomic<int> counter = 0;
130
131 // Store futures to ensure completion before checking counter.
132 std::vector<std::future<void>> futures;
133 futures.reserve(num_tasks);
134 for (int i = 1; i <= num_tasks; i++) {
135 futures.emplace_back(Submit(threadPool, [&counter, i]() {
136 counter.fetch_add(i, std::memory_order_relaxed);
137 }));
138 }
139
140 // Wait for all tasks to finish
141 WAIT_FOR(futures);
142 int expected_value = (num_tasks * (num_tasks + 1)) / 2; // Gauss sum.
143 BOOST_CHECK_EQUAL(counter.load(), expected_value);
144 BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 0);
145}
146
147// Test 2, maintain all threads busy except one
148BOOST_AUTO_TEST_CASE(single_available_worker_executes_all_tasks)
149{
150 ThreadPool threadPool(POOL_NAME);
151 threadPool.Start(NUM_WORKERS_DEFAULT);
152 std::counting_semaphore<> blocker(0);
153 const auto blocking_tasks = BlockWorkers(threadPool, blocker, NUM_WORKERS_DEFAULT - 1);
154
155 // Now execute tasks on the single available worker
156 // and check that all the tasks are executed.
157 int num_tasks = 15;
158 int counter = 0;
159
160 // Store futures to wait on
161 std::vector<std::future<void>> futures(num_tasks);
162 for (auto& f : futures) f = Submit(threadPool, [&counter]{ counter++; });
163
164 WAIT_FOR(futures);
165 BOOST_CHECK_EQUAL(counter, num_tasks);
166
167 blocker.release(NUM_WORKERS_DEFAULT - 1);
168 WAIT_FOR(blocking_tasks);
169 threadPool.Stop();
170 BOOST_CHECK_EQUAL(threadPool.WorkersCount(), 0);
171}
172
173// Test 3, wait for work to finish
174BOOST_AUTO_TEST_CASE(wait_for_task_to_finish)
175{
176 ThreadPool threadPool(POOL_NAME);
177 threadPool.Start(NUM_WORKERS_DEFAULT);
178 std::atomic<bool> flag = false;
179 std::future<void> future = Submit(threadPool, [&flag]() {
181 flag.store(true, std::memory_order_release);
182 });
183 BOOST_CHECK(future.wait_for(TEST_WAIT_TIMEOUT) == std::future_status::ready);
184 BOOST_CHECK(flag.load(std::memory_order_acquire));
185}
186
187// Test 4, obtain result object
188BOOST_AUTO_TEST_CASE(get_result_from_completed_task)
189{
190 ThreadPool threadPool(POOL_NAME);
191 threadPool.Start(NUM_WORKERS_DEFAULT);
192 std::future<bool> future_bool = Submit(threadPool, []() { return true; });
193 BOOST_CHECK(future_bool.get());
194
195 std::future<std::string> future_str = Submit(threadPool, []() { return std::string("true"); });
196 std::string result = future_str.get();
197 BOOST_CHECK_EQUAL(result, "true");
198}
199
200// Test 5, throw exception and catch it on the consumer side
201BOOST_AUTO_TEST_CASE(task_exception_propagates_to_future)
202{
203 ThreadPool threadPool(POOL_NAME);
204 threadPool.Start(NUM_WORKERS_DEFAULT);
205
206 const auto make_err{[&](size_t n) { return strprintf("error on thread #%s", n); }};
207
208 const int num_tasks = 5;
209 std::vector<std::future<void>> futures;
210 futures.reserve(num_tasks);
211 for (int i = 0; i < num_tasks; i++) {
212 futures.emplace_back(Submit(threadPool, [&make_err, i] { throw std::runtime_error(make_err(i)); }));
213 }
214
215 for (int i = 0; i < num_tasks; i++) {
216 BOOST_CHECK_EXCEPTION(futures[i].get(), std::runtime_error, HasReason{make_err(i)});
217 }
218}
219
220// Test 6, all workers are busy, help them by processing tasks from outside
221BOOST_AUTO_TEST_CASE(process_tasks_manually_when_workers_busy)
222{
223 ThreadPool threadPool(POOL_NAME);
224 threadPool.Start(NUM_WORKERS_DEFAULT);
225
226 std::counting_semaphore<> blocker(0);
227 const auto& blocking_tasks = BlockWorkers(threadPool, blocker, NUM_WORKERS_DEFAULT);
228
229 // Now submit tasks and check that none of them are executed.
230 int num_tasks = 20;
231 std::atomic<int> counter = 0;
232 for (int i = 0; i < num_tasks; i++) {
233 (void)Submit(threadPool, [&counter]() {
234 counter.fetch_add(1, std::memory_order_relaxed);
235 });
236 }
238 BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), num_tasks);
239
240 // Now process manually
241 for (int i = 0; i < num_tasks; i++) {
242 threadPool.ProcessTask();
243 }
244 BOOST_CHECK_EQUAL(counter.load(), num_tasks);
245 BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 0);
246 blocker.release(NUM_WORKERS_DEFAULT);
247 threadPool.Stop();
248 WAIT_FOR(blocking_tasks);
249}
250
251// Test 7, submit tasks from other tasks
252BOOST_AUTO_TEST_CASE(recursive_task_submission)
253{
254 ThreadPool threadPool(POOL_NAME);
255 threadPool.Start(NUM_WORKERS_DEFAULT);
256
257 std::promise<void> signal;
258 (void)Submit(threadPool, [&]() {
259 (void)Submit(threadPool, [&]() {
260 signal.set_value();
261 });
262 });
263
264 signal.get_future().wait();
265 threadPool.Stop();
266}
267
268// Test 8, submit task when all threads are busy and then stop the pool
269BOOST_AUTO_TEST_CASE(task_submitted_while_busy_completes)
270{
271 ThreadPool threadPool(POOL_NAME);
272 threadPool.Start(NUM_WORKERS_DEFAULT);
273
274 std::counting_semaphore<> blocker(0);
275 const auto& blocking_tasks = BlockWorkers(threadPool, blocker, NUM_WORKERS_DEFAULT);
276
277 // Submit an extra task that should execute once a worker is free
278 std::future<bool> future = Submit(threadPool, []() { return true; });
279
280 // At this point, all workers are blocked, and the extra task is queued
281 BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 1);
282
283 // Wait a short moment before unblocking the threads to mimic a concurrent shutdown
284 std::thread thread_unblocker([&blocker]() {
286 blocker.release(NUM_WORKERS_DEFAULT);
287 });
288
289 // Stop the pool while the workers are still blocked
290 threadPool.Stop();
291
292 // Expect the submitted task to complete
293 BOOST_CHECK(future.get());
294 thread_unblocker.join();
295
296 // Obviously all the previously blocking tasks should be completed at this point too
297 WAIT_FOR(blocking_tasks);
298
299 // Pool should be stopped and no workers remaining
300 BOOST_CHECK_EQUAL(threadPool.WorkersCount(), 0);
301}
302
303// Test 9, more workers than available cores (congestion test)
304BOOST_AUTO_TEST_CASE(congestion_more_workers_than_cores)
305{
306 ThreadPool threadPool(POOL_NAME);
307 threadPool.Start(std::max(1, GetNumCores() * 2)); // Oversubscribe by 2×
308
309 int num_tasks = 200;
310 std::atomic<int> counter{0};
311
312 std::vector<std::future<void>> futures;
313 futures.reserve(num_tasks);
314 for (int i = 0; i < num_tasks; i++) {
315 futures.emplace_back(Submit(threadPool, [&counter] {
316 counter.fetch_add(1, std::memory_order_relaxed);
317 }));
318 }
319
320 WAIT_FOR(futures);
321 BOOST_CHECK_EQUAL(counter.load(), num_tasks);
322}
323
324// Test 10, Interrupt() prevents further submissions
325BOOST_AUTO_TEST_CASE(interrupt_blocks_new_submissions)
326{
327 // 1) Interrupt from main thread
328 ThreadPool threadPool(POOL_NAME);
329 threadPool.Start(NUM_WORKERS_DEFAULT);
330 threadPool.Interrupt();
331
332 auto res = threadPool.Submit([]{});
333 BOOST_CHECK(!res);
334 BOOST_CHECK_EQUAL(SubmitErrorString(res.error()), "Interrupted");
335
336 std::vector<std::function<void()>> tasks;
337 const auto range_res{threadPool.Submit(std::move(tasks))};
338 BOOST_CHECK(!range_res);
339 BOOST_CHECK_EQUAL(SubmitErrorString(range_res.error()), "Interrupted");
340
341 // Reset pool
342 threadPool.Stop();
343
344 // 2) Interrupt() from a worker thread
345 // One worker is blocked, another calls Interrupt(), and the remaining one waits for tasks.
346 threadPool.Start(/*num_workers=*/3);
347 std::atomic<int> counter{0};
348 std::counting_semaphore<> blocker(0);
349 const auto blocking_tasks = BlockWorkers(threadPool, blocker, 1);
350 Submit(threadPool, [&threadPool, &counter]{
351 threadPool.Interrupt();
352 counter.fetch_add(1, std::memory_order_relaxed);
353 }).get();
354 blocker.release(1); // unblock worker
355
356 BOOST_CHECK_EQUAL(counter.load(), 1);
357 threadPool.Stop();
358 WAIT_FOR(blocking_tasks);
359 BOOST_CHECK_EQUAL(threadPool.WorkersCount(), 0);
360}
361
362// Test 11, Start() must not cause a deadlock when called during Stop()
363BOOST_AUTO_TEST_CASE(start_mid_stop_does_not_deadlock)
364{
365 ThreadPool threadPool(POOL_NAME);
366 threadPool.Start(NUM_WORKERS_DEFAULT);
367
368 // Keep all workers busy so Stop() gets stuck waiting for them to finish during join()
369 std::counting_semaphore<> workers_blocker(0);
370 const auto blocking_tasks = BlockWorkers(threadPool, workers_blocker, NUM_WORKERS_DEFAULT);
371
372 std::thread stopper_thread([&threadPool] { threadPool.Stop(); });
373
374 // Stop() takes ownership of the workers before joining them, so WorkersCount()
375 // hits 0 the moment Stop() is waiting for them to join. That is our signal
376 // to call Start() right into the middle of the joining phase.
377 while (threadPool.WorkersCount() != 0) {
378 std::this_thread::yield(); // let the OS breathe so it can switch context
379 }
380 // Now we know for sure the stopper thread is hanging while workers are still alive.
381 // Restart the pool and resume workers so the stopper thread can proceed.
382 // This will throw an exception only if the pool handles Start-Stop race properly,
383 // otherwise it will proceed and hang the stopper_thread.
384 try {
385 threadPool.Start(NUM_WORKERS_DEFAULT);
386 } catch (std::exception& e) {
387 BOOST_CHECK_EQUAL(e.what(), "Thread pool has been interrupted or is stopping");
388 }
389 workers_blocker.release(NUM_WORKERS_DEFAULT);
390 WAIT_FOR(blocking_tasks);
391
392 // If Stop() is stuck, joining the stopper thread will deadlock
393 stopper_thread.join();
394}
395
396// Test 12, queued tasks complete after Interrupt()
397BOOST_AUTO_TEST_CASE(queued_tasks_complete_after_interrupt)
398{
399 ThreadPool threadPool(POOL_NAME);
400 threadPool.Start(NUM_WORKERS_DEFAULT);
401
402 std::counting_semaphore<> blocker(0);
403 const auto blocking_tasks = BlockWorkers(threadPool, blocker, NUM_WORKERS_DEFAULT);
404
405 // Queue tasks while all workers are busy, then interrupt
406 std::atomic<int> counter{0};
407 const int num_tasks = 10;
408 std::vector<std::future<void>> futures;
409 futures.reserve(num_tasks);
410 for (int i = 0; i < num_tasks; i++) {
411 futures.emplace_back(Submit(threadPool, [&counter]{ counter.fetch_add(1, std::memory_order_relaxed); }));
412 }
413 threadPool.Interrupt();
414
415 // Queued tasks must still complete despite the interrupt
416 blocker.release(NUM_WORKERS_DEFAULT);
417 WAIT_FOR(futures);
418 BOOST_CHECK_EQUAL(counter.load(), num_tasks);
419
420 threadPool.Stop();
421 WAIT_FOR(blocking_tasks);
422}
423
424// Test 13, ensure the Stop() calling thread helps drain the queue
425BOOST_AUTO_TEST_CASE(stop_active_wait_drains_queue)
426{
427 ThreadPool threadPool(POOL_NAME);
428 threadPool.Start(NUM_WORKERS_DEFAULT);
429
430 std::counting_semaphore<> blocker(0);
431 const auto blocking_tasks = BlockWorkers(threadPool, blocker, NUM_WORKERS_DEFAULT);
432
433 auto main_thread_id = std::this_thread::get_id();
434 std::atomic<int> main_thread_tasks{0};
435 const size_t num_tasks = 20;
436 for (size_t i = 0; i < num_tasks; i++) {
437 (void)Submit(threadPool, [&main_thread_tasks, main_thread_id]() {
438 if (std::this_thread::get_id() == main_thread_id)
439 main_thread_tasks.fetch_add(1, std::memory_order_relaxed);
440 });
441 }
442 BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), num_tasks);
443
444 // Delay release so Stop() drains all tasks from the calling thread
445 std::thread unblocker([&blocker, &threadPool]() {
446 while (threadPool.WorkQueueSize() > 0) {
447 std::this_thread::yield();
448 }
449 blocker.release(NUM_WORKERS_DEFAULT);
450 });
451
452 threadPool.Stop();
453 unblocker.join();
454
455 // Check the main thread processed all tasks
456 BOOST_CHECK_EQUAL(main_thread_tasks.load(), num_tasks);
457 WAIT_FOR(blocking_tasks);
458}
459
460// Test 14, submit range of tasks in one lock acquisition
461BOOST_AUTO_TEST_CASE(submit_range_of_tasks_complete_successfully)
462{
463 constexpr int32_t num_tasks{50};
464
465 ThreadPool threadPool{POOL_NAME};
466 threadPool.Start(NUM_WORKERS_DEFAULT);
467 std::atomic_int32_t sum{0};
468 const auto square{[&sum](int32_t i) {
469 sum.fetch_add(i, std::memory_order_relaxed);
470 return i * i;
471 }};
472
473 std::array<std::function<int32_t()>, static_cast<size_t>(num_tasks)> array_tasks;
474 std::vector<std::function<int32_t()>> vector_tasks;
475 vector_tasks.reserve(static_cast<size_t>(num_tasks));
476 for (const auto i : std::views::iota(int32_t{1}, num_tasks + 1)) {
477 array_tasks.at(static_cast<size_t>(i - 1)) = [i, square] { return square(i); };
478 vector_tasks.emplace_back([i, square] { return square(i); });
479 }
480
481 auto futures{std::move(*Assert(threadPool.Submit(std::move(array_tasks))))};
482 BOOST_CHECK_EQUAL(futures.size(), static_cast<size_t>(num_tasks));
483 std::ranges::move(*Assert(threadPool.Submit(std::move(vector_tasks))), std::back_inserter(futures));
484 BOOST_CHECK_EQUAL(futures.size(), static_cast<size_t>(num_tasks * 2));
485
486 auto squares_sum{0};
487 for (auto& future : futures) {
488 squares_sum += future.get();
489 }
490
491 // 2x Gauss sum.
492 const auto expected_sum{2 * ((num_tasks * (num_tasks + 1)) / 2)};
493 const auto expected_squares_sum{2 * ((num_tasks * (num_tasks + 1) * ((num_tasks * 2) + 1)) / 6)};
494 BOOST_CHECK_EQUAL(sum, expected_sum);
495 BOOST_CHECK_EQUAL(squares_sum, expected_squares_sum);
496 BOOST_CHECK_EQUAL(threadPool.WorkQueueSize(), 0);
497}
498
#define Assert(val)
Identity function.
Definition: check.h:113
Fast randomness source.
Definition: random.h:386
BOOST_CHECK_EXCEPTION predicates to check the specific validation error.
Definition: common.h:18
I randrange(I range) noexcept
Generate a random integer in the range [0..range), with range > 0.
Definition: random.h:254
Fixed-size thread pool for running arbitrary tasks concurrently.
Definition: threadpool.h:48
void Start(int num_workers) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Start worker threads.
Definition: threadpool.h:105
size_t WorkersCount() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Definition: threadpool.h:279
bool ProcessTask() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Execute a single queued task synchronously.
Definition: threadpool.h:243
util::Expected< Future< F >, SubmitError > Submit(F &&fn) noexcept EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Enqueues a new task for asynchronous execution.
Definition: threadpool.h:184
void Stop() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Stop all worker threads and wait for them to exit.
Definition: threadpool.h:128
void Interrupt() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Stop accepting new tasks and begin asynchronous shutdown.
Definition: threadpool.h:268
size_t WorkQueueSize() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Definition: threadpool.h:274
int GetNumCores()
Return the number of cores available on the current system.
Definition: system.cpp:109
BOOST_FIXTURE_TEST_SUITE(cuckoocache_tests, BasicTestingSetup)
Test Suite for CuckooCache.
BOOST_AUTO_TEST_SUITE_END()
volatile double sum
Definition: examples.cpp:10
#define LogInfo(...)
Definition: log.h:97
#define BOOST_CHECK_EQUAL(v1, v2)
Definition: object.cpp:17
#define BOOST_CHECK(expr)
Definition: object.cpp:16
constexpr std::string_view SubmitErrorString(const ThreadPool::SubmitError err) noexcept
Definition: threadpool.h:285
#define WAIT_FOR(futures)
BOOST_AUTO_TEST_CASE(submit_fails_with_correct_error)
constexpr char POOL_NAME[]
constexpr auto TEST_WAIT_TIMEOUT
auto Submit(ThreadPool &pool, F &&fn)
std::vector< std::future< void > > BlockWorkers(ThreadPool &threadPool, std::counting_semaphore<> &release_sem, size_t num_of_threads_to_block)
int NUM_WORKERS_DEFAULT
#define strprintf
Format arguments and return the string or write to given std::ostream (see tinyformat::format doc for...
Definition: tinyformat.h:1172
void UninterruptibleSleep(const std::chrono::microseconds &n)
Definition: time.cpp:24
assert(!tx.IsCoinBase())