include/boost/capy/ex/thread_pool.hpp

100.0% Lines (10/10) 100.0% List of functions (4/4)
thread_pool.hpp
f(x) Functions (4)
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 // Copyright (c) 2026 Michael Vandeberg
4 //
5 // Distributed under the Boost Software License, Version 1.0. (See accompanying
6 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7 //
8 // Official repository: https://github.com/cppalliance/capy
9 //
10
11 #ifndef BOOST_CAPY_EX_THREAD_POOL_HPP
12 #define BOOST_CAPY_EX_THREAD_POOL_HPP
13
14 #include <boost/capy/detail/config.hpp>
15 #include <boost/capy/continuation.hpp>
16 #include <coroutine>
17 #include <boost/capy/ex/execution_context.hpp>
18 #include <cstddef>
19 #include <string_view>
20
21 namespace boost {
22 namespace capy {
23
24 /** A pool of threads for executing work concurrently.
25
26 Use this when you need to run coroutines on multiple threads
27 without the overhead of creating and destroying threads for
28 each task. Work items are distributed across the pool using
29 a shared queue.
30
31 @par Thread Safety
32 Distinct objects: Safe.
33 Shared objects: Unsafe.
34
35 @par Example
36 @code
37 thread_pool pool(4); // 4 worker threads
38 auto ex = pool.get_executor();
39 ex.post(some_coroutine);
40 // pool destructor waits for all work to complete
41 @endcode
42 */
43 class BOOST_CAPY_DECL
44 thread_pool
45 : public execution_context
46 {
47 class impl;
48 impl* impl_;
49
50 public:
51 class executor_type;
52
53 /** Destroy the thread pool.
54
55 Signals all worker threads to stop, waits for them to
56 finish, and destroys any pending work items.
57 */
58 ~thread_pool();
59
60 /** Construct a thread pool.
61
62 Creates a pool with the specified number of worker threads.
63 If `num_threads` is zero, the number of threads is set to
64 the hardware concurrency, or one if that cannot be determined.
65
66 @param num_threads The number of worker threads, or zero
67 for automatic selection.
68
69 @param thread_name_prefix The prefix for worker thread names.
70 Thread names appear as "{prefix}0", "{prefix}1", etc.
71 The prefix is truncated to 12 characters. Defaults to
72 "capy-pool-".
73 */
74 explicit
75 thread_pool(
76 std::size_t num_threads = 0,
77 std::string_view thread_name_prefix = "capy-pool-");
78
79 thread_pool(thread_pool const&) = delete;
80 thread_pool& operator=(thread_pool const&) = delete;
81
82 /** Wait for all outstanding work to complete.
83
84 Releases the internal work guard, then blocks the calling
85 thread until all outstanding work tracked by
86 @ref executor_type::on_work_started and
87 @ref executor_type::on_work_finished completes. After all
88 work finishes, joins the worker threads.
89
90 If @ref stop is called while `join()` is blocking, the
91 pool stops without waiting for remaining work to
92 complete. Worker threads finish their current item and
93 exit; `join()` still waits for all threads to be joined
94 before returning.
95
96 This function is idempotent. The first call performs the
97 join; subsequent calls return immediately.
98
99 @par Preconditions
100 Must not be called from a thread in this pool (undefined
101 behavior).
102
103 @par Postconditions
104 All worker threads have been joined. The pool cannot be
105 reused.
106
107 @par Thread Safety
108 May be called from any thread not in this pool.
109 */
110 void
111 join() noexcept;
112
113 /** Request all worker threads to stop.
114
115 Signals all threads to exit after finishing their current
116 work item. Queued work that has not started is abandoned.
117 Does not wait for threads to exit.
118
119 If @ref join is blocking on another thread, calling
120 `stop()` causes it to stop waiting for outstanding
121 work. The `join()` call still waits for worker threads
122 to finish their current item and exit before returning.
123 */
124 void
125 stop() noexcept;
126
127 /** Return an executor for this thread pool.
128
129 @return An executor associated with this thread pool.
130 */
131 executor_type
132 get_executor() const noexcept;
133 };
134
135 /** An executor that submits work to a thread_pool.
136
137 Executors are lightweight handles that can be copied and stored.
138 All copies refer to the same underlying thread pool.
139
140 @par Thread Safety
141 Distinct objects: Safe.
142 Shared objects: Safe.
143 */
144 class thread_pool::executor_type
145 {
146 friend class thread_pool;
147
148 thread_pool* pool_ = nullptr;
149
150 explicit
151 163x executor_type(thread_pool& pool) noexcept
152 163x : pool_(&pool)
153 {
154 163x }
155
156 public:
157 /** Construct a default null executor.
158
159 The resulting executor is not associated with any pool.
160 `context()`, `dispatch()`, and `post()` require the
161 executor to be associated with a pool before use.
162 */
163 executor_type() = default;
164
165 /// Return the underlying thread pool.
166 thread_pool&
167 400x context() const noexcept
168 {
169 400x return *pool_;
170 }
171
172 /** Notify that work has started.
173
174 Increments the outstanding work count. Must be paired
175 with a subsequent call to @ref on_work_finished.
176
177 @see on_work_finished, work_guard
178 */
179 BOOST_CAPY_DECL
180 void
181 on_work_started() const noexcept;
182
183 /** Notify that work has finished.
184
185 Decrements the outstanding work count. When the count
186 reaches zero after @ref thread_pool::join has been called,
187 the pool's worker threads are signaled to stop.
188
189 @pre A preceding call to @ref on_work_started was made.
190
191 @see on_work_started, work_guard
192 */
193 BOOST_CAPY_DECL
194 void
195 on_work_finished() const noexcept;
196
197 /** Dispatch a continuation for execution.
198
199 Posts the continuation to the thread pool for execution on a
200 worker thread and returns `std::noop_coroutine()`. Thread
201 pools never execute inline because no single thread "owns"
202 the pool.
203
204 @param c The continuation to execute. Must remain at a
205 stable address until dequeued and resumed.
206
207 @return `std::noop_coroutine()` always.
208 */
209 std::coroutine_handle<>
210 352x dispatch(continuation& c) const
211 {
212 352x post(c);
213 352x return std::noop_coroutine();
214 }
215
216 /** Post a continuation to the thread pool.
217
218 The continuation will be resumed on one of the pool's
219 worker threads. The continuation must remain at a stable
220 address until it is dequeued and resumed.
221
222 @param c The continuation to execute.
223 */
224 BOOST_CAPY_DECL
225 void
226 post(continuation& c) const;
227
228 /// Return true if two executors refer to the same thread pool.
229 bool
230 13x operator==(executor_type const& other) const noexcept
231 {
232 13x return pool_ == other.pool_;
233 }
234 };
235
236 } // capy
237 } // boost
238
239 #endif
240