TLA Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/capy
8 : //
9 :
10 : #include "src/ex/detail/strand_queue.hpp"
11 : #include <boost/capy/ex/detail/strand_service.hpp>
12 : #include <boost/capy/continuation.hpp>
13 : #include <atomic>
14 : #include <coroutine>
15 : #include <mutex>
16 : #include <thread>
17 : #include <utility>
18 :
19 : namespace boost {
20 : namespace capy {
21 : namespace detail {
22 :
23 : //----------------------------------------------------------
24 :
25 : /** Implementation state for a strand.
26 :
27 : Each strand_impl provides serialization for coroutines
28 : dispatched through strands that share it.
29 : */
30 : // Sentinel stored in cached_frame_ after shutdown to prevent
31 : // in-flight invokers from repopulating a freed cache slot.
32 : inline void* const kCacheClosed = reinterpret_cast<void*>(1);
33 :
34 : struct strand_impl
35 : {
36 : std::mutex mutex_;
37 : strand_queue pending_;
38 : bool locked_ = false;
39 : std::atomic<std::thread::id> dispatch_thread_{};
40 : std::atomic<void*> cached_frame_{nullptr};
41 : };
42 :
43 : //----------------------------------------------------------
44 :
45 : /** Invoker coroutine for strand dispatch.
46 :
47 : Uses custom allocator to recycle frame - one allocation
48 : per strand_impl lifetime, stored in trailer for recovery.
49 : */
50 : struct strand_invoker
51 : {
52 : struct promise_type
53 : {
54 : // Used to post the invoker through the inner executor.
55 : // Lives in the coroutine frame (heap-allocated), so has
56 : // a stable address for the duration of the queue residency.
57 : continuation self_;
58 :
59 HIT 12 : void* operator new(std::size_t n, strand_impl& impl)
60 : {
61 12 : constexpr auto A = alignof(strand_impl*);
62 12 : std::size_t padded = (n + A - 1) & ~(A - 1);
63 12 : std::size_t total = padded + sizeof(strand_impl*);
64 :
65 12 : void* p = impl.cached_frame_.exchange(
66 : nullptr, std::memory_order_acquire);
67 12 : if(!p || p == kCacheClosed)
68 11 : p = ::operator new(total);
69 :
70 : // Trailer lets delete recover impl
71 12 : *reinterpret_cast<strand_impl**>(
72 12 : static_cast<char*>(p) + padded) = &impl;
73 12 : return p;
74 : }
75 :
76 12 : void operator delete(void* p, std::size_t n) noexcept
77 : {
78 12 : constexpr auto A = alignof(strand_impl*);
79 12 : std::size_t padded = (n + A - 1) & ~(A - 1);
80 :
81 12 : auto* impl = *reinterpret_cast<strand_impl**>(
82 : static_cast<char*>(p) + padded);
83 :
84 12 : void* expected = nullptr;
85 12 : if(!impl->cached_frame_.compare_exchange_strong(
86 : expected, p, std::memory_order_release))
87 MIS 0 : ::operator delete(p);
88 HIT 12 : }
89 :
90 12 : strand_invoker get_return_object() noexcept
91 12 : { return {std::coroutine_handle<promise_type>::from_promise(*this)}; }
92 :
93 12 : std::suspend_always initial_suspend() noexcept { return {}; }
94 12 : std::suspend_never final_suspend() noexcept { return {}; }
95 12 : void return_void() noexcept {}
96 MIS 0 : void unhandled_exception() { std::terminate(); }
97 : };
98 :
99 : std::coroutine_handle<promise_type> h_;
100 : };
101 :
102 : //----------------------------------------------------------
103 :
104 : /** Concrete implementation of strand_service.
105 :
106 : Holds the fixed pool of strand_impl objects.
107 : */
108 : class strand_service_impl : public strand_service
109 : {
110 : static constexpr std::size_t num_impls = 211;
111 :
112 : strand_impl impls_[num_impls];
113 : std::size_t salt_ = 0;
114 : std::mutex mutex_;
115 :
116 : public:
117 : explicit
118 HIT 23 : strand_service_impl(execution_context&)
119 4876 : {
120 23 : }
121 :
122 : strand_impl*
123 27 : get_implementation() override
124 : {
125 27 : std::lock_guard<std::mutex> lock(mutex_);
126 27 : std::size_t index = salt_++;
127 27 : index = index % num_impls;
128 27 : return &impls_[index];
129 27 : }
130 :
131 : protected:
132 : void
133 23 : shutdown() override
134 : {
135 4876 : for(std::size_t i = 0; i < num_impls; ++i)
136 : {
137 4853 : std::lock_guard<std::mutex> lock(impls_[i].mutex_);
138 4853 : impls_[i].locked_ = true;
139 :
140 4853 : void* p = impls_[i].cached_frame_.exchange(
141 : kCacheClosed, std::memory_order_acquire);
142 4853 : if(p)
143 11 : ::operator delete(p);
144 4853 : }
145 23 : }
146 :
147 : private:
148 : static bool
149 332 : enqueue(strand_impl& impl, std::coroutine_handle<> h)
150 : {
151 332 : std::lock_guard<std::mutex> lock(impl.mutex_);
152 332 : impl.pending_.push(h);
153 332 : if(!impl.locked_)
154 : {
155 12 : impl.locked_ = true;
156 12 : return true;
157 : }
158 320 : return false;
159 332 : }
160 :
161 : static void
162 17 : dispatch_pending(strand_impl& impl)
163 : {
164 17 : strand_queue::taken_batch batch;
165 : {
166 17 : std::lock_guard<std::mutex> lock(impl.mutex_);
167 17 : batch = impl.pending_.take_all();
168 17 : }
169 17 : impl.pending_.dispatch_batch(batch);
170 17 : }
171 :
172 : static bool
173 17 : try_unlock(strand_impl& impl)
174 : {
175 17 : std::lock_guard<std::mutex> lock(impl.mutex_);
176 17 : if(impl.pending_.empty())
177 : {
178 12 : impl.locked_ = false;
179 12 : return true;
180 : }
181 5 : return false;
182 17 : }
183 :
184 : static void
185 17 : set_dispatch_thread(strand_impl& impl) noexcept
186 : {
187 17 : impl.dispatch_thread_.store(std::this_thread::get_id());
188 17 : }
189 :
190 : static void
191 12 : clear_dispatch_thread(strand_impl& impl) noexcept
192 : {
193 12 : impl.dispatch_thread_.store(std::thread::id{});
194 12 : }
195 :
196 : // Loops until queue empty (aggressive). Alternative: per-batch fairness
197 : // (repost after each batch to let other work run) - explore if starvation observed.
198 : static strand_invoker
199 12 : make_invoker(strand_impl& impl)
200 : {
201 : strand_impl* p = &impl;
202 : for(;;)
203 : {
204 : set_dispatch_thread(*p);
205 : dispatch_pending(*p);
206 : if(try_unlock(*p))
207 : {
208 : clear_dispatch_thread(*p);
209 : co_return;
210 : }
211 : }
212 24 : }
213 :
214 : static void
215 12 : post_invoker(strand_impl& impl, executor_ref ex)
216 : {
217 12 : auto invoker = make_invoker(impl);
218 12 : auto& self = invoker.h_.promise().self_;
219 12 : self.h = invoker.h_;
220 12 : ex.post(self);
221 12 : }
222 :
223 : friend class strand_service;
224 : };
225 :
226 : //----------------------------------------------------------
227 :
228 23 : strand_service::
229 23 : strand_service()
230 23 : : service()
231 : {
232 23 : }
233 :
234 23 : strand_service::
235 : ~strand_service() = default;
236 :
237 : bool
238 10 : strand_service::
239 : running_in_this_thread(strand_impl& impl) noexcept
240 : {
241 10 : return impl.dispatch_thread_.load() == std::this_thread::get_id();
242 : }
243 :
244 : std::coroutine_handle<>
245 8 : strand_service::
246 : dispatch(strand_impl& impl, executor_ref ex, std::coroutine_handle<> h)
247 : {
248 8 : if(running_in_this_thread(impl))
249 3 : return h;
250 :
251 5 : if(strand_service_impl::enqueue(impl, h))
252 5 : strand_service_impl::post_invoker(impl, ex);
253 5 : return std::noop_coroutine();
254 : }
255 :
256 : void
257 327 : strand_service::
258 : post(strand_impl& impl, executor_ref ex, std::coroutine_handle<> h)
259 : {
260 327 : if(strand_service_impl::enqueue(impl, h))
261 7 : strand_service_impl::post_invoker(impl, ex);
262 327 : }
263 :
264 : strand_service&
265 27 : get_strand_service(execution_context& ctx)
266 : {
267 27 : return ctx.use_service<strand_service_impl>();
268 : }
269 :
270 : } // namespace detail
271 : } // namespace capy
272 : } // namespace boost
|