src/ex/detail/strand_service.cpp
97.9% Lines (95/97)
95.7% List of functions (22/23)
Functions (23)
Function
Calls
Lines
Blocks
boost::capy::detail::strand_invoker::promise_type::operator new(unsigned long, boost::capy::detail::strand_impl&)
:59
12x
100.0%
100.0%
boost::capy::detail::strand_invoker::promise_type::operator delete(void*, unsigned long)
:76
12x
87.5%
75.0%
boost::capy::detail::strand_invoker::promise_type::get_return_object()
:90
12x
100.0%
100.0%
boost::capy::detail::strand_invoker::promise_type::initial_suspend()
:93
12x
100.0%
100.0%
boost::capy::detail::strand_invoker::promise_type::final_suspend()
:94
12x
100.0%
100.0%
boost::capy::detail::strand_invoker::promise_type::return_void()
:95
12x
100.0%
100.0%
boost::capy::detail::strand_invoker::promise_type::unhandled_exception()
:96
0
0.0%
0.0%
boost::capy::detail::strand_service_impl::strand_service_impl(boost::capy::execution_context&)
:118
23x
100.0%
100.0%
boost::capy::detail::strand_service_impl::get_implementation()
:123
27x
100.0%
100.0%
boost::capy::detail::strand_service_impl::shutdown()
:133
23x
100.0%
100.0%
boost::capy::detail::strand_service_impl::enqueue(boost::capy::detail::strand_impl&, std::__n4861::coroutine_handle<void>)
:149
332x
100.0%
82.0%
boost::capy::detail::strand_service_impl::dispatch_pending(boost::capy::detail::strand_impl&)
:162
17x
100.0%
86.0%
boost::capy::detail::strand_service_impl::try_unlock(boost::capy::detail::strand_impl&)
:173
17x
100.0%
100.0%
boost::capy::detail::strand_service_impl::set_dispatch_thread(boost::capy::detail::strand_impl&)
:185
17x
100.0%
100.0%
boost::capy::detail::strand_service_impl::clear_dispatch_thread(boost::capy::detail::strand_impl&)
:191
12x
100.0%
100.0%
boost::capy::detail::strand_service_impl::make_invoker(boost::capy::detail::strand_impl&)
:199
12x
100.0%
50.0%
boost::capy::detail::strand_service_impl::post_invoker(boost::capy::detail::strand_impl&, boost::capy::executor_ref)
:215
12x
100.0%
100.0%
boost::capy::detail::strand_service::strand_service()
:228
23x
100.0%
100.0%
boost::capy::detail::strand_service::~strand_service()
:234
23x
100.0%
100.0%
boost::capy::detail::strand_service::running_in_this_thread(boost::capy::detail::strand_impl&)
:238
10x
100.0%
100.0%
boost::capy::detail::strand_service::dispatch(boost::capy::detail::strand_impl&, boost::capy::executor_ref, std::__n4861::coroutine_handle<void>)
:245
8x
100.0%
100.0%
boost::capy::detail::strand_service::post(boost::capy::detail::strand_impl&, boost::capy::executor_ref, std::__n4861::coroutine_handle<void>)
:257
327x
100.0%
100.0%
boost::capy::detail::get_strand_service(boost::capy::execution_context&)
:265
27x
100.0%
100.0%
| Line | TLA | Hits | Source Code |
|---|---|---|---|
| 1 | // | ||
| 2 | // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com) | ||
| 3 | // | ||
| 4 | // Distributed under the Boost Software License, Version 1.0. (See accompanying | ||
| 5 | // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) | ||
| 6 | // | ||
| 7 | // Official repository: https://github.com/cppalliance/capy | ||
| 8 | // | ||
| 9 | |||
| 10 | #include "src/ex/detail/strand_queue.hpp" | ||
| 11 | #include <boost/capy/ex/detail/strand_service.hpp> | ||
| 12 | #include <boost/capy/continuation.hpp> | ||
| 13 | #include <atomic> | ||
| 14 | #include <coroutine> | ||
| 15 | #include <mutex> | ||
| 16 | #include <thread> | ||
| 17 | #include <utility> | ||
| 18 | |||
| 19 | namespace boost { | ||
| 20 | namespace capy { | ||
| 21 | namespace detail { | ||
| 22 | |||
| 23 | //---------------------------------------------------------- | ||
| 24 | |||
| 25 | /** Implementation state for a strand. | ||
| 26 | |||
| 27 | Each strand_impl provides serialization for coroutines | ||
| 28 | dispatched through strands that share it. | ||
| 29 | */ | ||
| 30 | // Sentinel stored in cached_frame_ after shutdown to prevent | ||
| 31 | // in-flight invokers from repopulating a freed cache slot. | ||
| 32 | inline void* const kCacheClosed = reinterpret_cast<void*>(1); | ||
| 33 | |||
| 34 | struct strand_impl | ||
| 35 | { | ||
| 36 | std::mutex mutex_; | ||
| 37 | strand_queue pending_; | ||
| 38 | bool locked_ = false; | ||
| 39 | std::atomic<std::thread::id> dispatch_thread_{}; | ||
| 40 | std::atomic<void*> cached_frame_{nullptr}; | ||
| 41 | }; | ||
| 42 | |||
| 43 | //---------------------------------------------------------- | ||
| 44 | |||
| 45 | /** Invoker coroutine for strand dispatch. | ||
| 46 | |||
| 47 | Uses custom allocator to recycle frame - one allocation | ||
| 48 | per strand_impl lifetime, stored in trailer for recovery. | ||
| 49 | */ | ||
| 50 | struct strand_invoker | ||
| 51 | { | ||
| 52 | struct promise_type | ||
| 53 | { | ||
| 54 | // Used to post the invoker through the inner executor. | ||
| 55 | // Lives in the coroutine frame (heap-allocated), so has | ||
| 56 | // a stable address for the duration of the queue residency. | ||
| 57 | continuation self_; | ||
| 58 | |||
| 59 | 12x | void* operator new(std::size_t n, strand_impl& impl) | |
| 60 | { | ||
| 61 | 12x | constexpr auto A = alignof(strand_impl*); | |
| 62 | 12x | std::size_t padded = (n + A - 1) & ~(A - 1); | |
| 63 | 12x | std::size_t total = padded + sizeof(strand_impl*); | |
| 64 | |||
| 65 | 12x | void* p = impl.cached_frame_.exchange( | |
| 66 | nullptr, std::memory_order_acquire); | ||
| 67 | 12x | if(!p || p == kCacheClosed) | |
| 68 | 11x | p = ::operator new(total); | |
| 69 | |||
| 70 | // Trailer lets delete recover impl | ||
| 71 | 12x | *reinterpret_cast<strand_impl**>( | |
| 72 | 12x | static_cast<char*>(p) + padded) = &impl; | |
| 73 | 12x | return p; | |
| 74 | } | ||
| 75 | |||
| 76 | 12x | void operator delete(void* p, std::size_t n) noexcept | |
| 77 | { | ||
| 78 | 12x | constexpr auto A = alignof(strand_impl*); | |
| 79 | 12x | std::size_t padded = (n + A - 1) & ~(A - 1); | |
| 80 | |||
| 81 | 12x | auto* impl = *reinterpret_cast<strand_impl**>( | |
| 82 | static_cast<char*>(p) + padded); | ||
| 83 | |||
| 84 | 12x | void* expected = nullptr; | |
| 85 | 12x | if(!impl->cached_frame_.compare_exchange_strong( | |
| 86 | expected, p, std::memory_order_release)) | ||
| 87 | ✗ | ::operator delete(p); | |
| 88 | 12x | } | |
| 89 | |||
| 90 | 12x | strand_invoker get_return_object() noexcept | |
| 91 | 12x | { return {std::coroutine_handle<promise_type>::from_promise(*this)}; } | |
| 92 | |||
| 93 | 12x | std::suspend_always initial_suspend() noexcept { return {}; } | |
| 94 | 12x | std::suspend_never final_suspend() noexcept { return {}; } | |
| 95 | 12x | void return_void() noexcept {} | |
| 96 | ✗ | void unhandled_exception() { std::terminate(); } | |
| 97 | }; | ||
| 98 | |||
| 99 | std::coroutine_handle<promise_type> h_; | ||
| 100 | }; | ||
| 101 | |||
| 102 | //---------------------------------------------------------- | ||
| 103 | |||
| 104 | /** Concrete implementation of strand_service. | ||
| 105 | |||
| 106 | Holds the fixed pool of strand_impl objects. | ||
| 107 | */ | ||
| 108 | class strand_service_impl : public strand_service | ||
| 109 | { | ||
| 110 | static constexpr std::size_t num_impls = 211; | ||
| 111 | |||
| 112 | strand_impl impls_[num_impls]; | ||
| 113 | std::size_t salt_ = 0; | ||
| 114 | std::mutex mutex_; | ||
| 115 | |||
| 116 | public: | ||
| 117 | explicit | ||
| 118 | 23x | strand_service_impl(execution_context&) | |
| 119 | 4876x | { | |
| 120 | 23x | } | |
| 121 | |||
| 122 | strand_impl* | ||
| 123 | 27x | get_implementation() override | |
| 124 | { | ||
| 125 | 27x | std::lock_guard<std::mutex> lock(mutex_); | |
| 126 | 27x | std::size_t index = salt_++; | |
| 127 | 27x | index = index % num_impls; | |
| 128 | 27x | return &impls_[index]; | |
| 129 | 27x | } | |
| 130 | |||
| 131 | protected: | ||
| 132 | void | ||
| 133 | 23x | shutdown() override | |
| 134 | { | ||
| 135 | 4876x | for(std::size_t i = 0; i < num_impls; ++i) | |
| 136 | { | ||
| 137 | 4853x | std::lock_guard<std::mutex> lock(impls_[i].mutex_); | |
| 138 | 4853x | impls_[i].locked_ = true; | |
| 139 | |||
| 140 | 4853x | void* p = impls_[i].cached_frame_.exchange( | |
| 141 | kCacheClosed, std::memory_order_acquire); | ||
| 142 | 4853x | if(p) | |
| 143 | 11x | ::operator delete(p); | |
| 144 | 4853x | } | |
| 145 | 23x | } | |
| 146 | |||
| 147 | private: | ||
| 148 | static bool | ||
| 149 | 332x | enqueue(strand_impl& impl, std::coroutine_handle<> h) | |
| 150 | { | ||
| 151 | 332x | std::lock_guard<std::mutex> lock(impl.mutex_); | |
| 152 | 332x | impl.pending_.push(h); | |
| 153 | 332x | if(!impl.locked_) | |
| 154 | { | ||
| 155 | 12x | impl.locked_ = true; | |
| 156 | 12x | return true; | |
| 157 | } | ||
| 158 | 320x | return false; | |
| 159 | 332x | } | |
| 160 | |||
| 161 | static void | ||
| 162 | 17x | dispatch_pending(strand_impl& impl) | |
| 163 | { | ||
| 164 | 17x | strand_queue::taken_batch batch; | |
| 165 | { | ||
| 166 | 17x | std::lock_guard<std::mutex> lock(impl.mutex_); | |
| 167 | 17x | batch = impl.pending_.take_all(); | |
| 168 | 17x | } | |
| 169 | 17x | impl.pending_.dispatch_batch(batch); | |
| 170 | 17x | } | |
| 171 | |||
| 172 | static bool | ||
| 173 | 17x | try_unlock(strand_impl& impl) | |
| 174 | { | ||
| 175 | 17x | std::lock_guard<std::mutex> lock(impl.mutex_); | |
| 176 | 17x | if(impl.pending_.empty()) | |
| 177 | { | ||
| 178 | 12x | impl.locked_ = false; | |
| 179 | 12x | return true; | |
| 180 | } | ||
| 181 | 5x | return false; | |
| 182 | 17x | } | |
| 183 | |||
| 184 | static void | ||
| 185 | 17x | set_dispatch_thread(strand_impl& impl) noexcept | |
| 186 | { | ||
| 187 | 17x | impl.dispatch_thread_.store(std::this_thread::get_id()); | |
| 188 | 17x | } | |
| 189 | |||
| 190 | static void | ||
| 191 | 12x | clear_dispatch_thread(strand_impl& impl) noexcept | |
| 192 | { | ||
| 193 | 12x | impl.dispatch_thread_.store(std::thread::id{}); | |
| 194 | 12x | } | |
| 195 | |||
| 196 | // Loops until queue empty (aggressive). Alternative: per-batch fairness | ||
| 197 | // (repost after each batch to let other work run) - explore if starvation observed. | ||
| 198 | static strand_invoker | ||
| 199 | 12x | make_invoker(strand_impl& impl) | |
| 200 | { | ||
| 201 | strand_impl* p = &impl; | ||
| 202 | for(;;) | ||
| 203 | { | ||
| 204 | set_dispatch_thread(*p); | ||
| 205 | dispatch_pending(*p); | ||
| 206 | if(try_unlock(*p)) | ||
| 207 | { | ||
| 208 | clear_dispatch_thread(*p); | ||
| 209 | co_return; | ||
| 210 | } | ||
| 211 | } | ||
| 212 | 24x | } | |
| 213 | |||
| 214 | static void | ||
| 215 | 12x | post_invoker(strand_impl& impl, executor_ref ex) | |
| 216 | { | ||
| 217 | 12x | auto invoker = make_invoker(impl); | |
| 218 | 12x | auto& self = invoker.h_.promise().self_; | |
| 219 | 12x | self.h = invoker.h_; | |
| 220 | 12x | ex.post(self); | |
| 221 | 12x | } | |
| 222 | |||
| 223 | friend class strand_service; | ||
| 224 | }; | ||
| 225 | |||
| 226 | //---------------------------------------------------------- | ||
| 227 | |||
| 228 | 23x | strand_service:: | |
| 229 | 23x | strand_service() | |
| 230 | 23x | : service() | |
| 231 | { | ||
| 232 | 23x | } | |
| 233 | |||
| 234 | 23x | strand_service:: | |
| 235 | ~strand_service() = default; | ||
| 236 | |||
| 237 | bool | ||
| 238 | 10x | strand_service:: | |
| 239 | running_in_this_thread(strand_impl& impl) noexcept | ||
| 240 | { | ||
| 241 | 10x | return impl.dispatch_thread_.load() == std::this_thread::get_id(); | |
| 242 | } | ||
| 243 | |||
| 244 | std::coroutine_handle<> | ||
| 245 | 8x | strand_service:: | |
| 246 | dispatch(strand_impl& impl, executor_ref ex, std::coroutine_handle<> h) | ||
| 247 | { | ||
| 248 | 8x | if(running_in_this_thread(impl)) | |
| 249 | 3x | return h; | |
| 250 | |||
| 251 | 5x | if(strand_service_impl::enqueue(impl, h)) | |
| 252 | 5x | strand_service_impl::post_invoker(impl, ex); | |
| 253 | 5x | return std::noop_coroutine(); | |
| 254 | } | ||
| 255 | |||
| 256 | void | ||
| 257 | 327x | strand_service:: | |
| 258 | post(strand_impl& impl, executor_ref ex, std::coroutine_handle<> h) | ||
| 259 | { | ||
| 260 | 327x | if(strand_service_impl::enqueue(impl, h)) | |
| 261 | 7x | strand_service_impl::post_invoker(impl, ex); | |
| 262 | 327x | } | |
| 263 | |||
| 264 | strand_service& | ||
| 265 | 27x | get_strand_service(execution_context& ctx) | |
| 266 | { | ||
| 267 | 27x | return ctx.use_service<strand_service_impl>(); | |
| 268 | } | ||
| 269 | |||
| 270 | } // namespace detail | ||
| 271 | } // namespace capy | ||
| 272 | } // namespace boost | ||
| 273 |