// This file is part of Eigen, a lightweight C++ template library
// for linear algebra.
//
// Copyright (C) 2016 Dmitry Vyukov <dvyukov@google.com>
// Copyright (C) 2016 Benoit Steiner <benoit.steiner.goog@gmail.com>
//
// This Source Code Form is subject to the terms of the Mozilla
// Public License v. 2.0. If a copy of the MPL was not distributed
// with this file, You can obtain one at http://mozilla.org/MPL/2.0/.
| |
| #define EIGEN_USE_THREADS |
| #include "main.h" |
| #include "Eigen/ThreadPool" |
| |
| static void test_create_destroy_empty_pool() { |
| // Just create and destroy the pool. This will wind up and tear down worker |
| // threads. Ensure there are no issues in that logic. |
| for (int i = 0; i < 16; ++i) { |
| ThreadPool tp(i); |
| } |
| } |
| |
| static void test_parallelism(bool allow_spinning) { |
| // Test we never-ever fail to match available tasks with idle threads. |
| const int kThreads = 16; // code below expects that this is a multiple of 4 |
| ThreadPool tp(kThreads, allow_spinning); |
| VERIFY_IS_EQUAL(tp.NumThreads(), kThreads); |
| VERIFY_IS_EQUAL(tp.CurrentThreadId(), -1); |
| for (int iter = 0; iter < 100; ++iter) { |
| std::atomic<int> running(0); |
| std::atomic<int> done(0); |
| std::atomic<int> phase(0); |
| // Schedule kThreads tasks and ensure that they all are running. |
| for (int i = 0; i < kThreads; ++i) { |
| tp.Schedule([&]() { |
| const int thread_id = tp.CurrentThreadId(); |
| VERIFY_GE(thread_id, 0); |
| VERIFY_LE(thread_id, kThreads - 1); |
| running++; |
| while (phase < 1) { |
| } |
| done++; |
| }); |
| } |
| while (running != kThreads) { |
| } |
| running = 0; |
| phase = 1; |
| // Now, while the previous tasks exit, schedule another kThreads tasks and |
| // ensure that they are running. |
| for (int i = 0; i < kThreads; ++i) { |
| tp.Schedule([&, i]() { |
| running++; |
| while (phase < 2) { |
| } |
| // When all tasks are running, half of tasks exit, quarter of tasks |
| // continue running and quarter of tasks schedule another 2 tasks each. |
| // Concurrently main thread schedules another quarter of tasks. |
| // This gives us another kThreads tasks and we ensure that they all |
| // are running. |
| if (i < kThreads / 2) { |
| } else if (i < 3 * kThreads / 4) { |
| running++; |
| while (phase < 3) { |
| } |
| done++; |
| } else { |
| for (int j = 0; j < 2; ++j) { |
| tp.Schedule([&]() { |
| running++; |
| while (phase < 3) { |
| } |
| done++; |
| }); |
| } |
| } |
| done++; |
| }); |
| } |
| while (running != kThreads) { |
| } |
| running = 0; |
| phase = 2; |
| for (int i = 0; i < kThreads / 4; ++i) { |
| tp.Schedule([&]() { |
| running++; |
| while (phase < 3) { |
| } |
| done++; |
| }); |
| } |
| while (running != kThreads) { |
| } |
| phase = 3; |
| while (done != 3 * kThreads) { |
| } |
| } |
| } |
| |
| static void test_cancel() { |
| ThreadPool tp(2); |
| |
| // Schedule a large number of closure that each sleeps for one second. This |
| // will keep the thread pool busy for much longer than the default test timeout. |
| for (int i = 0; i < 1000; ++i) { |
| tp.Schedule([]() { std::this_thread::sleep_for(std::chrono::milliseconds(2000)); }); |
| } |
| |
| // Cancel the processing of all the closures that are still pending. |
| tp.Cancel(); |
| } |
| |
| static void test_pool_partitions() { |
| const int kThreads = 2; |
| |
| std::atomic<int> running(0); |
| std::atomic<int> done(0); |
| std::atomic<int> phase(0); |
| |
| { |
| ThreadPool tp(kThreads); |
| |
| // Assign each thread to its own partition, so that stealing other work only |
| // occurs globally when a thread is idle. |
| std::vector<std::pair<unsigned, unsigned>> steal_partitions(kThreads); |
| for (int i = 0; i < kThreads; ++i) { |
| steal_partitions[i] = std::make_pair(i, i + 1); |
| } |
| tp.SetStealPartitions(steal_partitions); |
| |
| // Schedule kThreads tasks and ensure that they all are running. |
| for (int i = 0; i < kThreads; ++i) { |
| tp.Schedule([&]() { |
| const int thread_id = tp.CurrentThreadId(); |
| VERIFY_GE(thread_id, 0); |
| VERIFY_LE(thread_id, kThreads - 1); |
| ++running; |
| while (phase < 1) { |
| } |
| ++done; |
| }); |
| } |
| while (running != kThreads) { |
| } |
| // Schedule each closure to only run on thread 'i' and verify that it does. |
| for (int i = 0; i < kThreads; ++i) { |
| tp.ScheduleWithHint( |
| [&, i]() { |
| ++running; |
| const int thread_id = tp.CurrentThreadId(); |
| VERIFY_IS_EQUAL(thread_id, i); |
| while (phase < 2) { |
| } |
| ++done; |
| }, |
| i, i + 1); |
| } |
| running = 0; |
| phase = 1; |
| while (running != kThreads) { |
| } |
| running = 0; |
| phase = 2; |
| } |
| } |
| |
| EIGEN_DECLARE_TEST(cxx11_non_blocking_thread_pool) { |
| CALL_SUBTEST(test_create_destroy_empty_pool()); |
| CALL_SUBTEST(test_parallelism(true)); |
| CALL_SUBTEST(test_parallelism(false)); |
| CALL_SUBTEST(test_cancel()); |
| CALL_SUBTEST(test_pool_partitions()); |
| } |