LCOV - code coverage report
Current view: top level - /jenkins/workspace/boost-root/libs/capy/src/ex/detail - strand_service.cpp (source / functions) Coverage Total Hit Missed
Test: coverage_remapped.info Lines: 97.9 % 97 95 2
Test Date: 2026-03-21 03:20:11 Functions: 91.7 % 24 22 2

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
       3                 : //
       4                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6                 : //
       7                 : // Official repository: https://github.com/cppalliance/capy
       8                 : //
       9                 : 
      10                 : #include "src/ex/detail/strand_queue.hpp"
      11                 : #include <boost/capy/ex/detail/strand_service.hpp>
      12                 : #include <boost/capy/continuation.hpp>
      13                 : #include <atomic>
      14                 : #include <coroutine>
      15                 : #include <mutex>
      16                 : #include <thread>
      17                 : #include <utility>
      18                 : 
      19                 : namespace boost {
      20                 : namespace capy {
      21                 : namespace detail {
      22                 : 
      23                 : //----------------------------------------------------------
      24                 : 
      25                 : /** Implementation state for a strand.
      26                 : 
      27                 :     Each strand_impl provides serialization for coroutines
      28                 :     dispatched through strands that share it.
      29                 : */
      30                 : // Sentinel stored in cached_frame_ after shutdown to prevent
      31                 : // in-flight invokers from repopulating a freed cache slot.
      32                 : inline void* const kCacheClosed = reinterpret_cast<void*>(1);
      33                 : 
      34                 : struct strand_impl
      35                 : {
      36                 :     std::mutex mutex_;
      37                 :     strand_queue pending_;
      38                 :     bool locked_ = false;
      39                 :     std::atomic<std::thread::id> dispatch_thread_{};
      40                 :     std::atomic<void*> cached_frame_{nullptr};
      41                 : };
      42                 : 
      43                 : //----------------------------------------------------------
      44                 : 
      45                 : /** Invoker coroutine for strand dispatch.
      46                 : 
      47                 :     Uses custom allocator to recycle frame - one allocation
      48                 :     per strand_impl lifetime, stored in trailer for recovery.
      49                 : */
      50                 : struct strand_invoker
      51                 : {
      52                 :     struct promise_type
      53                 :     {
      54                 :         // Used to post the invoker through the inner executor.
      55                 :         // Lives in the coroutine frame (heap-allocated), so has
      56                 :         // a stable address for the duration of the queue residency.
      57                 :         continuation self_;
      58                 : 
      59 HIT          12 :         void* operator new(std::size_t n, strand_impl& impl)
      60                 :         {
      61              12 :             constexpr auto A = alignof(strand_impl*);
      62              12 :             std::size_t padded = (n + A - 1) & ~(A - 1);
      63              12 :             std::size_t total = padded + sizeof(strand_impl*);
      64                 : 
      65              12 :             void* p = impl.cached_frame_.exchange(
      66                 :                 nullptr, std::memory_order_acquire);
      67              12 :             if(!p || p == kCacheClosed)
      68              11 :                 p = ::operator new(total);
      69                 : 
      70                 :             // Trailer lets delete recover impl
      71              12 :             *reinterpret_cast<strand_impl**>(
      72              12 :                 static_cast<char*>(p) + padded) = &impl;
      73              12 :             return p;
      74                 :         }
      75                 : 
      76              12 :         void operator delete(void* p, std::size_t n) noexcept
      77                 :         {
      78              12 :             constexpr auto A = alignof(strand_impl*);
      79              12 :             std::size_t padded = (n + A - 1) & ~(A - 1);
      80                 : 
      81              12 :             auto* impl = *reinterpret_cast<strand_impl**>(
      82                 :                 static_cast<char*>(p) + padded);
      83                 : 
      84              12 :             void* expected = nullptr;
      85              12 :             if(!impl->cached_frame_.compare_exchange_strong(
      86                 :                 expected, p, std::memory_order_release))
      87 MIS           0 :                 ::operator delete(p);
      88 HIT          12 :         }
      89                 : 
      90              12 :         strand_invoker get_return_object() noexcept
      91              12 :         { return {std::coroutine_handle<promise_type>::from_promise(*this)}; }
      92                 : 
      93              12 :         std::suspend_always initial_suspend() noexcept { return {}; }
      94              12 :         std::suspend_never final_suspend() noexcept { return {}; }
      95              12 :         void return_void() noexcept {}
      96 MIS           0 :         void unhandled_exception() { std::terminate(); }
      97                 :     };
      98                 : 
      99                 :     std::coroutine_handle<promise_type> h_;
     100                 : };
     101                 : 
     102                 : //----------------------------------------------------------
     103                 : 
     104                 : /** Concrete implementation of strand_service.
     105                 : 
     106                 :     Holds the fixed pool of strand_impl objects.
     107                 : */
     108                 : class strand_service_impl : public strand_service
     109                 : {
     110                 :     static constexpr std::size_t num_impls = 211;
     111                 : 
     112                 :     strand_impl impls_[num_impls];
     113                 :     std::size_t salt_ = 0;
     114                 :     std::mutex mutex_;
     115                 : 
     116                 : public:
     117                 :     explicit
     118 HIT          23 :     strand_service_impl(execution_context&)
     119            4876 :     {
     120              23 :     }
     121                 : 
     122                 :     strand_impl*
     123              27 :     get_implementation() override
     124                 :     {
     125              27 :         std::lock_guard<std::mutex> lock(mutex_);
     126              27 :         std::size_t index = salt_++;
     127              27 :         index = index % num_impls;
     128              27 :         return &impls_[index];
     129              27 :     }
     130                 : 
     131                 : protected:
     132                 :     void
     133              23 :     shutdown() override
     134                 :     {
     135            4876 :         for(std::size_t i = 0; i < num_impls; ++i)
     136                 :         {
     137            4853 :             std::lock_guard<std::mutex> lock(impls_[i].mutex_);
     138            4853 :             impls_[i].locked_ = true;
     139                 : 
     140            4853 :             void* p = impls_[i].cached_frame_.exchange(
     141                 :                 kCacheClosed, std::memory_order_acquire);
     142            4853 :             if(p)
     143              11 :                 ::operator delete(p);
     144            4853 :         }
     145              23 :     }
     146                 : 
     147                 : private:
     148                 :     static bool
     149             332 :     enqueue(strand_impl& impl, std::coroutine_handle<> h)
     150                 :     {
     151             332 :         std::lock_guard<std::mutex> lock(impl.mutex_);
     152             332 :         impl.pending_.push(h);
     153             332 :         if(!impl.locked_)
     154                 :         {
     155              12 :             impl.locked_ = true;
     156              12 :             return true;
     157                 :         }
     158             320 :         return false;
     159             332 :     }
     160                 : 
     161                 :     static void
     162              17 :     dispatch_pending(strand_impl& impl)
     163                 :     {
     164              17 :         strand_queue::taken_batch batch;
     165                 :         {
     166              17 :             std::lock_guard<std::mutex> lock(impl.mutex_);
     167              17 :             batch = impl.pending_.take_all();
     168              17 :         }
     169              17 :         impl.pending_.dispatch_batch(batch);
     170              17 :     }
     171                 : 
     172                 :     static bool
     173              17 :     try_unlock(strand_impl& impl)
     174                 :     {
     175              17 :         std::lock_guard<std::mutex> lock(impl.mutex_);
     176              17 :         if(impl.pending_.empty())
     177                 :         {
     178              12 :             impl.locked_ = false;
     179              12 :             return true;
     180                 :         }
     181               5 :         return false;
     182              17 :     }
     183                 : 
     184                 :     static void
     185              17 :     set_dispatch_thread(strand_impl& impl) noexcept
     186                 :     {
     187              17 :         impl.dispatch_thread_.store(std::this_thread::get_id());
     188              17 :     }
     189                 : 
     190                 :     static void
     191              12 :     clear_dispatch_thread(strand_impl& impl) noexcept
     192                 :     {
     193              12 :         impl.dispatch_thread_.store(std::thread::id{});
     194              12 :     }
     195                 : 
     196                 :     // Loops until queue empty (aggressive). Alternative: per-batch fairness
     197                 :     // (repost after each batch to let other work run) - explore if starvation observed.
     198                 :     static strand_invoker
     199              12 :     make_invoker(strand_impl& impl)
     200                 :     {
     201                 :         strand_impl* p = &impl;
     202                 :         for(;;)
     203                 :         {
     204                 :             set_dispatch_thread(*p);
     205                 :             dispatch_pending(*p);
     206                 :             if(try_unlock(*p))
     207                 :             {
     208                 :                 clear_dispatch_thread(*p);
     209                 :                 co_return;
     210                 :             }
     211                 :         }
     212              24 :     }
     213                 : 
     214                 :     static void
     215              12 :     post_invoker(strand_impl& impl, executor_ref ex)
     216                 :     {
     217              12 :         auto invoker = make_invoker(impl);
     218              12 :         auto& self = invoker.h_.promise().self_;
     219              12 :         self.h = invoker.h_;
     220              12 :         ex.post(self);
     221              12 :     }
     222                 : 
     223                 :     friend class strand_service;
     224                 : };
     225                 : 
     226                 : //----------------------------------------------------------
     227                 : 
     228              23 : strand_service::
     229              23 : strand_service()
     230              23 :     : service()
     231                 : {
     232              23 : }
     233                 : 
     234              23 : strand_service::
     235                 : ~strand_service() = default;
     236                 : 
     237                 : bool
     238              10 : strand_service::
     239                 : running_in_this_thread(strand_impl& impl) noexcept
     240                 : {
     241              10 :     return impl.dispatch_thread_.load() == std::this_thread::get_id();
     242                 : }
     243                 : 
     244                 : std::coroutine_handle<>
     245               8 : strand_service::
     246                 : dispatch(strand_impl& impl, executor_ref ex, std::coroutine_handle<> h)
     247                 : {
     248               8 :     if(running_in_this_thread(impl))
     249               3 :         return h;
     250                 : 
     251               5 :     if(strand_service_impl::enqueue(impl, h))
     252               5 :         strand_service_impl::post_invoker(impl, ex);
     253               5 :     return std::noop_coroutine();
     254                 : }
     255                 : 
     256                 : void
     257             327 : strand_service::
     258                 : post(strand_impl& impl, executor_ref ex, std::coroutine_handle<> h)
     259                 : {
     260             327 :     if(strand_service_impl::enqueue(impl, h))
     261               7 :         strand_service_impl::post_invoker(impl, ex);
     262             327 : }
     263                 : 
     264                 : strand_service&
     265              27 : get_strand_service(execution_context& ctx)
     266                 : {
     267              27 :     return ctx.use_service<strand_service_impl>();
     268                 : }
     269                 : 
     270                 : } // namespace detail
     271                 : } // namespace capy
     272                 : } // namespace boost
        

Generated by: LCOV version 2.3