TLA Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 : // Copyright (c) 2026 Michael Vandeberg
4 : //
5 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
6 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7 : //
8 : // Official repository: https://github.com/cppalliance/capy
9 : //
10 :
11 : #ifndef BOOST_CAPY_EX_THREAD_POOL_HPP
12 : #define BOOST_CAPY_EX_THREAD_POOL_HPP
13 :
14 : #include <boost/capy/detail/config.hpp>
15 : #include <boost/capy/continuation.hpp>
16 : #include <coroutine>
17 : #include <boost/capy/ex/execution_context.hpp>
18 : #include <cstddef>
19 : #include <string_view>
20 :
21 : namespace boost {
22 : namespace capy {
23 :
24 : /** A pool of threads for executing work concurrently.
25 :
26 : Use this when you need to run coroutines on multiple threads
27 : without the overhead of creating and destroying threads for
28 : each task. Work items are distributed across the pool using
29 : a shared queue.
30 :
31 : @par Thread Safety
32 : Distinct objects: Safe.
33 : Shared objects: Unsafe, except as documented for @ref join and @ref stop.
34 :
35 : @par Example
36 : @code
37 : thread_pool pool(4); // 4 worker threads
38 : auto ex = pool.get_executor();
39 : ex.post(some_coroutine);
40 : // destructor stops the workers and destroys pending work; call pool.join() first to wait for it
41 : @endcode
42 : */
43 : class BOOST_CAPY_DECL
44 : thread_pool
45 : : public execution_context
46 : {
47 : class impl; // implementation type; only forward-declared in this header
48 : impl* impl_; // pointer to the implementation object; NOTE(review): presumably owned by the pool - confirm in the .cpp
49 :
50 : public:
51 : class executor_type; // defined after this class, later in this header
52 :
53 : /** Destroy the thread pool.
54 :
55 : Signals all worker threads to stop, waits for them to
56 : finish, and destroys any pending work items.
57 : */
58 : ~thread_pool();
59 :
60 : /** Construct a thread pool.
61 :
62 : Creates a pool with the specified number of worker threads.
63 : If `num_threads` is zero, the number of threads is set to
64 : the hardware concurrency, or one if that cannot be determined.
65 :
66 : @param num_threads The number of worker threads, or zero
67 : for automatic selection.
68 :
69 : @param thread_name_prefix The prefix for worker thread names.
70 : Thread names appear as "{prefix}0", "{prefix}1", etc.
71 : The prefix is truncated to 12 characters. Defaults to
72 : "capy-pool-".
73 : */
74 : explicit
75 : thread_pool(
76 : std::size_t num_threads = 0,
77 : std::string_view thread_name_prefix = "capy-pool-");
78 :
79 : thread_pool(thread_pool const&) = delete; // not copy-constructible (no move constructor is declared either)
80 : thread_pool& operator=(thread_pool const&) = delete; // not copy-assignable (no move assignment is declared either)
81 :
82 : /** Wait for all outstanding work to complete.
83 :
84 : Releases the internal work guard, then blocks the calling
85 : thread until all outstanding work tracked by
86 : @ref executor_type::on_work_started and
87 : @ref executor_type::on_work_finished completes. After all
88 : work finishes, joins the worker threads.
89 :
90 : If @ref stop is called while `join()` is blocking, the
91 : pool stops without waiting for remaining work to
92 : complete. Worker threads finish their current item and
93 : exit; `join()` still waits for all threads to be joined
94 : before returning.
95 :
96 : This function is idempotent. The first call performs the
97 : join; subsequent calls return immediately.
98 :
99 : @par Preconditions
100 : Must not be called from a thread in this pool (undefined
101 : behavior).
102 :
103 : @par Postconditions
104 : All worker threads have been joined. The pool cannot be
105 : reused.
106 :
107 : @par Thread Safety
108 : May be called from any thread not in this pool.
109 : */
110 : void
111 : join() noexcept;
112 :
113 : /** Request all worker threads to stop.
114 :
115 : Signals all threads to exit after finishing their current
116 : work item. Queued work that has not started is abandoned.
117 : Does not wait for threads to exit.
118 :
119 : If @ref join is blocking on another thread, calling
120 : `stop()` causes it to stop waiting for outstanding
121 : work. The `join()` call still waits for worker threads
122 : to finish their current item and exit before returning.
123 : */
124 : void
125 : stop() noexcept;
126 :
127 : /** Return an executor for this thread pool.
128 :
129 : @return An executor associated with this thread pool.
130 : */
131 : executor_type
132 : get_executor() const noexcept;
133 : };
134 :
135 : /** An executor that submits work to a thread_pool.
136 :
137 : Executors are lightweight handles that can be copied and stored.
138 : All copies refer to the same underlying thread pool.
139 :
140 : @par Thread Safety
141 : Distinct objects: Safe.
142 : Shared objects: Safe.
143 : */
144 : class thread_pool::executor_type
145 : {
146 : friend class thread_pool; // lets thread_pool call the private constructor below
147 :
148 : thread_pool* pool_ = nullptr; // associated pool; null when default-constructed
149 : // Bind this executor to `pool`. Private: reachable only through the friend declaration.
150 : explicit
151 HIT 163 : executor_type(thread_pool& pool) noexcept
152 163 : : pool_(&pool)
153 : {
154 163 : }
155 :
156 : public:
157 : /** Construct a default null executor.
158 :
159 : The resulting executor is not associated with any pool.
160 : `context()`, `dispatch()`, and `post()` require the
161 : executor to be associated with a pool before use.
162 : */
163 : executor_type() = default;
164 :
165 : /// Return the associated thread pool. The executor must not be null: `pool_` is dereferenced unchecked.
166 : thread_pool&
167 400 : context() const noexcept
168 : {
169 400 : return *pool_;
170 : }
171 :
172 : /** Notify that work has started.
173 :
174 : Increments the outstanding work count. Must be paired
175 : with a subsequent call to @ref on_work_finished.
176 :
177 : @see on_work_finished, work_guard
178 : */
179 : BOOST_CAPY_DECL
180 : void
181 : on_work_started() const noexcept;
182 :
183 : /** Notify that work has finished.
184 :
185 : Decrements the outstanding work count. When the count
186 : reaches zero after @ref thread_pool::join has been called,
187 : the pool's worker threads are signaled to stop.
188 :
189 : @pre A preceding call to @ref on_work_started was made.
190 :
191 : @see on_work_started, work_guard
192 : */
193 : BOOST_CAPY_DECL
194 : void
195 : on_work_finished() const noexcept;
196 :
197 : /** Dispatch a continuation for execution.
198 :
199 : Forwards the continuation to @ref post and returns
200 : `std::noop_coroutine()`, so the caller has nothing to resume.
201 : Thread pools never execute inline because no single thread
202 : "owns" the pool.
203 :
204 : @param c The continuation to execute. Must remain at a
205 : stable address until dequeued and resumed.
206 :
207 : @return `std::noop_coroutine()` always.
208 : */
209 : std::coroutine_handle<>
210 352 : dispatch(continuation& c) const
211 : {
212 352 : post(c); // hand the continuation to post(); post() is not declared noexcept, and neither is dispatch()
213 352 : return std::noop_coroutine(); // nothing for the caller to resume
214 : }
215 :
216 : /** Post a continuation to the thread pool.
217 :
218 : The continuation will be resumed on one of the pool's
219 : worker threads. The continuation must remain at a stable
220 : address until it is dequeued and resumed.
221 :
222 : @param c The continuation to execute.
223 : */
224 : BOOST_CAPY_DECL
225 : void
226 : post(continuation& c) const;
227 :
228 : /// Return true if two executors refer to the same thread pool; two null executors also compare equal.
229 : bool
230 13 : operator==(executor_type const& other) const noexcept
231 : {
232 13 : return pool_ == other.pool_; // compares pool addresses only
233 : }
234 : };
235 :
236 : } // capy
237 : } // boost
238 :
239 : #endif
|