LCOV - code coverage report
Current view: top level - /jenkins/workspace/boost-root/libs/capy/src/ex - thread_pool.cpp (source / functions) Coverage Total Hit
Test: coverage_remapped.info Lines: 100.0 % 126 126
Test Date: 2026-03-21 03:20:11 Functions: 100.0 % 25 25

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
       3                 : // Copyright (c) 2026 Michael Vandeberg
       4                 : //
       5                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       6                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       7                 : //
       8                 : // Official repository: https://github.com/boostorg/capy
       9                 : //
      10                 : 
      11                 : #include <boost/capy/ex/thread_pool.hpp>
      12                 : #include <boost/capy/continuation.hpp>
      13                 : #include <boost/capy/test/thread_name.hpp>
      14                 : #include <algorithm>
      15                 : #include <atomic>
      16                 : #include <condition_variable>
      17                 : #include <cstdio>
      18                 : #include <mutex>
      19                 : #include <thread>
      20                 : #include <vector>
      21                 : 
      22                 : /*
      23                 :     Thread pool implementation using a shared work queue.
      24                 : 
      25                 :     Work items are continuations linked via their intrusive next pointer,
      26                 :     stored in a single queue protected by a mutex. No per-post heap
      27                 :     allocation: the continuation is owned by the caller and linked
      28                 :     directly. Worker threads wait on a condition_variable until work
      29                 :     is available or stop is requested.
      30                 : 
      31                 :     Threads are started lazily on first post() via std::call_once to avoid
      32                 :     spawning threads for pools that are constructed but never used. Each
      33                 :     thread is named with a configurable prefix plus index for debugger
      34                 :     visibility.
      35                 : 
      36                 :     Work tracking: on_work_started/on_work_finished maintain an atomic
      37                 :     outstanding_work_ counter. join() blocks until this counter reaches
      38                 :     zero, then signals workers to stop and joins threads.
      39                 : 
      40                 :     Three shutdown paths:
      41                 :     - join(): waits for outstanding work to drain, then stops workers.
      42                 :     - stop(): immediately signals workers to exit; queued work is abandoned.
      43                 :     - Destructor: stop() then join() (abandon + wait for threads), then drain_abandoned() destroys the abandoned coroutine frames.
      44                 : */
      45                 : 
      46                 : namespace boost {
      47                 : namespace capy {
      48                 : 
      49                 : //------------------------------------------------------------------------------
      50                 : 
      51                 : class thread_pool::impl
      52                 : {
      53                 :     // Intrusive queue of continuations via continuation::next.
      54                 :     // No per-post allocation: the continuation is owned by the caller.
      55                 :     continuation* head_ = nullptr;
      56                 :     continuation* tail_ = nullptr;
      57                 : 
      58 HIT         828 :     void push(continuation* c) noexcept
      59                 :     {
      60             828 :         c->next = nullptr;
      61             828 :         if(tail_)
      62             602 :             tail_->next = c;
      63                 :         else
      64             226 :             head_ = c;
      65             828 :         tail_ = c;
      66             828 :     }
      67                 : 
      68             985 :     continuation* pop() noexcept
      69                 :     {
      70             985 :         if(!head_)
      71             157 :             return nullptr;
      72             828 :         continuation* c = head_;
      73             828 :         head_ = head_->next;
      74             828 :         if(!head_)
      75             226 :             tail_ = nullptr;
      76             828 :         return c;
      77                 :     }
      78                 : 
      79            1064 :     bool empty() const noexcept
      80                 :     {
      81            1064 :         return head_ == nullptr;
      82                 :     }
      83                 : 
      84                 :     std::mutex mutex_;
      85                 :     std::condition_variable cv_;
      86                 :     std::vector<std::thread> threads_;
      87                 :     std::atomic<std::size_t> outstanding_work_{0};
      88                 :     bool stop_{false};
      89                 :     bool joined_{false};
      90                 :     std::size_t num_threads_;
      91                 :     char thread_name_prefix_[13]{};  // 12 chars max + null terminator
      92                 :     std::once_flag start_flag_;
      93                 : 
      94                 : public:
      95             157 :     ~impl() = default;
      96                 : 
      97                 :     // Destroy abandoned coroutine frames. Must be called
      98                 :     // before execution_context::shutdown()/destroy() so
      99                 :     // that suspended-frame destructors (e.g. delay_awaitable
     100                 :     // calling timer_service::cancel()) run while services
     101                 :     // are still valid.
     102                 :     void
     103             157 :     drain_abandoned() noexcept
     104                 :     {
     105             345 :         while(auto* c = pop())
     106                 :         {
     107             188 :             auto h = c->h;
     108             188 :             if(h && h != std::noop_coroutine())
     109             136 :                 h.destroy();
     110             188 :         }
     111             157 :     }
     112                 : 
     113             157 :     impl(std::size_t num_threads, std::string_view thread_name_prefix)
     114             157 :         : num_threads_(num_threads)
     115                 :     {
     116             157 :         if(num_threads_ == 0)
     117               4 :             num_threads_ = std::max(
     118               2 :                 std::thread::hardware_concurrency(), 1u);
     119                 : 
     120                 :         // Truncate prefix to 12 chars, leaving room for up to 3-digit index.
     121             157 :         auto n = thread_name_prefix.copy(thread_name_prefix_, 12);
     122             157 :         thread_name_prefix_[n] = '\0';
     123             157 :     }
     124                 : 
     125                 :     void
     126             828 :     post(continuation& c)
     127                 :     {
     128             828 :         ensure_started();
     129                 :         {
     130             828 :             std::lock_guard<std::mutex> lock(mutex_);
     131             828 :             push(&c);
     132             828 :         }
     133             828 :         cv_.notify_one();
     134             828 :     }
     135                 : 
     136                 :     void
     137             345 :     on_work_started() noexcept
     138                 :     {
     139             345 :         outstanding_work_.fetch_add(1, std::memory_order_acq_rel);
     140             345 :     }
     141                 : 
     142                 :     void
     143             345 :     on_work_finished() noexcept
     144                 :     {
     145             345 :         if(outstanding_work_.fetch_sub(
     146             345 :             1, std::memory_order_acq_rel) == 1)
     147                 :         {
     148              85 :             std::lock_guard<std::mutex> lock(mutex_);
     149              85 :             if(joined_ && !stop_)
     150               4 :                 stop_ = true;
     151              85 :             cv_.notify_all();
     152              85 :         }
     153             345 :     }
     154                 : 
     155                 :     void
     156             168 :     join() noexcept
     157                 :     {
     158                 :         {
     159             168 :             std::unique_lock<std::mutex> lock(mutex_);
     160             168 :             if(joined_)
     161              11 :                 return;
     162             157 :             joined_ = true;
     163                 : 
     164             157 :             if(outstanding_work_.load(
     165             157 :                 std::memory_order_acquire) == 0)
     166                 :             {
     167             103 :                 stop_ = true;
     168             103 :                 cv_.notify_all();
     169                 :             }
     170                 :             else
     171                 :             {
     172              54 :                 cv_.wait(lock, [this]{
     173              59 :                     return stop_;
     174                 :                 });
     175                 :             }
     176             168 :         }
     177                 : 
     178             336 :         for(auto& t : threads_)
     179             179 :             if(t.joinable())
     180             179 :                 t.join();
     181                 :     }
     182                 : 
     183                 :     void
     184             159 :     stop() noexcept
     185                 :     {
     186                 :         {
     187             159 :             std::lock_guard<std::mutex> lock(mutex_);
     188             159 :             stop_ = true;
     189             159 :         }
     190             159 :         cv_.notify_all();
     191             159 :     }
     192                 : 
     193                 : private:
     194                 :     void
     195             828 :     ensure_started()
     196                 :     {
     197             828 :         std::call_once(start_flag_, [this]{
     198             101 :             threads_.reserve(num_threads_);
     199             280 :             for(std::size_t i = 0; i < num_threads_; ++i)
     200             358 :                 threads_.emplace_back([this, i]{ run(i); });
     201             101 :         });
     202             828 :     }
     203                 : 
     204                 :     void
     205             179 :     run(std::size_t index)
     206                 :     {
     207                 :         // Build name; set_current_thread_name truncates to platform limits.
     208                 :         char name[16];
     209             179 :         std::snprintf(name, sizeof(name), "%s%zu", thread_name_prefix_, index);
     210             179 :         set_current_thread_name(name);
     211                 : 
     212                 :         for(;;)
     213                 :         {
     214             819 :             continuation* c = nullptr;
     215                 :             {
     216             819 :                 std::unique_lock<std::mutex> lock(mutex_);
     217             819 :                 cv_.wait(lock, [this]{
     218            1396 :                     return !empty() ||
     219            1396 :                         stop_;
     220                 :                 });
     221             819 :                 if(stop_)
     222             358 :                     return;
     223             640 :                 c = pop();
     224             819 :             }
     225             640 :             if(c)
     226             640 :                 c->h.resume();
     227             640 :         }
     228                 :     }
     229                 : };
     230                 : 
     231                 : //------------------------------------------------------------------------------
     232                 : 
     233             157 : thread_pool::
     234                 : ~thread_pool()
     235                 : {
     236             157 :     impl_->stop();
     237             157 :     impl_->join();
     238             157 :     impl_->drain_abandoned();
     239             157 :     shutdown();
     240             157 :     destroy();
     241             157 :     delete impl_;
     242             157 : }
     243                 : 
     244             157 : thread_pool::
     245             157 : thread_pool(std::size_t num_threads, std::string_view thread_name_prefix)
     246             157 :     : impl_(new impl(num_threads, thread_name_prefix))
     247                 : {
     248             157 :     this->set_frame_allocator(std::allocator<void>{});
     249             157 : }
     250                 : 
     251                 : void
     252              11 : thread_pool::
     253                 : join() noexcept
     254                 : {
     255              11 :     impl_->join();
     256              11 : }
     257                 : 
     258                 : void
     259               2 : thread_pool::
     260                 : stop() noexcept
     261                 : {
     262               2 :     impl_->stop();
     263               2 : }
     264                 : 
     265                 : //------------------------------------------------------------------------------
     266                 : 
     267                 : thread_pool::executor_type
     268             163 : thread_pool::
     269                 : get_executor() const noexcept
     270                 : {
     271             163 :     return executor_type(
     272             163 :         const_cast<thread_pool&>(*this));
     273                 : }
     274                 : 
     275                 : void
     276             345 : thread_pool::executor_type::
     277                 : on_work_started() const noexcept
     278                 : {
     279             345 :     pool_->impl_->on_work_started();
     280             345 : }
     281                 : 
     282                 : void
     283             345 : thread_pool::executor_type::
     284                 : on_work_finished() const noexcept
     285                 : {
     286             345 :     pool_->impl_->on_work_finished();
     287             345 : }
     288                 : 
     289                 : void
     290             828 : thread_pool::executor_type::
     291                 : post(continuation& c) const
     292                 : {
     293             828 :     pool_->impl_->post(c);
     294             828 : }
     295                 : 
     296                 : } // capy
     297                 : } // boost
        

Generated by: LCOV version 2.3