include/boost/capy/ex/async_mutex.hpp

98.9% Lines (90/91) 100.0% List of functions (20/20)
async_mutex.hpp
f(x) Functions (20)
Function Calls Lines Blocks
boost::capy::async_mutex::lock_awaiter::cancel_fn::operator()() const :181 6x 100.0% 100.0% boost::capy::async_mutex::lock_awaiter::stop_cb_() :204 17x 100.0% 100.0% boost::capy::async_mutex::lock_awaiter::~lock_awaiter() :211 70x 100.0% 100.0% boost::capy::async_mutex::lock_awaiter::lock_awaiter(boost::capy::async_mutex*) :220 35x 100.0% 100.0% boost::capy::async_mutex::lock_awaiter::lock_awaiter(boost::capy::async_mutex::lock_awaiter&&) :225 35x 100.0% 100.0% boost::capy::async_mutex::lock_awaiter::await_ready() const :240 35x 100.0% 100.0% boost::capy::async_mutex::lock_awaiter::await_suspend(std::__n4861::coroutine_handle<void>, boost::capy::io_env const*) :252 19x 100.0% 100.0% boost::capy::async_mutex::lock_awaiter::await_resume() :270 32x 100.0% 100.0% boost::capy::async_mutex::lock_guard::~lock_guard() :300 9x 100.0% 100.0% boost::capy::async_mutex::lock_guard::lock_guard() :306 2x 100.0% 100.0% boost::capy::async_mutex::lock_guard::lock_guard(boost::capy::async_mutex*) :311 2x 100.0% 100.0% boost::capy::async_mutex::lock_guard::lock_guard(boost::capy::async_mutex::lock_guard&&) :316 5x 100.0% 100.0% boost::capy::async_mutex::lock_guard_awaiter::lock_guard_awaiter(boost::capy::async_mutex*) :344 4x 100.0% 100.0% boost::capy::async_mutex::lock_guard_awaiter::await_ready() const :350 4x 100.0% 100.0% boost::capy::async_mutex::lock_guard_awaiter::await_suspend(std::__n4861::coroutine_handle<void>, boost::capy::io_env const*) :357 2x 100.0% 100.0% boost::capy::async_mutex::lock_guard_awaiter::await_resume() :364 4x 100.0% 100.0% boost::capy::async_mutex::lock() :392 31x 100.0% 100.0% boost::capy::async_mutex::scoped_lock() :401 4x 100.0% 100.0% boost::capy::async_mutex::unlock() :413 24x 88.9% 90.0% boost::capy::async_mutex::is_locked() const :434 26x 100.0% 100.0%
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/capy
8 //
9
10 #ifndef BOOST_CAPY_ASYNC_MUTEX_HPP
11 #define BOOST_CAPY_ASYNC_MUTEX_HPP
12
13 #include <boost/capy/detail/config.hpp>
14 #include <boost/capy/detail/intrusive.hpp>
15 #include <boost/capy/continuation.hpp>
16 #include <boost/capy/concept/executor.hpp>
17 #include <boost/capy/error.hpp>
18 #include <boost/capy/ex/io_env.hpp>
19 #include <boost/capy/io_result.hpp>
20
21 #include <stop_token>
22
23 #include <atomic>
24 #include <coroutine>
25 #include <new>
26 #include <utility>
27
28 /* async_mutex implementation notes
29 ================================
30
31 Waiters form a doubly-linked intrusive list (fair FIFO). lock_awaiter
32 inherits intrusive_list<lock_awaiter>::node; the list is owned by
33 async_mutex::waiters_.
34
35 Cancellation via stop_token
36 ---------------------------
37 A std::stop_callback is registered in await_suspend. Two actors can
38 race to resume the suspended coroutine: unlock() and the stop callback.
39 An atomic bool `claimed_` resolves the race -- whoever does
40 claimed_.exchange(true) and reads false wins. The loser does nothing.
41
42 The stop callback calls ex_.post(cont_). The stop_callback is
43 destroyed later in await_resume. cancel_fn touches no members
44 after post returns (same pattern as delete-this).
45
46 unlock() pops waiters from the front. If the popped waiter was
47 already claimed by the stop callback, unlock() skips it and tries
48 the next. await_resume removes the (still-linked) canceled waiter
49 via waiters_.remove(this).
50
51 The stop_callback lives in raw aligned byte storage (stop_cb_buf_)
52 to suppress automatic construction/destruction. Placement new in
53 await_suspend, explicit destructor call in await_resume and ~lock_awaiter.
54
55 Member ordering constraint
56 --------------------------
57 The storage for stop_cb_ (stop_cb_buf_) must be declared AFTER the
58 members the callback accesses (cont_, ex_, claimed_, canceled_). If
59 the stop_cb_ destructor blocks waiting for a concurrent callback,
60 those members must still be alive (C++ destroys in reverse
61 declaration order).
62
63 active_ flag
64 ------------
65 Set in await_suspend once the waiter has been queued and stop_cb_
66 has been constructed; cleared in await_resume. Used by the
67 destructor to clean up if the coroutine is destroyed while
68 suspended (e.g. execution_context shutdown).
69
70 Cancellation scope
71 ------------------
72 Cancellation only takes effect while the coroutine is suspended in
73 the wait queue. If the mutex is unlocked, await_ready acquires it
74 immediately without checking the stop token. This is intentional:
75 the fast path has no token access and no overhead.
76
77 Threading assumptions
78 ---------------------
79 - All list mutations happen on the executor thread (await_suspend,
80 await_resume, unlock, ~lock_awaiter).
81 - The stop callback may fire from any thread, but only touches
82 claimed_ (atomic) and then calls post. It never touches the
83 list.
84 - ~lock_awaiter must be called from the executor thread. This is
85 guaranteed during normal shutdown but NOT if the coroutine frame
86 is destroyed from another thread while a stop callback could
87 fire (precondition violation, same as cppcoro/folly).
88 */
89
90 namespace boost {
91 namespace capy {
92
93 /** An asynchronous mutex for coroutines.
94
95 This mutex provides mutual exclusion for coroutines without blocking.
96 When a coroutine attempts to acquire a locked mutex, it suspends and
97 is added to an intrusive wait queue. When the holder unlocks, the next
98 waiter is resumed with the lock held.
99
100 @par Cancellation
101
102 When a coroutine is suspended waiting for the mutex and its stop
103 token is triggered, the waiter completes with `error::canceled`
104 instead of acquiring the lock.
105
106 Cancellation only applies while the coroutine is suspended in the
107 wait queue. If the mutex is unlocked when `lock()` is called, the
108 lock is acquired immediately even if the stop token is already
109 signaled.
110
111 @par Zero Allocation
112
113 No heap allocation occurs for lock operations.
114
115 @par Thread Safety
116
117 Distinct objects: Safe.@n
118 Shared objects: Unsafe.
119
120 The mutex operations are designed for single-threaded use on one
121 executor. The stop callback may fire from any thread.
122
123 This type is non-copyable and non-movable because suspended
124 waiters hold intrusive pointers into the mutex's internal list.
125
126 @par Example
127 @code
128 async_mutex cm;
129
130 task<> protected_operation() {
131 auto [ec] = co_await cm.lock();
132 if(ec)
133 co_return;
134 // ... critical section ...
135 cm.unlock();
136 }
137
138 // Or with RAII:
139 task<> protected_operation() {
140 auto [ec, guard] = co_await cm.scoped_lock();
141 if(ec)
142 co_return;
143 // ... critical section ...
144 // unlocks automatically
145 }
146 @endcode
147 */
148 class async_mutex
149 {
150 public:
151 class lock_awaiter;
152 class lock_guard;
153 class lock_guard_awaiter;
154
155 private:
156 bool locked_ = false;
157 detail::intrusive_list<lock_awaiter> waiters_;
158
159 public:
160 /** Awaiter returned by lock().
161 */
162 class lock_awaiter
163 : public detail::intrusive_list<lock_awaiter>::node
164 {
165 friend class async_mutex;
166
167 async_mutex* m_;
168 continuation cont_;
169 executor_ref ex_;
170
171 // These members must be declared before stop_cb_
172 // (see comment on the union below).
173 std::atomic<bool> claimed_{false};
174 bool canceled_ = false;
175 bool active_ = false;
176
177 struct cancel_fn
178 {
179 lock_awaiter* self_;
180
181 6x void operator()() const noexcept
182 {
183 6x if(!self_->claimed_.exchange(
184 true, std::memory_order_acq_rel))
185 {
186 6x self_->canceled_ = true;
187 6x self_->ex_.post(self_->cont_);
188 }
189 6x }
190 };
191
192 using stop_cb_t =
193 std::stop_callback<cancel_fn>;
194
195 // Aligned storage for stop_cb_t. Declared last:
196 // its destructor may block while the callback
197 // accesses the members above.
198 BOOST_CAPY_MSVC_WARNING_PUSH
199 BOOST_CAPY_MSVC_WARNING_DISABLE(4324) // padded due to alignas
200 alignas(stop_cb_t)
201 unsigned char stop_cb_buf_[sizeof(stop_cb_t)];
202 BOOST_CAPY_MSVC_WARNING_POP
203
204 17x stop_cb_t& stop_cb_() noexcept
205 {
206 return *reinterpret_cast<stop_cb_t*>(
207 17x stop_cb_buf_);
208 }
209
210 public:
211 70x ~lock_awaiter()
212 {
213 70x if(active_)
214 {
215 3x stop_cb_().~stop_cb_t();
216 3x m_->waiters_.remove(this);
217 }
218 70x }
219
220 35x explicit lock_awaiter(async_mutex* m) noexcept
221 35x : m_(m)
222 {
223 35x }
224
225 35x lock_awaiter(lock_awaiter&& o) noexcept
226 35x : m_(o.m_)
227 35x , cont_(o.cont_)
228 35x , ex_(o.ex_)
229 35x , claimed_(o.claimed_.load(
230 std::memory_order_relaxed))
231 35x , canceled_(o.canceled_)
232 35x , active_(std::exchange(o.active_, false))
233 {
234 35x }
235
236 lock_awaiter(lock_awaiter const&) = delete;
237 lock_awaiter& operator=(lock_awaiter const&) = delete;
238 lock_awaiter& operator=(lock_awaiter&&) = delete;
239
240 35x bool await_ready() const noexcept
241 {
242 35x if(!m_->locked_)
243 {
244 16x m_->locked_ = true;
245 16x return true;
246 }
247 19x return false;
248 }
249
250 /** IoAwaitable protocol overload. */
251 std::coroutine_handle<>
252 19x await_suspend(
253 std::coroutine_handle<> h,
254 io_env const* env) noexcept
255 {
256 19x if(env->stop_token.stop_requested())
257 {
258 2x canceled_ = true;
259 2x return h;
260 }
261 17x cont_.h = h;
262 17x ex_ = env->executor;
263 17x m_->waiters_.push_back(this);
264 51x ::new(stop_cb_buf_) stop_cb_t(
265 17x env->stop_token, cancel_fn{this});
266 17x active_ = true;
267 17x return std::noop_coroutine();
268 }
269
270 32x io_result<> await_resume() noexcept
271 {
272 32x if(active_)
273 {
274 14x stop_cb_().~stop_cb_t();
275 14x if(canceled_)
276 {
277 6x m_->waiters_.remove(this);
278 6x active_ = false;
279 return {make_error_code(
280 6x error::canceled)};
281 }
282 8x active_ = false;
283 }
284 26x if(canceled_)
285 return {make_error_code(
286 2x error::canceled)};
287 24x return {{}};
288 }
289 };
290
291 /** RAII lock guard for async_mutex.
292
293 Automatically unlocks the mutex when destroyed.
294 */
295 class [[nodiscard]] lock_guard
296 {
297 async_mutex* m_;
298
299 public:
300 9x ~lock_guard()
301 {
302 9x if(m_)
303 2x m_->unlock();
304 9x }
305
306 2x lock_guard() noexcept
307 2x : m_(nullptr)
308 {
309 2x }
310
311 2x explicit lock_guard(async_mutex* m) noexcept
312 2x : m_(m)
313 {
314 2x }
315
316 5x lock_guard(lock_guard&& o) noexcept
317 5x : m_(std::exchange(o.m_, nullptr))
318 {
319 5x }
320
321 lock_guard& operator=(lock_guard&& o) noexcept
322 {
323 if(this != &o)
324 {
325 if(m_)
326 m_->unlock();
327 m_ = std::exchange(o.m_, nullptr);
328 }
329 return *this;
330 }
331
332 lock_guard(lock_guard const&) = delete;
333 lock_guard& operator=(lock_guard const&) = delete;
334 };
335
336 /** Awaiter returned by scoped_lock() that returns a lock_guard on resume.
337 */
338 class lock_guard_awaiter
339 {
340 async_mutex* m_;
341 lock_awaiter inner_;
342
343 public:
344 4x explicit lock_guard_awaiter(async_mutex* m) noexcept
345 4x : m_(m)
346 4x , inner_(m)
347 {
348 4x }
349
350 4x bool await_ready() const noexcept
351 {
352 4x return inner_.await_ready();
353 }
354
355 /** IoAwaitable protocol overload. */
356 std::coroutine_handle<>
357 2x await_suspend(
358 std::coroutine_handle<> h,
359 io_env const* env) noexcept
360 {
361 2x return inner_.await_suspend(h, env);
362 }
363
364 4x io_result<lock_guard> await_resume() noexcept
365 {
366 4x auto r = inner_.await_resume();
367 4x if(r.ec)
368 2x return {r.ec, {}};
369 2x return {{}, lock_guard(m_)};
370 }
371 };
372
373 /// Construct an unlocked mutex.
374 async_mutex() = default;
375
376 /// Copy constructor (deleted).
377 async_mutex(async_mutex const&) = delete;
378
379 /// Copy assignment (deleted).
380 async_mutex& operator=(async_mutex const&) = delete;
381
382 /// Move constructor (deleted).
383 async_mutex(async_mutex&&) = delete;
384
385 /// Move assignment (deleted).
386 async_mutex& operator=(async_mutex&&) = delete;
387
388 /** Returns an awaiter that acquires the mutex.
389
390 @return An awaitable that await-returns `(error_code)`.
391 */
392 31x lock_awaiter lock() noexcept
393 {
394 31x return lock_awaiter{this};
395 }
396
397 /** Returns an awaiter that acquires the mutex with RAII.
398
399 @return An awaitable that await-returns `(error_code,lock_guard)`.
400 */
401 4x lock_guard_awaiter scoped_lock() noexcept
402 {
403 4x return lock_guard_awaiter(this);
404 }
405
406 /** Releases the mutex.
407
408 If waiters are queued, the next eligible waiter is
409 resumed with the lock held. Canceled waiters are
410 skipped. If no eligible waiter remains, the mutex
411 becomes unlocked.
412 */
413 24x void unlock() noexcept
414 {
415 for(;;)
416 {
417 24x auto* waiter = waiters_.pop_front();
418 24x if(!waiter)
419 {
420 16x locked_ = false;
421 16x return;
422 }
423 8x if(!waiter->claimed_.exchange(
424 true, std::memory_order_acq_rel))
425 {
426 8x waiter->ex_.post(waiter->cont_);
427 8x return;
428 }
429 }
430 }
431
432 /** Returns true if the mutex is currently locked.
433 */
434 26x bool is_locked() const noexcept
435 {
436 26x return locked_;
437 }
438 };
439
440 } // namespace capy
441 } // namespace boost
442
443 #endif
444