1  
//
1  
//
2  
// Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
2  
// Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3  
//
3  
//
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6  
//
6  
//
7  
// Official repository: https://github.com/cppalliance/capy
7  
// Official repository: https://github.com/cppalliance/capy
8  
//
8  
//
9  

9  

10  
#ifndef BOOST_CAPY_ASYNC_EVENT_HPP
10  
#ifndef BOOST_CAPY_ASYNC_EVENT_HPP
11  
#define BOOST_CAPY_ASYNC_EVENT_HPP
11  
#define BOOST_CAPY_ASYNC_EVENT_HPP
12  

12  

13  
#include <boost/capy/detail/config.hpp>
13  
#include <boost/capy/detail/config.hpp>
14  
#include <boost/capy/detail/intrusive.hpp>
14  
#include <boost/capy/detail/intrusive.hpp>
15  
#include <boost/capy/continuation.hpp>
15  
#include <boost/capy/continuation.hpp>
16  
#include <boost/capy/concept/executor.hpp>
16  
#include <boost/capy/concept/executor.hpp>
17  
#include <boost/capy/error.hpp>
17  
#include <boost/capy/error.hpp>
18  
#include <boost/capy/ex/io_env.hpp>
18  
#include <boost/capy/ex/io_env.hpp>
19  
#include <boost/capy/io_result.hpp>
19  
#include <boost/capy/io_result.hpp>
20  

20  

21  
#include <stop_token>
21  
#include <stop_token>
22  

22  

23  
#include <atomic>
23  
#include <atomic>
24  
#include <coroutine>
24  
#include <coroutine>
25  
#include <new>
25  
#include <new>
26  
#include <utility>
26  
#include <utility>
27  

27  

28  
/*  async_event implementation notes
28  
/*  async_event implementation notes
29  
    =================================
29  
    =================================
30  

30  

31  
    Same cancellation pattern as async_mutex (see that file for the
31  
    Same cancellation pattern as async_mutex (see that file for the
32  
    full discussion on claimed_, stop_cb lifetime, member ordering,
32  
    full discussion on claimed_, stop_cb lifetime, member ordering,
33  
    and threading assumptions).
33  
    and threading assumptions).
34  

34  

35  
    Key difference: set() wakes ALL waiters (broadcast), not one.
35  
    Key difference: set() wakes ALL waiters (broadcast), not one.
36  
    It pops every waiter from the list and posts the ones it
36  
    It pops every waiter from the list and posts the ones it
37  
    claims. Waiters already claimed by a stop callback are skipped.
37  
    claims. Waiters already claimed by a stop callback are skipped.
38  

38  

39  
    Because set() pops all waiters, a canceled waiter may have been
39  
    Because set() pops all waiters, a canceled waiter may have been
40  
    removed from the list by set() before its await_resume runs.
40  
    removed from the list by set() before its await_resume runs.
41  
    This requires a separate in_list_ flag (unlike async_mutex where
41  
    This requires a separate in_list_ flag (unlike async_mutex where
42  
    active_ served double duty). await_resume only calls remove()
42  
    active_ served double duty). await_resume only calls remove()
43  
    when in_list_ is true.
43  
    when in_list_ is true.
44  
*/
44  
*/
45  

45  

