src/ex/detail/strand_service.cpp

97.9% Lines (95/97) 95.7% List of functions (22/23)
strand_service.cpp
f(x) Functions (23)
Function Calls Lines Blocks
boost::capy::detail::strand_invoker::promise_type::operator new(unsigned long, boost::capy::detail::strand_impl&) :59 12x 100.0% 100.0% boost::capy::detail::strand_invoker::promise_type::operator delete(void*, unsigned long) :76 12x 87.5% 75.0% boost::capy::detail::strand_invoker::promise_type::get_return_object() :90 12x 100.0% 100.0% boost::capy::detail::strand_invoker::promise_type::initial_suspend() :93 12x 100.0% 100.0% boost::capy::detail::strand_invoker::promise_type::final_suspend() :94 12x 100.0% 100.0% boost::capy::detail::strand_invoker::promise_type::return_void() :95 12x 100.0% 100.0% boost::capy::detail::strand_invoker::promise_type::unhandled_exception() :96 0 0.0% 0.0% boost::capy::detail::strand_service_impl::strand_service_impl(boost::capy::execution_context&) :118 23x 100.0% 100.0% boost::capy::detail::strand_service_impl::get_implementation() :123 27x 100.0% 100.0% boost::capy::detail::strand_service_impl::shutdown() :133 23x 100.0% 100.0% boost::capy::detail::strand_service_impl::enqueue(boost::capy::detail::strand_impl&, std::__n4861::coroutine_handle<void>) :149 332x 100.0% 82.0% boost::capy::detail::strand_service_impl::dispatch_pending(boost::capy::detail::strand_impl&) :162 17x 100.0% 86.0% boost::capy::detail::strand_service_impl::try_unlock(boost::capy::detail::strand_impl&) :173 17x 100.0% 100.0% boost::capy::detail::strand_service_impl::set_dispatch_thread(boost::capy::detail::strand_impl&) :185 17x 100.0% 100.0% boost::capy::detail::strand_service_impl::clear_dispatch_thread(boost::capy::detail::strand_impl&) :191 12x 100.0% 100.0% boost::capy::detail::strand_service_impl::make_invoker(boost::capy::detail::strand_impl&) :199 12x 100.0% 50.0% boost::capy::detail::strand_service_impl::post_invoker(boost::capy::detail::strand_impl&, boost::capy::executor_ref) :215 12x 100.0% 100.0% boost::capy::detail::strand_service::strand_service() :228 23x 100.0% 100.0% boost::capy::detail::strand_service::~strand_service() :234 23x 100.0% 100.0% boost::capy::detail::strand_service::running_in_this_thread(boost::capy::detail::strand_impl&) :238 10x 100.0% 100.0% boost::capy::detail::strand_service::dispatch(boost::capy::detail::strand_impl&, boost::capy::executor_ref, std::__n4861::coroutine_handle<void>) :245 8x 100.0% 100.0% boost::capy::detail::strand_service::post(boost::capy::detail::strand_impl&, boost::capy::executor_ref, std::__n4861::coroutine_handle<void>) :257 327x 100.0% 100.0% boost::capy::detail::get_strand_service(boost::capy::execution_context&) :265 27x 100.0% 100.0%
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/capy
8 //
9
10 #include "src/ex/detail/strand_queue.hpp"
11 #include <boost/capy/ex/detail/strand_service.hpp>
12 #include <boost/capy/continuation.hpp>
13 #include <atomic>
14 #include <coroutine>
15 #include <mutex>
16 #include <thread>
17 #include <utility>
18
19 namespace boost {
20 namespace capy {
21 namespace detail {
22
23 //----------------------------------------------------------
24
25 /** Implementation state for a strand.
26
27 Each strand_impl provides serialization for coroutines
28 dispatched through strands that share it.
29 */
30 // Sentinel stored in cached_frame_ after shutdown to prevent
31 // in-flight invokers from repopulating a freed cache slot.
32 inline void* const kCacheClosed = reinterpret_cast<void*>(1);
33
34 struct strand_impl
35 {
36 std::mutex mutex_;
37 strand_queue pending_;
38 bool locked_ = false;
39 std::atomic<std::thread::id> dispatch_thread_{};
40 std::atomic<void*> cached_frame_{nullptr};
41 };
42
43 //----------------------------------------------------------
44
45 /** Invoker coroutine for strand dispatch.
46
47 Uses custom allocator to recycle frame - one allocation
48 per strand_impl lifetime, stored in trailer for recovery.
49 */
50 struct strand_invoker
51 {
52 struct promise_type
53 {
54 // Used to post the invoker through the inner executor.
55 // Lives in the coroutine frame (heap-allocated), so has
56 // a stable address for the duration of the queue residency.
57 continuation self_;
58
59 12x void* operator new(std::size_t n, strand_impl& impl)
60 {
61 12x constexpr auto A = alignof(strand_impl*);
62 12x std::size_t padded = (n + A - 1) & ~(A - 1);
63 12x std::size_t total = padded + sizeof(strand_impl*);
64
65 12x void* p = impl.cached_frame_.exchange(
66 nullptr, std::memory_order_acquire);
67 12x if(!p || p == kCacheClosed)
68 11x p = ::operator new(total);
69
70 // Trailer lets delete recover impl
71 12x *reinterpret_cast<strand_impl**>(
72 12x static_cast<char*>(p) + padded) = &impl;
73 12x return p;
74 }
75
76 12x void operator delete(void* p, std::size_t n) noexcept
77 {
78 12x constexpr auto A = alignof(strand_impl*);
79 12x std::size_t padded = (n + A - 1) & ~(A - 1);
80
81 12x auto* impl = *reinterpret_cast<strand_impl**>(
82 static_cast<char*>(p) + padded);
83
84 12x void* expected = nullptr;
85 12x if(!impl->cached_frame_.compare_exchange_strong(
86 expected, p, std::memory_order_release))
87 ::operator delete(p);
88 12x }
89
90 12x strand_invoker get_return_object() noexcept
91 12x { return {std::coroutine_handle<promise_type>::from_promise(*this)}; }
92
93 12x std::suspend_always initial_suspend() noexcept { return {}; }
94 12x std::suspend_never final_suspend() noexcept { return {}; }
95 12x void return_void() noexcept {}
96 void unhandled_exception() { std::terminate(); }
97 };
98
99 std::coroutine_handle<promise_type> h_;
100 };
101
102 //----------------------------------------------------------
103
104 /** Concrete implementation of strand_service.
105
106 Holds the fixed pool of strand_impl objects.
107 */
108 class strand_service_impl : public strand_service
109 {
110 static constexpr std::size_t num_impls = 211;
111
112 strand_impl impls_[num_impls];
113 std::size_t salt_ = 0;
114 std::mutex mutex_;
115
116 public:
117 explicit
118 23x strand_service_impl(execution_context&)
119 4876x {
120 23x }
121
122 strand_impl*
123 27x get_implementation() override
124 {
125 27x std::lock_guard<std::mutex> lock(mutex_);
126 27x std::size_t index = salt_++;
127 27x index = index % num_impls;
128 27x return &impls_[index];
129 27x }
130
131 protected:
132 void
133 23x shutdown() override
134 {
135 4876x for(std::size_t i = 0; i < num_impls; ++i)
136 {
137 4853x std::lock_guard<std::mutex> lock(impls_[i].mutex_);
138 4853x impls_[i].locked_ = true;
139
140 4853x void* p = impls_[i].cached_frame_.exchange(
141 kCacheClosed, std::memory_order_acquire);
142 4853x if(p)
143 11x ::operator delete(p);
144 4853x }
145 23x }
146
147 private:
148 static bool
149 332x enqueue(strand_impl& impl, std::coroutine_handle<> h)
150 {
151 332x std::lock_guard<std::mutex> lock(impl.mutex_);
152 332x impl.pending_.push(h);
153 332x if(!impl.locked_)
154 {
155 12x impl.locked_ = true;
156 12x return true;
157 }
158 320x return false;
159 332x }
160
161 static void
162 17x dispatch_pending(strand_impl& impl)
163 {
164 17x strand_queue::taken_batch batch;
165 {
166 17x std::lock_guard<std::mutex> lock(impl.mutex_);
167 17x batch = impl.pending_.take_all();
168 17x }
169 17x impl.pending_.dispatch_batch(batch);
170 17x }
171
172 static bool
173 17x try_unlock(strand_impl& impl)
174 {
175 17x std::lock_guard<std::mutex> lock(impl.mutex_);
176 17x if(impl.pending_.empty())
177 {
178 12x impl.locked_ = false;
179 12x return true;
180 }
181 5x return false;
182 17x }
183
184 static void
185 17x set_dispatch_thread(strand_impl& impl) noexcept
186 {
187 17x impl.dispatch_thread_.store(std::this_thread::get_id());
188 17x }
189
190 static void
191 12x clear_dispatch_thread(strand_impl& impl) noexcept
192 {
193 12x impl.dispatch_thread_.store(std::thread::id{});
194 12x }
195
196 // Loops until queue empty (aggressive). Alternative: per-batch fairness
197 // (repost after each batch to let other work run) - explore if starvation observed.
198 static strand_invoker
199 12x make_invoker(strand_impl& impl)
200 {
201 strand_impl* p = &impl;
202 for(;;)
203 {
204 set_dispatch_thread(*p);
205 dispatch_pending(*p);
206 if(try_unlock(*p))
207 {
208 clear_dispatch_thread(*p);
209 co_return;
210 }
211 }
212 24x }
213
214 static void
215 12x post_invoker(strand_impl& impl, executor_ref ex)
216 {
217 12x auto invoker = make_invoker(impl);
218 12x auto& self = invoker.h_.promise().self_;
219 12x self.h = invoker.h_;
220 12x ex.post(self);
221 12x }
222
223 friend class strand_service;
224 };
225
226 //----------------------------------------------------------
227
228 23x strand_service::
229 23x strand_service()
230 23x : service()
231 {
232 23x }
233
234 23x strand_service::
235 ~strand_service() = default;
236
237 bool
238 10x strand_service::
239 running_in_this_thread(strand_impl& impl) noexcept
240 {
241 10x return impl.dispatch_thread_.load() == std::this_thread::get_id();
242 }
243
244 std::coroutine_handle<>
245 8x strand_service::
246 dispatch(strand_impl& impl, executor_ref ex, std::coroutine_handle<> h)
247 {
248 8x if(running_in_this_thread(impl))
249 3x return h;
250
251 5x if(strand_service_impl::enqueue(impl, h))
252 5x strand_service_impl::post_invoker(impl, ex);
253 5x return std::noop_coroutine();
254 }
255
256 void
257 327x strand_service::
258 post(strand_impl& impl, executor_ref ex, std::coroutine_handle<> h)
259 {
260 327x if(strand_service_impl::enqueue(impl, h))
261 7x strand_service_impl::post_invoker(impl, ex);
262 327x }
263
264 strand_service&
265 27x get_strand_service(execution_context& ctx)
266 {
267 27x return ctx.use_service<strand_service_impl>();
268 }
269
270 } // namespace detail
271 } // namespace capy
272 } // namespace boost
273