TLA Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 : // Copyright (c) 2026 Michael Vandeberg
4 : //
5 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
6 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7 : //
8 : // Official repository: https://github.com/boostorg/capy
9 : //
10 :
11 : #include <boost/capy/ex/thread_pool.hpp>
12 : #include <boost/capy/continuation.hpp>
13 : #include <boost/capy/test/thread_name.hpp>
14 : #include <algorithm>
15 : #include <atomic>
16 : #include <condition_variable>
17 : #include <cstdio>
18 : #include <mutex>
19 : #include <thread>
20 : #include <vector>
21 :
22 : /*
23 : Thread pool implementation using a shared work queue.
24 :
25 : Work items are continuations linked via their intrusive next pointer,
26 : stored in a single queue protected by a mutex. No per-post heap
27 : allocation: the continuation is owned by the caller and linked
28 : directly. Worker threads wait on a condition_variable until work
29 : is available or stop is requested.
30 :
31 : Threads are started lazily on first post() via std::call_once to avoid
32 : spawning threads for pools that are constructed but never used. Each
33 : thread is named with a configurable prefix plus index for debugger
34 : visibility.
35 :
36 : Work tracking: on_work_started/on_work_finished maintain an atomic
37 : outstanding_work_ counter. join() blocks until this counter reaches
38 : zero, then signals workers to stop and joins threads.
39 :
40 : Shutdown paths:
41 : - join(): waits for outstanding work to drain, then stops workers.
42 : - stop(): immediately signals workers to exit; queued work is abandoned.
43 : - Destructor: stop() then join() (abandon + wait for threads).
44 : */
45 :
46 : namespace boost {
47 : namespace capy {
48 :
49 : //------------------------------------------------------------------------------
50 :
class thread_pool::impl
{
    // Intrusive queue of continuations via continuation::next.
    // No per-post allocation: the continuation is owned by the caller.
    // head_/tail_ are protected by mutex_.
    continuation* head_ = nullptr;
    continuation* tail_ = nullptr;

    // Append c at the tail of the queue. Requires mutex_ to be held.
    void push(continuation* c) noexcept
    {
        c->next = nullptr;
        if(tail_)
            tail_->next = c;
        else
            head_ = c;
        tail_ = c;
    }

    // Detach and return the head element, or nullptr if the queue is
    // empty. Requires mutex_ to be held (or no concurrent access, as
    // during teardown in drain_abandoned()).
    continuation* pop() noexcept
    {
        if(!head_)
            return nullptr;
        continuation* c = head_;
        head_ = head_->next;
        if(!head_)
            tail_ = nullptr;
        return c;
    }

    // True if no work is queued. Requires mutex_ to be held.
    bool empty() const noexcept
    {
        return head_ == nullptr;
    }

    std::mutex mutex_;                  // guards queue, stop_, joined_
    std::condition_variable cv_;        // signaled on push, stop, and drain
    std::vector<std::thread> threads_;  // workers, started lazily
    std::atomic<std::size_t> outstanding_work_{0}; // active work guards
    bool stop_{false};                  // tells workers to exit
    bool joined_{false};                // join() has begun
    std::size_t num_threads_;           // resolved worker count
    char thread_name_prefix_[13]{}; // 12 chars max + null terminator
    std::once_flag start_flag_;     // one-shot lazy thread start

public:
    ~impl() = default;

    // Destroy abandoned coroutine frames. Must be called
    // before execution_context::shutdown()/destroy() so
    // that suspended-frame destructors (e.g. delay_awaitable
    // calling timer_service::cancel()) run while services
    // are still valid.
    void
    drain_abandoned() noexcept
    {
        while(auto* c = pop())
        {
            auto h = c->h;
            // Skip null handles and the noop coroutine: neither
            // refers to a destructible frame.
            if(h && h != std::noop_coroutine())
                h.destroy();
        }
    }

    // num_threads == 0 selects one thread per hardware core
    // (at least one). The name prefix is truncated to fit the
    // fixed-size buffer.
    impl(std::size_t num_threads, std::string_view thread_name_prefix)
        : num_threads_(num_threads)
    {
        if(num_threads_ == 0)
            num_threads_ = std::max(
                std::thread::hardware_concurrency(), 1u);

        // Truncate prefix to 12 chars, leaving room for up to 3-digit index.
        auto n = thread_name_prefix.copy(thread_name_prefix_, 12);
        thread_name_prefix_[n] = '\0';
    }

    // Enqueue a continuation for execution on a worker thread,
    // starting the workers on first use. The notify happens outside
    // the lock so the woken worker does not immediately block on
    // mutex_.
    void
    post(continuation& c)
    {
        ensure_started();
        {
            std::lock_guard<std::mutex> lock(mutex_);
            push(&c);
        }
        cv_.notify_one();
    }

    // Increment the outstanding-work counter (executor work guard).
    void
    on_work_started() noexcept
    {
        outstanding_work_.fetch_add(1, std::memory_order_acq_rel);
    }

    // Decrement the outstanding-work counter. When the last unit of
    // work finishes and join() is already waiting, request stop and
    // wake all waiters (join() waits on cv_ for stop_ to become true).
    void
    on_work_finished() noexcept
    {
        if(outstanding_work_.fetch_sub(
            1, std::memory_order_acq_rel) == 1)
        {
            // Taking the lock orders this signal with join()'s wait,
            // so the wakeup cannot be lost.
            std::lock_guard<std::mutex> lock(mutex_);
            if(joined_ && !stop_)
                stop_ = true;
            cv_.notify_all();
        }
    }

    // Wait until outstanding work drains to zero, then stop the
    // workers and join the threads. Idempotent: only the first
    // caller performs the wait and the join; later calls return
    // immediately.
    void
    join() noexcept
    {
        {
            std::unique_lock<std::mutex> lock(mutex_);
            if(joined_)
                return;
            joined_ = true;

            if(outstanding_work_.load(
                std::memory_order_acquire) == 0)
            {
                // Nothing pending: stop workers right away.
                stop_ = true;
                cv_.notify_all();
            }
            else
            {
                // The final on_work_finished() (or a concurrent
                // stop()) sets stop_ and wakes us.
                cv_.wait(lock, [this]{
                    return stop_;
                });
            }
        }

        for(auto& t : threads_)
            if(t.joinable())
                t.join();
    }

    // Request immediate shutdown: workers exit as soon as they
    // observe stop_; queued work is abandoned (and later reclaimed
    // by drain_abandoned()).
    void
    stop() noexcept
    {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            stop_ = true;
        }
        cv_.notify_all();
    }

private:
    // Start the worker threads exactly once, lazily on first post(),
    // so pools that are constructed but never used spawn no threads.
    void
    ensure_started()
    {
        std::call_once(start_flag_, [this]{
            threads_.reserve(num_threads_);
            for(std::size_t i = 0; i < num_threads_; ++i)
                threads_.emplace_back([this, i]{ run(i); });
        });
    }

    // Worker loop: wait for work or stop, pop one continuation at a
    // time under the lock, and resume it outside the lock.
    void
    run(std::size_t index)
    {
        // Build name; set_current_thread_name truncates to platform limits.
        char name[16];
        std::snprintf(name, sizeof(name), "%s%zu", thread_name_prefix_, index);
        set_current_thread_name(name);

        for(;;)
        {
            continuation* c = nullptr;
            {
                std::unique_lock<std::mutex> lock(mutex_);
                cv_.wait(lock, [this]{
                    return !empty() ||
                        stop_;
                });
                // Stop takes precedence over remaining queue entries;
                // those are reclaimed by drain_abandoned().
                if(stop_)
                    return;
                c = pop();
            }
            if(c)
                c->h.resume();
        }
    }
};
230 :
231 : //------------------------------------------------------------------------------
232 :
thread_pool::
~thread_pool()
{
    // Teardown order matters:
    // 1) stop():  signal workers to exit; queued work is abandoned.
    // 2) join():  wait for the worker threads to finish.
    // 3) drain_abandoned(): destroy abandoned coroutine frames while
    //    execution_context services are still valid (their destructors
    //    may call into services, e.g. timer cancellation).
    // 4) shutdown()/destroy(): tear down the context's services.
    impl_->stop();
    impl_->join();
    impl_->drain_abandoned();
    shutdown();
    destroy();
    delete impl_;
}
243 :
244 157 : thread_pool::
245 157 : thread_pool(std::size_t num_threads, std::string_view thread_name_prefix)
246 157 : : impl_(new impl(num_threads, thread_name_prefix))
247 : {
248 157 : this->set_frame_allocator(std::allocator<void>{});
249 157 : }
250 :
251 : void
252 11 : thread_pool::
253 : join() noexcept
254 : {
255 11 : impl_->join();
256 11 : }
257 :
258 : void
259 2 : thread_pool::
260 : stop() noexcept
261 : {
262 2 : impl_->stop();
263 2 : }
264 :
265 : //------------------------------------------------------------------------------
266 :
267 : thread_pool::executor_type
268 163 : thread_pool::
269 : get_executor() const noexcept
270 : {
271 163 : return executor_type(
272 163 : const_cast<thread_pool&>(*this));
273 : }
274 :
275 : void
276 345 : thread_pool::executor_type::
277 : on_work_started() const noexcept
278 : {
279 345 : pool_->impl_->on_work_started();
280 345 : }
281 :
282 : void
283 345 : thread_pool::executor_type::
284 : on_work_finished() const noexcept
285 : {
286 345 : pool_->impl_->on_work_finished();
287 345 : }
288 :
289 : void
290 828 : thread_pool::executor_type::
291 : post(continuation& c) const
292 : {
293 828 : pool_->impl_->post(c);
294 828 : }
295 :
296 : } // capy
297 : } // boost
|