src/ex/thread_pool.cpp

100.0% Lines (126/126) 100.0% List of functions (25/25)
thread_pool.cpp
f(x) Functions (25)
Function Calls Lines Blocks
boost::capy::thread_pool::impl::push(boost::capy::continuation*) :58 828x 100.0% 100.0%
boost::capy::thread_pool::impl::pop() :68 985x 100.0% 100.0%
boost::capy::thread_pool::impl::empty() const :79 1064x 100.0% 100.0%
boost::capy::thread_pool::impl::~impl() :95 157x 100.0% 100.0%
boost::capy::thread_pool::impl::drain_abandoned() :103 157x 100.0% 100.0%
boost::capy::thread_pool::impl::impl(unsigned long, std::basic_string_view<char, std::char_traits<char> >) :113 157x 100.0% 80.0%
boost::capy::thread_pool::impl::post(boost::capy::continuation&) :126 828x 100.0% 100.0%
boost::capy::thread_pool::impl::on_work_started() :137 345x 100.0% 100.0%
boost::capy::thread_pool::impl::on_work_finished() :143 345x 100.0% 100.0%
boost::capy::thread_pool::impl::join() :156 168x 100.0% 85.0%
boost::capy::thread_pool::impl::join()::{lambda()#1}::operator()() const :172 59x 100.0% 100.0%
boost::capy::thread_pool::impl::stop() :184 159x 100.0% 100.0%
boost::capy::thread_pool::impl::ensure_started() :195 828x 100.0% 100.0%
boost::capy::thread_pool::impl::ensure_started()::{lambda()#1}::operator()() const :197 101x 100.0% 100.0%
boost::capy::thread_pool::impl::ensure_started()::{lambda()#1}::operator()() const::{lambda()#1}::operator()() const :200 179x 100.0% 100.0%
boost::capy::thread_pool::impl::run(unsigned long) :205 179x 100.0% 84.0%
boost::capy::thread_pool::impl::run(unsigned long)::{lambda()#1}::operator()() const :217 1064x 100.0% 100.0%
boost::capy::thread_pool::~thread_pool() :233 157x 100.0% 100.0%
boost::capy::thread_pool::thread_pool(unsigned long, std::basic_string_view<char, std::char_traits<char> >) :244 157x 100.0% 55.0%
boost::capy::thread_pool::join() :252 11x 100.0% 100.0%
boost::capy::thread_pool::stop() :259 2x 100.0% 100.0%
boost::capy::thread_pool::get_executor() const :268 163x 100.0% 100.0%
boost::capy::thread_pool::executor_type::on_work_started() const :276 345x 100.0% 100.0%
boost::capy::thread_pool::executor_type::on_work_finished() const :283 345x 100.0% 100.0%
boost::capy::thread_pool::executor_type::post(boost::capy::continuation&) const :290 828x 100.0% 100.0%
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 // Copyright (c) 2026 Michael Vandeberg
4 //
5 // Distributed under the Boost Software License, Version 1.0. (See accompanying
6 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7 //
8 // Official repository: https://github.com/boostorg/capy
9 //
10
11 #include <boost/capy/ex/thread_pool.hpp>
12 #include <boost/capy/continuation.hpp>
13 #include <boost/capy/test/thread_name.hpp>
14 #include <algorithm>
15 #include <atomic>
16 #include <condition_variable>
17 #include <cstdio>
18 #include <mutex>
19 #include <thread>
20 #include <vector>
21
22 /*
23 Thread pool implementation using a shared work queue.
24
25 Work items are continuations linked via their intrusive next pointer,
26 stored in a single queue protected by a mutex. No per-post heap
27 allocation: the continuation is owned by the caller and linked
28 directly. Worker threads wait on a condition_variable until work
29 is available or stop is requested.
30
31 Threads are started lazily on first post() via std::call_once to avoid
32 spawning threads for pools that are constructed but never used. Each
33 thread is named with a configurable prefix plus index for debugger
34 visibility.
35
36 Work tracking: on_work_started/on_work_finished maintain an atomic
37 outstanding_work_ counter. join() blocks until this counter reaches
38 zero, then signals workers to stop and joins threads.
39
40 Two shutdown paths, plus the destructor, which combines them:
41 - join(): waits for outstanding work to drain, then stops workers.
42 - stop(): immediately signals workers to exit; queued work is abandoned.
43 - Destructor: stop() then join() (abandon + wait for threads).
44 */
45
46 namespace boost {
47 namespace capy {
48
49 //------------------------------------------------------------------------------
50
51 class thread_pool::impl
52 {
53 // Intrusive queue of continuations via continuation::next.
54 // No per-post allocation: the continuation is owned by the caller.
55 continuation* head_ = nullptr;
56 continuation* tail_ = nullptr;
57
58 828x void push(continuation* c) noexcept
59 {
60 828x c->next = nullptr;
61 828x if(tail_)
62 602x tail_->next = c;
63 else
64 226x head_ = c;
65 828x tail_ = c;
66 828x }
67
68 985x continuation* pop() noexcept
69 {
70 985x if(!head_)
71 157x return nullptr;
72 828x continuation* c = head_;
73 828x head_ = head_->next;
74 828x if(!head_)
75 226x tail_ = nullptr;
76 828x return c;
77 }
78
79 1064x bool empty() const noexcept
80 {
81 1064x return head_ == nullptr;
82 }
83
84 std::mutex mutex_;
85 std::condition_variable cv_;
86 std::vector<std::thread> threads_;
87 std::atomic<std::size_t> outstanding_work_{0};
88 bool stop_{false};
89 bool joined_{false};
90 std::size_t num_threads_;
91 char thread_name_prefix_[13]{}; // 12 chars max + null terminator
92 std::once_flag start_flag_;
93
94 public:
95 157x ~impl() = default;
96
97 // Destroy abandoned coroutine frames. Must be called
98 // before execution_context::shutdown()/destroy() so
99 // that suspended-frame destructors (e.g. delay_awaitable
100 // calling timer_service::cancel()) run while services
101 // are still valid.
102 void
103 157x drain_abandoned() noexcept
104 {
105 345x while(auto* c = pop())
106 {
107 188x auto h = c->h;
108 188x if(h && h != std::noop_coroutine())
109 136x h.destroy();
110 188x }
111 157x }
112
113 157x impl(std::size_t num_threads, std::string_view thread_name_prefix)
114 157x : num_threads_(num_threads)
115 {
116 157x if(num_threads_ == 0)
117 4x num_threads_ = std::max(
118 2x std::thread::hardware_concurrency(), 1u);
119
120 // Truncate prefix to 12 chars, leaving room for up to 3-digit index.
121 157x auto n = thread_name_prefix.copy(thread_name_prefix_, 12);
122 157x thread_name_prefix_[n] = '\0';
123 157x }
124
125 void
126 828x post(continuation& c)
127 {
128 828x ensure_started();
129 {
130 828x std::lock_guard<std::mutex> lock(mutex_);
131 828x push(&c);
132 828x }
133 828x cv_.notify_one();
134 828x }
135
136 void
137 345x on_work_started() noexcept
138 {
139 345x outstanding_work_.fetch_add(1, std::memory_order_acq_rel);
140 345x }
141
142 void
143 345x on_work_finished() noexcept
144 {
145 345x if(outstanding_work_.fetch_sub(
146 345x 1, std::memory_order_acq_rel) == 1)
147 {
148 85x std::lock_guard<std::mutex> lock(mutex_);
149 85x if(joined_ && !stop_)
150 4x stop_ = true;
151 85x cv_.notify_all();
152 85x }
153 345x }
154
155 void
156 168x join() noexcept
157 {
158 {
159 168x std::unique_lock<std::mutex> lock(mutex_);
160 168x if(joined_)
161 11x return;
162 157x joined_ = true;
163
164 157x if(outstanding_work_.load(
165 157x std::memory_order_acquire) == 0)
166 {
167 103x stop_ = true;
168 103x cv_.notify_all();
169 }
170 else
171 {
172 54x cv_.wait(lock, [this]{
173 59x return stop_;
174 });
175 }
176 168x }
177
178 336x for(auto& t : threads_)
179 179x if(t.joinable())
180 179x t.join();
181 }
182
183 void
184 159x stop() noexcept
185 {
186 {
187 159x std::lock_guard<std::mutex> lock(mutex_);
188 159x stop_ = true;
189 159x }
190 159x cv_.notify_all();
191 159x }
192
193 private:
194 void
195 828x ensure_started()
196 {
197 828x std::call_once(start_flag_, [this]{
198 101x threads_.reserve(num_threads_);
199 280x for(std::size_t i = 0; i < num_threads_; ++i)
200 358x threads_.emplace_back([this, i]{ run(i); });
201 101x });
202 828x }
203
204 void
205 179x run(std::size_t index)
206 {
207 // Build name; set_current_thread_name truncates to platform limits.
208 char name[16];
209 179x std::snprintf(name, sizeof(name), "%s%zu", thread_name_prefix_, index);
210 179x set_current_thread_name(name);
211
212 for(;;)
213 {
214 819x continuation* c = nullptr;
215 {
216 819x std::unique_lock<std::mutex> lock(mutex_);
217 819x cv_.wait(lock, [this]{
218 1396x return !empty() ||
219 1396x stop_;
220 });
221 819x if(stop_)
222 358x return;
223 640x c = pop();
224 819x }
225 640x if(c)
226 640x c->h.resume();
227 640x }
228 }
229 };
230
231 //------------------------------------------------------------------------------
232
233 157x thread_pool::
234 ~thread_pool()
235 {
236 157x impl_->stop();
237 157x impl_->join();
238 157x impl_->drain_abandoned();
239 157x shutdown();
240 157x destroy();
241 157x delete impl_;
242 157x }
243
244 157x thread_pool::
245 157x thread_pool(std::size_t num_threads, std::string_view thread_name_prefix)
246 157x : impl_(new impl(num_threads, thread_name_prefix))
247 {
248 157x this->set_frame_allocator(std::allocator<void>{});
249 157x }
250
251 void
252 11x thread_pool::
253 join() noexcept
254 {
255 11x impl_->join();
256 11x }
257
258 void
259 2x thread_pool::
260 stop() noexcept
261 {
262 2x impl_->stop();
263 2x }
264
265 //------------------------------------------------------------------------------
266
267 thread_pool::executor_type
268 163x thread_pool::
269 get_executor() const noexcept
270 {
271 163x return executor_type(
272 163x const_cast<thread_pool&>(*this));
273 }
274
275 void
276 345x thread_pool::executor_type::
277 on_work_started() const noexcept
278 {
279 345x pool_->impl_->on_work_started();
280 345x }
281
282 void
283 345x thread_pool::executor_type::
284 on_work_finished() const noexcept
285 {
286 345x pool_->impl_->on_work_finished();
287 345x }
288
289 void
290 828x thread_pool::executor_type::
291 post(continuation& c) const
292 {
293 828x pool_->impl_->post(c);
294 828x }
295
296 } // capy
297 } // boost
298