13#include <boost/test/unit_test.hpp>
/** Block until every future in `futures` is ready, failing the test if any of
 *  them is still pending after TEST_WAIT_TIMEOUT.
 *  Implemented as a macro (rather than a function) so that BOOST_REQUIRE
 *  reports the calling test's source line on failure.
 *  NOTE(review): reconstructed from a garbled listing — the closing brace of
 *  the loop was not visible in the original extract; confirm against the
 *  real source.
 */
#define WAIT_FOR(futures)                                                              \
    for (const auto& f : futures) {                                                    \
        BOOST_REQUIRE(f.wait_for(TEST_WAIT_TIMEOUT) == std::future_status::ready);     \
    }
62 return std::move(*
Assert(pool.
Submit(std::forward<F>(fn))));
67std::vector<std::future<void>>
BlockWorkers(
ThreadPool& threadPool, std::counting_semaphore<>& release_sem,
size_t num_of_threads_to_block)
70 std::latch ready{
static_cast<std::ptrdiff_t
>(num_of_threads_to_block)};
71 std::vector<std::future<void>> blocking_tasks(num_of_threads_to_block);
72 for (
auto& f : blocking_tasks) f =
Submit(threadPool, [&] {
74 release_sem.acquire();
77 return blocking_tasks;
84 const auto fn_empty = [&] {};
87 auto res = threadPool.
Submit(fn_empty);
92 std::counting_semaphore<> blocker(0);
96 res = threadPool.
Submit(fn_empty);
105 res = threadPool.
Submit(fn_empty);
112 res = threadPool.
Submit(fn_empty);
116 std::vector<std::function<void()>> tasks;
117 const auto range_res{threadPool.
Submit(std::move(tasks))};
129 std::atomic<int> counter = 0;
132 std::vector<std::future<void>> futures;
133 futures.reserve(num_tasks);
134 for (
int i = 1; i <= num_tasks; i++) {
135 futures.emplace_back(
Submit(threadPool, [&counter, i]() {
136 counter.fetch_add(i, std::memory_order_relaxed);
142 int expected_value = (num_tasks * (num_tasks + 1)) / 2;
152 std::counting_semaphore<> blocker(0);
161 std::vector<std::future<void>> futures(num_tasks);
162 for (
auto& f : futures) f =
Submit(threadPool, [&counter]{ counter++; });
178 std::atomic<bool> flag =
false;
179 std::future<void> future =
Submit(threadPool, [&flag]() {
181 flag.store(
true, std::memory_order_release);
192 std::future<bool> future_bool =
Submit(threadPool, []() {
return true; });
195 std::future<std::string> future_str =
Submit(threadPool, []() {
return std::string(
"true"); });
196 std::string result = future_str.get();
206 const auto make_err{[&](
size_t n) {
return strprintf(
"error on thread #%s", n); }};
208 const int num_tasks = 5;
209 std::vector<std::future<void>> futures;
210 futures.reserve(num_tasks);
211 for (
int i = 0; i < num_tasks; i++) {
212 futures.emplace_back(
Submit(threadPool, [&make_err, i] {
throw std::runtime_error(make_err(i)); }));
215 for (
int i = 0; i < num_tasks; i++) {
216 BOOST_CHECK_EXCEPTION(futures[i].get(), std::runtime_error,
HasReason{make_err(i)});
226 std::counting_semaphore<> blocker(0);
231 std::atomic<int> counter = 0;
232 for (
int i = 0; i < num_tasks; i++) {
233 (void)
Submit(threadPool, [&counter]() {
234 counter.fetch_add(1, std::memory_order_relaxed);
241 for (
int i = 0; i < num_tasks; i++) {
257 std::promise<void> signal;
258 (void)
Submit(threadPool, [&]() {
259 (void)
Submit(threadPool, [&]() {
264 signal.get_future().wait();
274 std::counting_semaphore<> blocker(0);
278 std::future<bool> future =
Submit(threadPool, []() {
return true; });
284 std::thread thread_unblocker([&blocker]() {
294 thread_unblocker.join();
310 std::atomic<int> counter{0};
312 std::vector<std::future<void>> futures;
313 futures.reserve(num_tasks);
314 for (
int i = 0; i < num_tasks; i++) {
315 futures.emplace_back(
Submit(threadPool, [&counter] {
316 counter.fetch_add(1, std::memory_order_relaxed);
332 auto res = threadPool.
Submit([]{});
336 std::vector<std::function<void()>> tasks;
337 const auto range_res{threadPool.
Submit(std::move(tasks))};
347 std::atomic<int> counter{0};
348 std::counting_semaphore<> blocker(0);
349 const auto blocking_tasks =
BlockWorkers(threadPool, blocker, 1);
350 Submit(threadPool, [&threadPool, &counter]{
352 counter.fetch_add(1, std::memory_order_relaxed);
369 std::counting_semaphore<> workers_blocker(0);
372 std::thread stopper_thread([&threadPool] { threadPool.
Stop(); });
378 std::this_thread::yield();
386 }
catch (std::exception& e) {
393 stopper_thread.join();
402 std::counting_semaphore<> blocker(0);
406 std::atomic<int> counter{0};
407 const int num_tasks = 10;
408 std::vector<std::future<void>> futures;
409 futures.reserve(num_tasks);
410 for (
int i = 0; i < num_tasks; i++) {
411 futures.emplace_back(
Submit(threadPool, [&counter]{ counter.fetch_add(1, std::memory_order_relaxed); }));
430 std::counting_semaphore<> blocker(0);
433 auto main_thread_id = std::this_thread::get_id();
434 std::atomic<int> main_thread_tasks{0};
435 const size_t num_tasks = 20;
436 for (
size_t i = 0; i < num_tasks; i++) {
437 (void)
Submit(threadPool, [&main_thread_tasks, main_thread_id]() {
438 if (std::this_thread::get_id() == main_thread_id)
439 main_thread_tasks.fetch_add(1, std::memory_order_relaxed);
445 std::thread unblocker([&blocker, &threadPool]() {
447 std::this_thread::yield();
463 constexpr int32_t num_tasks{50};
467 std::atomic_int32_t
sum{0};
468 const auto square{[&
sum](int32_t i) {
469 sum.fetch_add(i, std::memory_order_relaxed);
473 std::array<std::function<int32_t()>,
static_cast<size_t>(num_tasks)> array_tasks;
474 std::vector<std::function<int32_t()>> vector_tasks;
475 vector_tasks.reserve(
static_cast<size_t>(num_tasks));
476 for (
const auto i : std::views::iota(int32_t{1}, num_tasks + 1)) {
477 array_tasks.at(
static_cast<size_t>(i - 1)) = [i, square] {
return square(i); };
478 vector_tasks.emplace_back([i, square] {
return square(i); });
481 auto futures{std::move(*
Assert(threadPool.Submit(std::move(array_tasks))))};
483 std::ranges::move(*
Assert(threadPool.Submit(std::move(vector_tasks))), std::back_inserter(futures));
487 for (
auto& future : futures) {
488 squares_sum += future.get();
492 const auto expected_sum{2 * ((num_tasks * (num_tasks + 1)) / 2)};
493 const auto expected_squares_sum{2 * ((num_tasks * (num_tasks + 1) * ((num_tasks * 2) + 1)) / 6)};
#define Assert(val)
Identity function: evaluates `val`, aborts if it compares equal to zero/false, and otherwise returns it unchanged — NOTE(review): presumed from the project's usual Assert() semantics; confirm against its definition.
BOOST_CHECK_EXCEPTION predicates to check the specific validation error.
I randrange(I range) noexcept
Generate a random integer in the range [0..range), with range > 0.
Fixed-size thread pool for running arbitrary tasks concurrently.
void Start(int num_workers) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Start worker threads.
size_t WorkersCount() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
bool ProcessTask() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Execute a single queued task synchronously.
util::Expected< Future< F >, SubmitError > Submit(F &&fn) noexcept EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Enqueues a new task for asynchronous execution.
void Stop() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Stop all worker threads and wait for them to exit.
void Interrupt() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Stop accepting new tasks and begin asynchronous shutdown.
size_t WorkQueueSize() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
int GetNumCores()
Return the number of cores available on the current system.
BOOST_FIXTURE_TEST_SUITE(cuckoocache_tests, BasicTestingSetup)
Test Suite for CuckooCache.
BOOST_AUTO_TEST_SUITE_END()
#define BOOST_CHECK_EQUAL(v1, v2)
#define BOOST_CHECK(expr)
constexpr std::string_view SubmitErrorString(const ThreadPool::SubmitError err) noexcept
#define WAIT_FOR(futures)
BOOST_AUTO_TEST_CASE(submit_fails_with_correct_error)
constexpr char POOL_NAME[]
constexpr auto TEST_WAIT_TIMEOUT
auto Submit(ThreadPool &pool, F &&fn)
std::vector< std::future< void > > BlockWorkers(ThreadPool &threadPool, std::counting_semaphore<> &release_sem, size_t num_of_threads_to_block)
void UninterruptibleSleep(const std::chrono::microseconds &n)