46  
namespace boost {
46  
namespace boost {
47  
namespace capy {
47  
namespace capy {
48  

48  

49  
/** An asynchronous event for coroutines.
49  
/** An asynchronous event for coroutines.
50  

50  

51  
    This event provides a way to notify multiple coroutines that some
51  
    This event provides a way to notify multiple coroutines that some
52  
    condition has occurred. When a coroutine awaits an unset event, it
52  
    condition has occurred. When a coroutine awaits an unset event, it
53  
    suspends and is added to a wait queue. When the event is set, all
53  
    suspends and is added to a wait queue. When the event is set, all
54  
    waiting coroutines are resumed.
54  
    waiting coroutines are resumed.
55  

55  

56  
    @par Cancellation
56  
    @par Cancellation
57  

57  

58  
    When a coroutine is suspended waiting for the event and its stop
58  
    When a coroutine is suspended waiting for the event and its stop
59  
    token is triggered, the waiter completes with `error::canceled`
59  
    token is triggered, the waiter completes with `error::canceled`
60  
    instead of waiting for `set()`.
60  
    instead of waiting for `set()`.
61  

61  

62  
    Cancellation only applies while the coroutine is suspended in the
62  
    Cancellation only applies while the coroutine is suspended in the
63  
    wait queue. If the event is already set when `wait()` is called,
63  
    wait queue. If the event is already set when `wait()` is called,
64  
    the wait completes immediately even if the stop token is already
64  
    the wait completes immediately even if the stop token is already
65  
    signaled.
65  
    signaled.
66  

66  

67  
    @par Zero Allocation
67  
    @par Zero Allocation
68  

68  

69  
    No heap allocation occurs for wait operations.
69  
    No heap allocation occurs for wait operations.
70  

70  

71  
    @par Thread Safety
71  
    @par Thread Safety
72  

72  

73  
    Distinct objects: Safe.@n
73  
    Distinct objects: Safe.@n
74  
    Shared objects: Unsafe.
74  
    Shared objects: Unsafe.
75  

75  

76  
    The event operations are designed for single-threaded use on one
76  
    The event operations are designed for single-threaded use on one
77  
    executor. The stop callback may fire from any thread.
77  
    executor. The stop callback may fire from any thread.
78  

78  

79  
    This type is non-copyable and non-movable because suspended
79  
    This type is non-copyable and non-movable because suspended
80  
    waiters hold intrusive pointers into the event's internal list.
80  
    waiters hold intrusive pointers into the event's internal list.
81  

81  

82  
    @par Example
82  
    @par Example
83  
    @code
83  
    @code
84  
    async_event event;
84  
    async_event event;
85  

85  

86  
    task<> waiter() {
86  
    task<> waiter() {
87  
        auto [ec] = co_await event.wait();
87  
        auto [ec] = co_await event.wait();
88  
        if(ec)
88  
        if(ec)
89  
            co_return;
89  
            co_return;
90  
        // ... event was set ...
90  
        // ... event was set ...
91  
    }
91  
    }
92  

92  

93  
    task<> notifier() {
93  
    task<> notifier() {
94  
        // ... do some work ...
94  
        // ... do some work ...
95  
        event.set();  // Wake all waiters
95  
        event.set();  // Wake all waiters
96  
    }
96  
    }
97  
    @endcode
97  
    @endcode
98  
*/
98  
*/
99  
class async_event
99  
class async_event
100  
{
100  
{
101  
public:
101  
public:
102  
    class wait_awaiter;
102  
    class wait_awaiter;
103  

103  

104  
private:
104  
private:
105  
    bool set_ = false;
105  
    bool set_ = false;
106  
    detail::intrusive_list<wait_awaiter> waiters_;
106  
    detail::intrusive_list<wait_awaiter> waiters_;
107  

107  

108  
public:
108  
public:
109  
    /** Awaiter returned by wait().
109  
    /** Awaiter returned by wait().
110  
    */
110  
    */
111  
    class wait_awaiter
111  
    class wait_awaiter
112  
        : public detail::intrusive_list<wait_awaiter>::node
112  
        : public detail::intrusive_list<wait_awaiter>::node
113  
    {
113  
    {
114  
        friend class async_event;
114  
        friend class async_event;
115  

115  

116  
        async_event* e_;
116  
        async_event* e_;
117  
        continuation cont_;
117  
        continuation cont_;
118  
        executor_ref ex_;
118  
        executor_ref ex_;
119  

119  

120  
        // Declared before stop_cb_buf_: the callback
120  
        // Declared before stop_cb_buf_: the callback
121  
        // accesses these members, so they must still be
121  
        // accesses these members, so they must still be
122  
        // alive if the stop_cb_ destructor blocks.
122  
        // alive if the stop_cb_ destructor blocks.
123  
        std::atomic<bool> claimed_{false};
123  
        std::atomic<bool> claimed_{false};
124  
        bool canceled_ = false;
124  
        bool canceled_ = false;
125  
        bool active_ = false;
125  
        bool active_ = false;
126  
        bool in_list_ = false;
126  
        bool in_list_ = false;
127  

127  

128  
        struct cancel_fn
128  
        struct cancel_fn
129  
        {
129  
        {
130  
            wait_awaiter* self_;
130  
            wait_awaiter* self_;
131  

131  

132  
            void operator()() const noexcept
132  
            void operator()() const noexcept
133  
            {
133  
            {
134  
                if(!self_->claimed_.exchange(
134  
                if(!self_->claimed_.exchange(
135  
                    true, std::memory_order_acq_rel))
135  
                    true, std::memory_order_acq_rel))
136  
                {
136  
                {
137  
                    self_->canceled_ = true;
137  
                    self_->canceled_ = true;
138  
                    self_->ex_.post(self_->cont_);
138  
                    self_->ex_.post(self_->cont_);
139  
                }
139  
                }
140  
            }
140  
            }
141  
        };
141  
        };
142  

142  

143  
        using stop_cb_t =
143  
        using stop_cb_t =
144  
            std::stop_callback<cancel_fn>;
144  
            std::stop_callback<cancel_fn>;
145  

145  

146  
        // Aligned storage for stop_cb_t. Declared last:
146  
        // Aligned storage for stop_cb_t. Declared last:
147  
        // its destructor may block while the callback
147  
        // its destructor may block while the callback
148  
        // accesses the members above.
148  
        // accesses the members above.
149  
        BOOST_CAPY_MSVC_WARNING_PUSH
149  
        BOOST_CAPY_MSVC_WARNING_PUSH
150  
        BOOST_CAPY_MSVC_WARNING_DISABLE(4324) // padded due to alignas
150  
        BOOST_CAPY_MSVC_WARNING_DISABLE(4324) // padded due to alignas
151  
        alignas(stop_cb_t)
151  
        alignas(stop_cb_t)
152  
            unsigned char stop_cb_buf_[sizeof(stop_cb_t)];
152  
            unsigned char stop_cb_buf_[sizeof(stop_cb_t)];
153  
        BOOST_CAPY_MSVC_WARNING_POP
153  
        BOOST_CAPY_MSVC_WARNING_POP
154  

154  

155  
        stop_cb_t& stop_cb_() noexcept
155  
        stop_cb_t& stop_cb_() noexcept
156  
        {
156  
        {
157  
            return *reinterpret_cast<stop_cb_t*>(
157  
            return *reinterpret_cast<stop_cb_t*>(
158  
                stop_cb_buf_);
158  
                stop_cb_buf_);
159  
        }
159  
        }
160  

160  

161  
    public:
161  
    public:
162  
        ~wait_awaiter()
162  
        ~wait_awaiter()
163  
        {
163  
        {
164  
            if(active_)
164  
            if(active_)
165  
                stop_cb_().~stop_cb_t();
165  
                stop_cb_().~stop_cb_t();
166  
            if(in_list_)
166  
            if(in_list_)
167  
                e_->waiters_.remove(this);
167  
                e_->waiters_.remove(this);
168  
        }
168  
        }
169  

169  

170  
        explicit wait_awaiter(async_event* e) noexcept
170  
        explicit wait_awaiter(async_event* e) noexcept
171  
            : e_(e)
171  
            : e_(e)
172  
        {
172  
        {
173  
        }
173  
        }
174  

174  

175  
        wait_awaiter(wait_awaiter&& o) noexcept
175  
        wait_awaiter(wait_awaiter&& o) noexcept
176  
            : e_(o.e_)
176  
            : e_(o.e_)
177  
            , cont_(o.cont_)
177  
            , cont_(o.cont_)
178  
            , ex_(o.ex_)
178  
            , ex_(o.ex_)
179  
            , claimed_(o.claimed_.load(
179  
            , claimed_(o.claimed_.load(
180  
                std::memory_order_relaxed))
180  
                std::memory_order_relaxed))
181  
            , canceled_(o.canceled_)
181  
            , canceled_(o.canceled_)
182  
            , active_(std::exchange(o.active_, false))
182  
            , active_(std::exchange(o.active_, false))
183  
            , in_list_(std::exchange(o.in_list_, false))
183  
            , in_list_(std::exchange(o.in_list_, false))
184  
        {
184  
        {
185  
        }
185  
        }
186  

186  

187  
        wait_awaiter(wait_awaiter const&) = delete;
187  
        wait_awaiter(wait_awaiter const&) = delete;
188  
        wait_awaiter& operator=(wait_awaiter const&) = delete;
188  
        wait_awaiter& operator=(wait_awaiter const&) = delete;
189  
        wait_awaiter& operator=(wait_awaiter&&) = delete;
189  
        wait_awaiter& operator=(wait_awaiter&&) = delete;
190  

190  

191  
        bool await_ready() const noexcept
191  
        bool await_ready() const noexcept
192  
        {
192  
        {
193  
            return e_->set_;
193  
            return e_->set_;
194  
        }
194  
        }
195  

195  

196  
        /** IoAwaitable protocol overload. */
196  
        /** IoAwaitable protocol overload. */
197  
        std::coroutine_handle<>
197  
        std::coroutine_handle<>
198  
        await_suspend(
198  
        await_suspend(
199  
            std::coroutine_handle<> h,
199  
            std::coroutine_handle<> h,
200  
            io_env const* env) noexcept
200  
            io_env const* env) noexcept
201  
        {
201  
        {
202  
            if(env->stop_token.stop_requested())
202  
            if(env->stop_token.stop_requested())
203  
            {
203  
            {
204  
                canceled_ = true;
204  
                canceled_ = true;
205  
                return h;
205  
                return h;
206  
            }
206  
            }
207  
            cont_.h = h;
207  
            cont_.h = h;
208  
            ex_ = env->executor;
208  
            ex_ = env->executor;
209  
            e_->waiters_.push_back(this);
209  
            e_->waiters_.push_back(this);
210  
            in_list_ = true;
210  
            in_list_ = true;
211  
            ::new(stop_cb_buf_) stop_cb_t(
211  
            ::new(stop_cb_buf_) stop_cb_t(
212  
                env->stop_token, cancel_fn{this});
212  
                env->stop_token, cancel_fn{this});
213  
            active_ = true;
213  
            active_ = true;
214  
            return std::noop_coroutine();
214  
            return std::noop_coroutine();
215  
        }
215  
        }
216  

216  

217  
        io_result<> await_resume() noexcept
217  
        io_result<> await_resume() noexcept
218  
        {
218  
        {
219  
            if(active_)
219  
            if(active_)
220  
            {
220  
            {
221  
                stop_cb_().~stop_cb_t();
221  
                stop_cb_().~stop_cb_t();
222  
                active_ = false;
222  
                active_ = false;
223  
            }
223  
            }
224  
            if(canceled_)
224  
            if(canceled_)
225  
            {
225  
            {
226  
                if(in_list_)
226  
                if(in_list_)
227  
                {
227  
                {
228  
                    e_->waiters_.remove(this);
228  
                    e_->waiters_.remove(this);
229  
                    in_list_ = false;
229  
                    in_list_ = false;
230  
                }
230  
                }
231  
                return {make_error_code(
231  
                return {make_error_code(
232  
                    error::canceled)};
232  
                    error::canceled)};
233  
            }
233  
            }
234  
            return {{}};
234  
            return {{}};
235  
        }
235  
        }
236  
    };
236  
    };
237  

237  

238  
    /// Construct an unset event.
238  
    /// Construct an unset event.
239  
    async_event() = default;
239  
    async_event() = default;
240  

240  

241  
    /// Copy constructor (deleted).
241  
    /// Copy constructor (deleted).
242  
    async_event(async_event const&) = delete;
242  
    async_event(async_event const&) = delete;
243  

243  

244  
    /// Copy assignment (deleted).
244  
    /// Copy assignment (deleted).
245  
    async_event& operator=(async_event const&) = delete;
245  
    async_event& operator=(async_event const&) = delete;
246  

246  

247  
    /// Move constructor (deleted).
247  
    /// Move constructor (deleted).
248  
    async_event(async_event&&) = delete;
248  
    async_event(async_event&&) = delete;
249  

249  

250  
    /// Move assignment (deleted).
250  
    /// Move assignment (deleted).
251  
    async_event& operator=(async_event&&) = delete;
251  
    async_event& operator=(async_event&&) = delete;
252  

252  

253  
    /** Returns an awaiter that waits until the event is set.
253  
    /** Returns an awaiter that waits until the event is set.
254  

254  

255  
        If the event is already set, completes immediately.
255  
        If the event is already set, completes immediately.
256  

256  

257  
        @return An awaitable that await-returns `(error_code)`.
257  
        @return An awaitable that await-returns `(error_code)`.
258  
    */
258  
    */
259  
    wait_awaiter wait() noexcept
259  
    wait_awaiter wait() noexcept
260  
    {
260  
    {
261  
        return wait_awaiter{this};
261  
        return wait_awaiter{this};
262  
    }
262  
    }
263  

263  

264  
    /** Sets the event.
264  
    /** Sets the event.
265  

265  

266  
        All waiting coroutines are resumed. Canceled waiters
266  
        All waiting coroutines are resumed. Canceled waiters
267  
        are skipped. Subsequent calls to wait() complete
267  
        are skipped. Subsequent calls to wait() complete
268  
        immediately until clear() is called.
268  
        immediately until clear() is called.
269  
    */
269  
    */
270  
    void set()
270  
    void set()
271  
    {
271  
    {
272  
        set_ = true;
272  
        set_ = true;
273  
        for(;;)
273  
        for(;;)
274  
        {
274  
        {
275  
            auto* w = waiters_.pop_front();
275  
            auto* w = waiters_.pop_front();
276  
            if(!w)
276  
            if(!w)
277  
                break;
277  
                break;
278  
            w->in_list_ = false;
278  
            w->in_list_ = false;
279  
            if(!w->claimed_.exchange(
279  
            if(!w->claimed_.exchange(
280  
                true, std::memory_order_acq_rel))
280  
                true, std::memory_order_acq_rel))
281  
            {
281  
            {
282  
                w->ex_.post(w->cont_);
282  
                w->ex_.post(w->cont_);
283  
            }
283  
            }
284  
        }
284  
        }
285  
    }
285  
    }
286  

286  

287  
    /** Clears the event.
287  
    /** Clears the event.
288  

288  

289  
        Subsequent calls to wait() will suspend until
289  
        Subsequent calls to wait() will suspend until
290  
        set() is called again.
290  
        set() is called again.
291  
    */
291  
    */
292  
    void clear() noexcept
292  
    void clear() noexcept
293  
    {
293  
    {
294  
        set_ = false;
294  
        set_ = false;
295  
    }
295  
    }
296  

296  

297  
    /** Returns true if the event is currently set.
297  
    /** Returns true if the event is currently set.
298  
    */
298  
    */
299  
    bool is_set() const noexcept
299  
    bool is_set() const noexcept
300  
    {
300  
    {
301  
        return set_;
301  
        return set_;
302  
    }
302  
    }
303  
};
303  
};
304  

304  

305  
} // namespace capy
305  
} // namespace capy
306  
} // namespace boost
306  
} // namespace boost
307  

307  

308  
#endif
308  
#endif