12#include <boost/test/unit_test.hpp>
40#define WAIT_FOR(futures) \
42 for (const auto& f : futures) { \
43 BOOST_REQUIRE(f.wait_for(WAIT_TIMEOUT) == std::future_status::ready); \
49std::vector<std::future<void>>
BlockWorkers(
ThreadPool& threadPool,
const std::shared_future<void>& blocker_future,
int num_of_threads_to_block)
52 std::vector<std::promise<void>> ready_promises(num_of_threads_to_block);
53 std::vector<std::future<void>> ready_futures;
54 ready_futures.reserve(num_of_threads_to_block);
55 for (
auto& p : ready_promises) ready_futures.emplace_back(p.get_future());
58 std::vector<std::future<void>> blocking_tasks;
59 for (
int i = 0; i < num_of_threads_to_block; i++) {
60 std::promise<void>& ready = ready_promises[i];
61 blocking_tasks.emplace_back(threadPool.Submit([blocker_future, &ready]() {
63 blocker_future.wait();
69 return blocking_tasks;
76 BOOST_CHECK_EXCEPTION((
void)threadPool.Submit([]{ return false; }), std::runtime_error, [&](
const std::runtime_error& e) {
77 BOOST_CHECK_EQUAL(e.what(),
"No active workers; cannot accept new tasks");
89 std::atomic<int> counter = 0;
92 std::vector<std::future<void>> futures;
93 futures.reserve(num_tasks);
94 for (
int i = 1; i <= num_tasks; i++) {
95 futures.emplace_back(threadPool.Submit([&counter, i]() {
96 counter.fetch_add(i, std::memory_order_relaxed);
102 int expected_value = (num_tasks * (num_tasks + 1)) / 2;
113 std::promise<void> blocker;
114 std::shared_future<void> blocker_future(blocker.get_future());
123 std::vector<std::future<void>> futures(num_tasks);
124 for (
auto& f : futures) f = threadPool.Submit([&counter]{ counter++; });
140 std::atomic<bool> flag =
false;
141 std::future<void> future = threadPool.Submit([&flag]() {
143 flag.store(
true, std::memory_order_release);
154 std::future<bool> future_bool = threadPool.Submit([]() {
return true; });
157 std::future<std::string> future_str = threadPool.Submit([]() {
return std::string(
"true"); });
158 std::string result = future_str.get();
169 std::string err_msg{
"something wrong happened"};
170 std::vector<std::future<void>> futures;
171 futures.reserve(num_tasks);
172 for (
int i = 0; i < num_tasks; i++) {
173 futures.emplace_back(threadPool.Submit([err_msg, i]() {
174 throw std::runtime_error(err_msg + util::ToString(i));
178 for (
int i = 0; i < num_tasks; i++) {
179 BOOST_CHECK_EXCEPTION(futures.at(i).get(), std::runtime_error, [&](
const std::runtime_error& e) {
180 BOOST_CHECK_EQUAL(e.what(), err_msg + util::ToString(i));
192 std::promise<void> blocker;
193 std::shared_future<void> blocker_future(blocker.get_future());
198 std::atomic<int> counter = 0;
199 for (
int i = 0; i < num_tasks; i++) {
200 (void)threadPool.Submit([&counter]() {
201 counter.fetch_add(1, std::memory_order_relaxed);
208 for (
int i = 0; i < num_tasks; i++) {
224 std::promise<void> signal;
225 (void)threadPool.Submit([&]() {
226 (void)threadPool.Submit([&]() {
231 signal.get_future().wait();
241 std::promise<void> blocker;
242 std::shared_future<void> blocker_future(blocker.get_future());
246 std::future<bool> future = threadPool.Submit([]() {
return true; });
252 std::thread thread_unblocker([&blocker]() {
262 thread_unblocker.join();
278 std::atomic<int> counter{0};
280 std::vector<std::future<void>> futures;
281 futures.reserve(num_tasks);
282 for (
int i = 0; i < num_tasks; i++) {
283 futures.emplace_back(threadPool.Submit([&counter] {
284 counter.fetch_add(1, std::memory_order_relaxed);
299 BOOST_CHECK_EXCEPTION((
void)threadPool.Submit([]{}), std::runtime_error, [&](
const std::runtime_error& e) {
300 BOOST_CHECK_EQUAL(e.what(),
"No active workers; cannot accept new tasks");
310 std::atomic<int> counter{0};
311 std::promise<void> blocker;
312 const auto blocking_tasks =
BlockWorkers(threadPool, blocker.get_future().share(), 1);
313 threadPool.Submit([&threadPool, &counter]{
315 counter.fetch_add(1, std::memory_order_relaxed);
I randrange(I range) noexcept
Generate a random integer in the range [0..range), with range > 0.
Fixed-size thread pool for running arbitrary tasks concurrently.
void Start(int num_workers) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Start worker threads.
void ProcessTask() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Execute a single queued task synchronously.
size_t WorkersCount() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
void Stop() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Stop all worker threads and wait for them to exit.
void Interrupt() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Stop accepting new tasks and begin asynchronous shutdown.
size_t WorkQueueSize() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
int GetNumCores()
Return the number of cores available on the current system.
BOOST_FIXTURE_TEST_SUITE(cuckoocache_tests, BasicTestingSetup)
Test Suite for CuckooCache.
BOOST_AUTO_TEST_SUITE_END()
#define BOOST_CHECK_EQUAL(v1, v2)
#define BOOST_CHECK(expr)
std::vector< std::future< void > > BlockWorkers(ThreadPool &threadPool, const std::shared_future< void > &blocker_future, int num_of_threads_to_block)
#define WAIT_FOR(futures)
constexpr auto WAIT_TIMEOUT
constexpr char POOL_NAME[]
BOOST_AUTO_TEST_CASE(submit_task_before_start_fails)
void UninterruptibleSleep(const std::chrono::microseconds &n)