LCOV - code coverage report
Current view: top level - capy/ex - async_mutex.hpp (source / functions) Coverage Total Hit Missed
Test: coverage_remapped.info Lines: 98.9 % 91 90 1
Test Date: 2026-03-21 03:20:11 Functions: 100.0 % 20 20

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
       3                 : //
       4                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6                 : //
       7                 : // Official repository: https://github.com/cppalliance/capy
       8                 : //
       9                 : 
      10                 : #ifndef BOOST_CAPY_ASYNC_MUTEX_HPP
      11                 : #define BOOST_CAPY_ASYNC_MUTEX_HPP
      12                 : 
      13                 : #include <boost/capy/detail/config.hpp>
      14                 : #include <boost/capy/detail/intrusive.hpp>
      15                 : #include <boost/capy/continuation.hpp>
      16                 : #include <boost/capy/concept/executor.hpp>
      17                 : #include <boost/capy/error.hpp>
      18                 : #include <boost/capy/ex/io_env.hpp>
      19                 : #include <boost/capy/io_result.hpp>
      20                 : 
      21                 : #include <stop_token>
      22                 : 
      23                 : #include <atomic>
      24                 : #include <coroutine>
      25                 : #include <new>
      26                 : #include <utility>
      27                 : 
      28                 : /*  async_mutex implementation notes
      29                 :     ================================
      30                 : 
      31                 :     Waiters form a doubly-linked intrusive list (fair FIFO). lock_awaiter
      32                 :     inherits intrusive_list<lock_awaiter>::node; the list is owned by
      33                 :     async_mutex::waiters_.
      34                 : 
      35                 :     Cancellation via stop_token
      36                 :     ---------------------------
      37                 :     A std::stop_callback is registered in await_suspend. Two actors can
      38                 :     race to resume the suspended coroutine: unlock() and the stop callback.
      39                 :     An atomic bool `claimed_` resolves the race -- whoever does
      40                 :     claimed_.exchange(true) and reads false wins. The loser does nothing.
      41                 : 
      42                 :     The stop callback calls ex_.post(cont_). The stop_callback is
      43                 :     destroyed later in await_resume. cancel_fn touches no members
      44                 :     after post returns (same pattern as delete-this).
      45                 : 
      46                 :     unlock() pops waiters from the front. If the popped waiter was
      47                 :     already claimed by the stop callback, unlock() skips it and tries
      48                 :     the next. await_resume removes the (still-linked) canceled waiter
      49                 :     via waiters_.remove(this).
      50                 : 
      51                 :     The stop_callback lives in aligned raw storage to suppress automatic
      52                 :     construction/destruction. Placement new in await_suspend, explicit
      53                 :     destructor call in await_resume and ~lock_awaiter.
      54                 : 
      55                 :     Member ordering constraint
      56                 :     --------------------------
      57                 :     The storage for the stop callback (stop_cb_buf_) must be declared
      58                 :     AFTER the members the callback accesses (cont_, ex_, claimed_,
      59                 :     canceled_). If the stop callback's destructor blocks waiting for a
      60                 :     concurrent callback, those members must still be alive (C++
      61                 :     destroys in reverse declaration order).
      62                 : 
      63                 :     active_ flag
      64                 :     ------------
      65                 :     Tracks both list membership and stop_cb_ lifetime (they are always
      66                 :     set and cleared together). Used by the destructor to clean up if the
      67                 :     coroutine is destroyed while suspended (e.g. execution_context
      68                 :     shutdown).
      69                 : 
      70                 :     Cancellation scope
      71                 :     ------------------
      72                 :     Cancellation only takes effect while the coroutine is suspended in
      73                 :     the wait queue. If the mutex is unlocked, await_ready acquires it
      74                 :     immediately without checking the stop token. This is intentional:
      75                 :     the fast path has no token access and no overhead.
      76                 : 
      77                 :     Threading assumptions
      78                 :     ---------------------
      79                 :     - All list mutations happen on the executor thread (await_suspend,
      80                 :       await_resume, unlock, ~lock_awaiter).
      81                 :     - The stop callback may fire from any thread, but only touches
      82                 :       claimed_ (atomic), sets canceled_ if it wins the claim, and then
      83                 :       calls post. It never touches the list.
      84                 :     - ~lock_awaiter must be called from the executor thread. This is
      85                 :       guaranteed during normal shutdown but NOT if the coroutine frame
      86                 :       is destroyed from another thread while a stop callback could
      87                 :       fire (precondition violation, same as cppcoro/folly).
      88                 : */
      89                 : 
      90                 : namespace boost {
      91                 : namespace capy {
      92                 : 
      93                 : /** An asynchronous mutex for coroutines.
      94                 : 
      95                 :     This mutex provides mutual exclusion for coroutines without blocking.
      96                 :     When a coroutine attempts to acquire a locked mutex, it suspends and
      97                 :     is added to an intrusive wait queue. When the holder unlocks, the next
      98                 :     waiter is resumed with the lock held.
      99                 : 
     100                 :     @par Cancellation
     101                 : 
     102                 :     When a coroutine is suspended waiting for the mutex and its stop
     103                 :     token is triggered, the waiter completes with `error::canceled`
     104                 :     instead of acquiring the lock.
     105                 : 
     106                 :     Cancellation only applies while the coroutine is suspended in the
     107                 :     wait queue. If the mutex is unlocked when `lock()` is called, the
     108                 :     lock is acquired immediately even if the stop token is already
     109                 :     signaled.
     110                 : 
     111                 :     @par Zero Allocation
     112                 : 
     113                 :     No heap allocation occurs for lock operations.
     114                 : 
     115                 :     @par Thread Safety
     116                 : 
     117                 :     Distinct objects: Safe.@n
     118                 :     Shared objects: Unsafe.
     119                 : 
     120                 :     The mutex operations are designed for single-threaded use on one
     121                 :     executor. The stop callback may fire from any thread.
     122                 : 
     123                 :     This type is non-copyable and non-movable because suspended
     124                 :     waiters hold intrusive pointers into the mutex's internal list.
     125                 : 
     126                 :     @par Example
     127                 :     @code
     128                 :     async_mutex cm;
     129                 : 
     130                 :     task<> protected_operation() {
     131                 :         auto [ec] = co_await cm.lock();
     132                 :         if(ec)
     133                 :             co_return;
     134                 :         // ... critical section ...
     135                 :         cm.unlock();
     136                 :     }
     137                 : 
     138                 :     // Or with RAII:
     139                 :     task<> protected_operation() {
     140                 :         auto [ec, guard] = co_await cm.scoped_lock();
     141                 :         if(ec)
     142                 :             co_return;
     143                 :         // ... critical section ...
     144                 :         // unlocks automatically
     145                 :     }
     146                 :     @endcode
     147                 : */
     148                 : class async_mutex
     149                 : {
     150                 : public:
     151                 :     class lock_awaiter;
     152                 :     class lock_guard;
     153                 :     class lock_guard_awaiter;
     154                 : 
     155                 : private:
     156                 :     bool locked_ = false;
     157                 :     detail::intrusive_list<lock_awaiter> waiters_;
     158                 : 
     159                 : public:
     160                 :     /** Awaiter returned by lock().
     161                 :     */
     162                 :     class lock_awaiter
     163                 :         : public detail::intrusive_list<lock_awaiter>::node
     164                 :     {
     165                 :         friend class async_mutex;
     166                 : 
     167                 :         async_mutex* m_;
     168                 :         continuation cont_;
     169                 :         executor_ref ex_;
     170                 : 
     171                 :         // These members must be declared before stop_cb_buf_
     172                 :         // (see comment on the storage below).
     173                 :         std::atomic<bool> claimed_{false};
     174                 :         bool canceled_ = false;
     175                 :         bool active_ = false;
     176                 : 
     177                 :         struct cancel_fn
     178                 :         {
     179                 :             lock_awaiter* self_;
     180                 : 
     181 HIT           6 :             void operator()() const noexcept
     182                 :             {
     183               6 :                 if(!self_->claimed_.exchange(
     184                 :                     true, std::memory_order_acq_rel))
     185                 :                 {
     186               6 :                     self_->canceled_ = true;
     187               6 :                     self_->ex_.post(self_->cont_);
     188                 :                 }
     189               6 :             }
     190                 :         };
     191                 : 
     192                 :         using stop_cb_t =
     193                 :             std::stop_callback<cancel_fn>;
     194                 : 
     195                 :         // Aligned storage for stop_cb_t. Declared last:
     196                 :         // its destructor may block while the callback
     197                 :         // accesses the members above.
     198                 :         BOOST_CAPY_MSVC_WARNING_PUSH
     199                 :         BOOST_CAPY_MSVC_WARNING_DISABLE(4324) // padded due to alignas
     200                 :         alignas(stop_cb_t)
     201                 :             unsigned char stop_cb_buf_[sizeof(stop_cb_t)];
     202                 :         BOOST_CAPY_MSVC_WARNING_POP
     203                 : 
     204              17 :         stop_cb_t& stop_cb_() noexcept
     205                 :         {
     206                 :             return *reinterpret_cast<stop_cb_t*>(
     207              17 :                 stop_cb_buf_);
     208                 :         }
     209                 : 
     210                 :     public:
     211              70 :         ~lock_awaiter()
     212                 :         {
     213              70 :             if(active_)
     214                 :             {
     215               3 :                 stop_cb_().~stop_cb_t();
     216               3 :                 m_->waiters_.remove(this);
     217                 :             }
     218              70 :         }
     219                 : 
     220              35 :         explicit lock_awaiter(async_mutex* m) noexcept
     221              35 :             : m_(m)
     222                 :         {
     223              35 :         }
     224                 : 
     225              35 :         lock_awaiter(lock_awaiter&& o) noexcept
     226              35 :             : m_(o.m_)
     227              35 :             , cont_(o.cont_)
     228              35 :             , ex_(o.ex_)
     229              35 :             , claimed_(o.claimed_.load(
     230                 :                 std::memory_order_relaxed))
     231              35 :             , canceled_(o.canceled_)
     232              35 :             , active_(std::exchange(o.active_, false))
     233                 :         {
     234              35 :         }
     235                 : 
     236                 :         lock_awaiter(lock_awaiter const&) = delete;
     237                 :         lock_awaiter& operator=(lock_awaiter const&) = delete;
     238                 :         lock_awaiter& operator=(lock_awaiter&&) = delete;
     239                 : 
     240              35 :         bool await_ready() const noexcept
     241                 :         {
     242              35 :             if(!m_->locked_)
     243                 :             {
     244              16 :                 m_->locked_ = true;
     245              16 :                 return true;
     246                 :             }
     247              19 :             return false;
     248                 :         }
     249                 : 
     250                 :         /** IoAwaitable protocol overload. */
     251                 :         std::coroutine_handle<>
     252              19 :         await_suspend(
     253                 :             std::coroutine_handle<> h,
     254                 :             io_env const* env) noexcept
     255                 :         {
     256              19 :             if(env->stop_token.stop_requested())
     257                 :             {
     258               2 :                 canceled_ = true;
     259               2 :                 return h;
     260                 :             }
     261              17 :             cont_.h = h;
     262              17 :             ex_ = env->executor;
     263              17 :             m_->waiters_.push_back(this);
     264              51 :             ::new(stop_cb_buf_) stop_cb_t(
     265              17 :                 env->stop_token, cancel_fn{this});
     266              17 :             active_ = true;
     267              17 :             return std::noop_coroutine();
     268                 :         }
     269                 : 
     270              32 :         io_result<> await_resume() noexcept
     271                 :         {
     272              32 :             if(active_)
     273                 :             {
     274              14 :                 stop_cb_().~stop_cb_t();
     275              14 :                 if(canceled_)
     276                 :                 {
     277               6 :                     m_->waiters_.remove(this);
     278               6 :                     active_ = false;
     279                 :                     return {make_error_code(
     280               6 :                         error::canceled)};
     281                 :                 }
     282               8 :                 active_ = false;
     283                 :             }
     284              26 :             if(canceled_)
     285                 :                 return {make_error_code(
     286               2 :                     error::canceled)};
     287              24 :             return {{}};
     288                 :         }
     289                 :     };
     290                 : 
     291                 :     /** RAII lock guard for async_mutex.
     292                 : 
     293                 :         Automatically unlocks the mutex when destroyed.
     294                 :     */
     295                 :     class [[nodiscard]] lock_guard
     296                 :     {
     297                 :         async_mutex* m_;
     298                 : 
     299                 :     public:
     300               9 :         ~lock_guard()
     301                 :         {
     302               9 :             if(m_)
     303               2 :                 m_->unlock();
     304               9 :         }
     305                 : 
     306               2 :         lock_guard() noexcept
     307               2 :             : m_(nullptr)
     308                 :         {
     309               2 :         }
     310                 : 
     311               2 :         explicit lock_guard(async_mutex* m) noexcept
     312               2 :             : m_(m)
     313                 :         {
     314               2 :         }
     315                 : 
     316               5 :         lock_guard(lock_guard&& o) noexcept
     317               5 :             : m_(std::exchange(o.m_, nullptr))
     318                 :         {
     319               5 :         }
     320                 : 
     321                 :         lock_guard& operator=(lock_guard&& o) noexcept
     322                 :         {
     323                 :             if(this != &o)
     324                 :             {
     325                 :                 if(m_)
     326                 :                     m_->unlock();
     327                 :                 m_ = std::exchange(o.m_, nullptr);
     328                 :             }
     329                 :             return *this;
     330                 :         }
     331                 : 
     332                 :         lock_guard(lock_guard const&) = delete;
     333                 :         lock_guard& operator=(lock_guard const&) = delete;
     334                 :     };
     335                 : 
     336                 :     /** Awaiter returned by scoped_lock() that returns a lock_guard on resume.
     337                 :     */
     338                 :     class lock_guard_awaiter
     339                 :     {
     340                 :         async_mutex* m_;
     341                 :         lock_awaiter inner_;
     342                 : 
     343                 :     public:
     344               4 :         explicit lock_guard_awaiter(async_mutex* m) noexcept
     345               4 :             : m_(m)
     346               4 :             , inner_(m)
     347                 :         {
     348               4 :         }
     349                 : 
     350               4 :         bool await_ready() const noexcept
     351                 :         {
     352               4 :             return inner_.await_ready();
     353                 :         }
     354                 : 
     355                 :         /** IoAwaitable protocol overload. */
     356                 :         std::coroutine_handle<>
     357               2 :         await_suspend(
     358                 :             std::coroutine_handle<> h,
     359                 :             io_env const* env) noexcept
     360                 :         {
     361               2 :             return inner_.await_suspend(h, env);
     362                 :         }
     363                 : 
     364               4 :         io_result<lock_guard> await_resume() noexcept
     365                 :         {
     366               4 :             auto r = inner_.await_resume();
     367               4 :             if(r.ec)
     368               2 :                 return {r.ec, {}};
     369               2 :             return {{}, lock_guard(m_)};
     370                 :         }
     371                 :     };
     372                 : 
     373                 :     /// Construct an unlocked mutex.
     374                 :     async_mutex() = default;
     375                 : 
     376                 :     /// Copy constructor (deleted).
     377                 :     async_mutex(async_mutex const&) = delete;
     378                 : 
     379                 :     /// Copy assignment (deleted).
     380                 :     async_mutex& operator=(async_mutex const&) = delete;
     381                 : 
     382                 :     /// Move constructor (deleted).
     383                 :     async_mutex(async_mutex&&) = delete;
     384                 : 
     385                 :     /// Move assignment (deleted).
     386                 :     async_mutex& operator=(async_mutex&&) = delete;
     387                 : 
     388                 :     /** Returns an awaiter that acquires the mutex.
     389                 : 
     390                 :         @return An awaitable that await-returns `(error_code)`.
     391                 :     */
     392              31 :     lock_awaiter lock() noexcept
     393                 :     {
     394              31 :         return lock_awaiter{this};
     395                 :     }
     396                 : 
     397                 :     /** Returns an awaiter that acquires the mutex with RAII.
     398                 : 
     399                 :         @return An awaitable that await-returns `(error_code,lock_guard)`.
     400                 :     */
     401               4 :     lock_guard_awaiter scoped_lock() noexcept
     402                 :     {
     403               4 :         return lock_guard_awaiter(this);
     404                 :     }
     405                 : 
     406                 :     /** Releases the mutex.
     407                 : 
     408                 :         If waiters are queued, the next eligible waiter is
     409                 :         resumed with the lock held. Canceled waiters are
     410                 :         skipped. If no eligible waiter remains, the mutex
     411                 :         becomes unlocked.
     412                 :     */
     413              24 :     void unlock() noexcept
     414                 :     {
     415                 :         for(;;)
     416                 :         {
     417              24 :             auto* waiter = waiters_.pop_front();
     418              24 :             if(!waiter)
     419                 :             {
     420              16 :                 locked_ = false;
     421              16 :                 return;
     422                 :             }
     423               8 :             if(!waiter->claimed_.exchange(
     424                 :                 true, std::memory_order_acq_rel))
     425                 :             {
     426               8 :                 waiter->ex_.post(waiter->cont_);
     427               8 :                 return;
     428                 :             }
     429 MIS           0 :         }
     430                 :     }
     431                 : 
     432                 :     /** Returns true if the mutex is currently locked.
     433                 :     */
     434 HIT          26 :     bool is_locked() const noexcept
     435                 :     {
     436              26 :         return locked_;
     437                 :     }
     438                 : };
     439                 : 
     440                 : } // namespace capy
     441                 : } // namespace boost
     442                 : 
     443                 : #endif
        

Generated by: LCOV version 2.3