13#include <boost/test/unit_test.hpp>
46#define WAIT_FOR(futures) \
48 for (const auto& f : futures) { \
49 BOOST_REQUIRE(f.wait_for(WAIT_TIMEOUT) == std::future_status::ready); \
57 return std::move(*
Assert(pool.
Submit(std::forward<F>(fn))));
62std::vector<std::future<void>>
BlockWorkers(
ThreadPool& threadPool, std::counting_semaphore<>& release_sem,
size_t num_of_threads_to_block)
65 std::latch ready{
static_cast<std::ptrdiff_t
>(num_of_threads_to_block)};
66 std::vector<std::future<void>> blocking_tasks(num_of_threads_to_block);
67 for (
auto& f : blocking_tasks) f =
Submit(threadPool, [&] {
69 release_sem.acquire();
72 return blocking_tasks;
79 const auto fn_empty = [&] {};
82 auto res = threadPool.
Submit(fn_empty);
87 std::counting_semaphore<> blocker(0);
91 res = threadPool.
Submit(fn_empty);
100 res = threadPool.
Submit(fn_empty);
107 res = threadPool.
Submit(fn_empty);
119 std::atomic<int> counter = 0;
122 std::vector<std::future<void>> futures;
123 futures.reserve(num_tasks);
124 for (
int i = 1; i <= num_tasks; i++) {
125 futures.emplace_back(
Submit(threadPool, [&counter, i]() {
126 counter.fetch_add(i, std::memory_order_relaxed);
132 int expected_value = (num_tasks * (num_tasks + 1)) / 2;
142 std::counting_semaphore<> blocker(0);
151 std::vector<std::future<void>> futures(num_tasks);
152 for (
auto& f : futures) f =
Submit(threadPool, [&counter]{ counter++; });
168 std::atomic<bool> flag =
false;
169 std::future<void> future =
Submit(threadPool, [&flag]() {
171 flag.store(
true, std::memory_order_release);
182 std::future<bool> future_bool =
Submit(threadPool, []() {
return true; });
185 std::future<std::string> future_str =
Submit(threadPool, []() {
return std::string(
"true"); });
186 std::string result = future_str.get();
196 const auto make_err{[&](
size_t n) {
return strprintf(
"error on thread #%s", n); }};
198 const int num_tasks = 5;
199 std::vector<std::future<void>> futures;
200 futures.reserve(num_tasks);
201 for (
int i = 0; i < num_tasks; i++) {
202 futures.emplace_back(
Submit(threadPool, [&make_err, i] {
throw std::runtime_error(make_err(i)); }));
205 for (
int i = 0; i < num_tasks; i++) {
206 BOOST_CHECK_EXCEPTION(futures[i].get(), std::runtime_error,
HasReason{make_err(i)});
216 std::counting_semaphore<> blocker(0);
221 std::atomic<int> counter = 0;
222 for (
int i = 0; i < num_tasks; i++) {
223 (void)
Submit(threadPool, [&counter]() {
224 counter.fetch_add(1, std::memory_order_relaxed);
231 for (
int i = 0; i < num_tasks; i++) {
247 std::promise<void> signal;
248 (void)
Submit(threadPool, [&]() {
249 (void)
Submit(threadPool, [&]() {
254 signal.get_future().wait();
264 std::counting_semaphore<> blocker(0);
268 std::future<bool> future =
Submit(threadPool, []() {
return true; });
274 std::thread thread_unblocker([&blocker]() {
284 thread_unblocker.join();
300 std::atomic<int> counter{0};
302 std::vector<std::future<void>> futures;
303 futures.reserve(num_tasks);
304 for (
int i = 0; i < num_tasks; i++) {
305 futures.emplace_back(
Submit(threadPool, [&counter] {
306 counter.fetch_add(1, std::memory_order_relaxed);
322 auto res = threadPool.
Submit([]{});
332 std::atomic<int> counter{0};
333 std::counting_semaphore<> blocker(0);
334 const auto blocking_tasks =
BlockWorkers(threadPool, blocker, 1);
335 Submit(threadPool, [&threadPool, &counter]{
337 counter.fetch_add(1, std::memory_order_relaxed);
354 std::counting_semaphore<> workers_blocker(0);
357 std::thread stopper_thread([&threadPool] { threadPool.
Stop(); });
363 std::this_thread::yield();
371 }
catch (std::exception& e) {
378 stopper_thread.join();
387 std::counting_semaphore<> blocker(0);
391 std::atomic<int> counter{0};
392 const int num_tasks = 10;
393 std::vector<std::future<void>> futures;
394 futures.reserve(num_tasks);
395 for (
int i = 0; i < num_tasks; i++) {
396 futures.emplace_back(
Submit(threadPool, [&counter]{ counter.fetch_add(1, std::memory_order_relaxed); }));
415 std::counting_semaphore<> blocker(0);
418 auto main_thread_id = std::this_thread::get_id();
419 std::atomic<int> main_thread_tasks{0};
420 const size_t num_tasks = 20;
421 for (
size_t i = 0; i < num_tasks; i++) {
422 (void)
Submit(threadPool, [&main_thread_tasks, main_thread_id]() {
423 if (std::this_thread::get_id() == main_thread_id)
424 main_thread_tasks.fetch_add(1, std::memory_order_relaxed);
430 std::thread unblocker([&blocker, &threadPool]() {
432 std::this_thread::yield();
#define Assert(val)
Identity function.
BOOST_CHECK_EXCEPTION predicates to check the specific validation error.
I randrange(I range) noexcept
Generate a random integer in the range [0..range), with range > 0.
Fixed-size thread pool for running arbitrary tasks concurrently.
void Start(int num_workers) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Start worker threads.
size_t WorkersCount() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
bool ProcessTask() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Execute a single queued task synchronously.
void Stop() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Stop all worker threads and wait for them to exit.
void Interrupt() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Stop accepting new tasks and begin asynchronous shutdown.
size_t WorkQueueSize() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
util::Expected< std::future< std::invoke_result_t< F > >, SubmitError > Submit(F &&fn) noexcept EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Enqueues a new task for asynchronous execution.
int GetNumCores()
Return the number of cores available on the current system.
BOOST_FIXTURE_TEST_SUITE(cuckoocache_tests, BasicTestingSetup)
Test Suite for CuckooCache.
BOOST_AUTO_TEST_SUITE_END()
#define BOOST_CHECK_EQUAL(v1, v2)
#define BOOST_CHECK(expr)
constexpr std::string_view SubmitErrorString(const ThreadPool::SubmitError err) noexcept
#define WAIT_FOR(futures)
BOOST_AUTO_TEST_CASE(submit_fails_with_correct_error)
constexpr auto WAIT_TIMEOUT
constexpr char POOL_NAME[]
auto Submit(ThreadPool &pool, F &&fn)
std::vector< std::future< void > > BlockWorkers(ThreadPool &threadPool, std::counting_semaphore<> &release_sem, size_t num_of_threads_to_block)
void UninterruptibleSleep(const std::chrono::microseconds &n)