Lumiera  0.pre.03
»edit your freedom«
work-force.hpp
Go to the documentation of this file.
1 /*
2  WORK-FORCE.hpp - actively coordinated pool of workers for rendering
3 
4  Copyright (C) Lumiera.org
5  2023, Hermann Vosseler <Ichthyostega@web.de>
6 
7  This program is free software; you can redistribute it and/or
8  modify it under the terms of the GNU General Public License as
9  published by the Free Software Foundation; either version 2 of
10  the License, or (at your option) any later version.
11 
12  This program is distributed in the hope that it will be useful,
13  but WITHOUT ANY WARRANTY; without even the implied warranty of
14  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15  GNU General Public License for more details.
16 
17  You should have received a copy of the GNU General Public License
18  along with this program; if not, write to the Free Software
19  Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20 
21 */
22 
23 
52 #ifndef SRC_VAULT_GEAR_WORK_FORCE_H_
53 #define SRC_VAULT_GEAR_WORK_FORCE_H_
54 
55 
56 #include "vault/common.hpp"
57 #include "vault/gear/activity.hpp"
58 #include "lib/meta/function.hpp"
59 #include "lib/thread.hpp"
60 #include "lib/nocopy.hpp"
61 #include "lib/util.hpp"
62 
63 #include <utility>
64 #include <chrono>
65 #include <atomic>
66 #include <list>
67 
68 
69 namespace vault{
70 namespace gear {
71 
72  using std::move;
73  using std::atomic;
74  using util::unConst;
75  using std::chrono::milliseconds;
76  using std::chrono::microseconds;
77  using std::chrono_literals::operator ""ms;
78  using std::chrono_literals::operator ""us;
79  using std::this_thread::sleep_for;
80 
81 
82  namespace {
/// safety guard to prevent catastrophic over-provisioning (factor on the nominal capacity)
const double MAX_OVERPROVISIONING{3.0};

/// zone for soft anti-contention measures, counting continued contention events
const size_t CONTEND_SOFT_LIMIT{3};
/// zone for stark measures, performing a sleep with exponential stepping
const size_t CONTEND_STARK_LIMIT{CONTEND_SOFT_LIMIT + 5};
/// upper limit for the contention event count
const size_t CONTEND_SATURATION{CONTEND_STARK_LIMIT + 4};
/// base counter for a spinning wait loop
const size_t CONTEND_SOFT_FACTOR{100};
/// stepping for randomisation of anti-contention measures
const size_t CONTEND_RANDOM_STEP{11};
/// base time unit (100µs) for the exponentially stepped-up sleep delay in case of contention
const std::chrono::microseconds CONTEND_WAIT{100};
91 
/**
 * Hash value for the calling thread.
 * @return `std::hash` applied to the `std::thread::id` of the current thread
 */
inline size_t
thisThreadHash()
{
  auto const self = std::this_thread::get_id();
  std::hash<std::thread::id> hasher;
  return hasher (self);
}
97  }
98 
99  namespace work {
100 
101  using SIG_WorkFun = activity::Proc(void);
102  using SIG_FinalHook = void(bool);
103 
/**
 * Base for configuration of the worker pool.
 * The Worker template inherits from its configuration type and reads
 * `IDLE_WAIT` and `DISMISS_CYCLES` from it; the WorkForce reads
 * `COMPUTATION_CAPACITY` to size the pool.
 */
struct Config
{
  /// Nominal »full size« of a pool of concurrent workers.
  static size_t COMPUTATION_CAPACITY;

  /// wait period (20ms) when a worker falls idle
  const std::chrono::milliseconds IDLE_WAIT{20};
  /// number of idle cycles after which the worker terminates
  const size_t DISMISS_CYCLES{100};

  /// default value for full computing capacity (implemented in work-force.cpp)
  static size_t getDefaultComputationCapacity();
};
122 
123 
124 
125  void performRandomisedSpin (size_t,size_t);
126  microseconds steppedRandDelay(size_t,size_t);
127 
128 
129  using Launch = lib::Thread::Launch;
130 
131  /*************************************/
135  template<class CONF>
136  class Worker
137  : CONF
139  {
140  public:
141  Worker (CONF config)
142  : CONF{move (config)}
143  , thread_{Launch{&Worker::pullWork, this}
144  .threadID("Worker")
145  .decorateCounter()}
146  { }
147 
149  std::atomic<bool> emergency{false};
150 
152  bool isDead() const { return not thread_; }
153 
154 
155  private:
156  lib::Thread thread_;
157 
158  void
159  pullWork()
160  {
161  ASSERT_MEMBER_FUNCTOR (&CONF::doWork, SIG_WorkFun);
162  ASSERT_MEMBER_FUNCTOR (&CONF::finalHook, SIG_FinalHook);
163 
164  bool regularExit{false};
165  try /* ================ pull work ===================== */
166  {
167  while (true)
168  {
169  activity::Proc res = CONF::doWork();
170  if (emergency.load (std::memory_order_relaxed))
171  break;
172  if (res == activity::KICK)
173  res = contentionWait();
174  else
175  if (kickLevel_)
176  kickLevel_ /= 2;
177  if (res == activity::WAIT)
178  res = idleWait();
179  else
180  idleCycles_ = 0;
181  if (res != activity::PASS)
182  break;
183  }
184  regularExit = true;
185  }
186  ERROR_LOG_AND_IGNORE (threadpool, "defunct worker thread")
187 
188  try /* ================ thread-exit hook ============== */
189  {
190  CONF::finalHook (not regularExit);
191  }
192  ERROR_LOG_AND_IGNORE (threadpool, "failure in thread-exit hook")
193  }// Thread will terminate....
194 
196  idleWait()
197  {
198  ++idleCycles_;
199  if (idleCycles_ < CONF::DISMISS_CYCLES)
200  {
201  sleep_for (CONF::IDLE_WAIT);
202  return activity::PASS;
203  }
204  else // idle beyond threshold => terminate worker
205  return activity::HALT;
206  }
207  size_t idleCycles_{0};
208 
209 
211  contentionWait()
212  {
213  if (not randFact_)
214  randFact_ = thisThreadHash() % CONTEND_RANDOM_STEP;
215 
216  if (kickLevel_ <= CONTEND_SOFT_LIMIT)
217  for (uint i=0; i<kickLevel_; ++i)
218  {
219  performRandomisedSpin (kickLevel_,randFact_);
220  std::this_thread::yield();
221  }
222  else
223  {
224  auto stepping = util::min (kickLevel_, CONTEND_STARK_LIMIT) - CONTEND_SOFT_LIMIT;
225  std::this_thread::sleep_for (steppedRandDelay(stepping,randFact_));
226  }
227 
228  if (kickLevel_ < CONTEND_SATURATION)
229  ++kickLevel_;
230  return activity::PASS;
231  }
232  size_t kickLevel_{0};
233  size_t randFact_{0};
234  };
235  }//(End)namespace work
236 
237 
238 
239 
240  /***********************************************************/
248  template<class CONF>
249  class WorkForce
251  {
252  using Pool = std::list<work::Worker<CONF>>;
253 
254  CONF setup_;
255  Pool workers_;
256 
257 
258  public:
259  WorkForce (CONF config)
260  : setup_{move (config)}
261  , workers_{}
262  { }
263 
264  ~WorkForce()
265  {
266  try {
267  awaitShutdown();
268  }
269  ERROR_LOG_AND_IGNORE (threadpool, "defunct worker thread")
270  }
271 
272 
279  void
280  activate (double degree =1.0)
281  {
282  size_t scale{setup_.COMPUTATION_CAPACITY};
283  scale = size_t(util::limited (0.0, degree*scale, scale*MAX_OVERPROVISIONING));
284  for (uint i = workers_.size(); i < scale; ++i)
285  workers_.emplace_back (setup_);
286  }
287 
288  void
289  incScale(uint step =+1)
290  {
291  uint i = workers_.size();
292  uint target = util::min (i+step, setup_.COMPUTATION_CAPACITY);
293  for ( ; i < target; ++i)
294  workers_.emplace_back (setup_);
295  }
296 
297  void
298  awaitShutdown()
299  {
300  for (auto& w : workers_)
301  w.emergency.store (true, std::memory_order_relaxed);
302  while (0 < size())
303  sleep_for (setup_.IDLE_WAIT);
304  }
305 
306  size_t
307  size() const
308  {
309  unConst(workers_).remove_if([](auto& w){ return w.isDead(); });
310  return workers_.size();
311  }
312  };
313 
314 
315 
316 }} // namespace vault::gear
317 #endif /*SRC_VAULT_GEAR_WORK_FORCE_H_*/
const size_t CONTEND_SOFT_LIMIT
zone for soft anti-contention measures, counting continued contention events
Definition: work-force.hpp:85
Individual worker thread: repeatedly pulls the doWork functor.
Definition: work-force.hpp:136
#define ERROR_LOG_AND_IGNORE(_FLAG_, _OP_DESCR_)
convenience shortcut for a sequence of catch blocks just logging and consuming an error...
Definition: error.hpp:275
#define ASSERT_MEMBER_FUNCTOR(_EXPR_, _SIG_)
Macro for a compile-time check to verify some member is present and comprises something invokable with the given signature.
Definition: function.hpp:282
const microseconds CONTEND_WAIT
base time unit for the exponentially stepped-up sleep delay in case of contention ...
Definition: work-force.hpp:90
void(bool) SIG_FinalHook
config should define callable invoked at exit (argument: isFailure)
Definition: work-force.hpp:102
const size_t DISMISS_CYCLES
number of wait cycles before an idle worker terminates completely
Definition: scheduler.hpp:141
Any copy and copy construction prohibited.
Definition: nocopy.hpp:46
Primary class template for std::hash.
microseconds steppedRandDelay(size_t, size_t)
Calculate the delay time for a stronger anti-contention wait.
Definition: work-force.cpp:91
activity::Proc(void) SIG_WorkFun
config should define a callable with this signature to perform work
Definition: work-force.hpp:101
void performRandomisedSpin(size_t, size_t)
This is part of the weak level of anti-contention measures.
Definition: work-force.cpp:75
Mix-Ins to allow or prohibit various degrees of copying and cloning.
Metaprogramming tools for transforming functor types.
const auto IDLE_WAIT
sleep-recheck cycle for workers deemed idle
Definition: scheduler.hpp:140
Tiny helper functions and shortcuts to be used everywhere. Consider this header to be effectively incl...
bool isDead() const
this Worker starts out active, but may terminate
Definition: work-force.hpp:152
Convenience front-end to simplify and codify basic thread handling.
const size_t CONTEND_STARK_LIMIT
zone for stark measures, performing a sleep with exponential stepping
Definition: work-force.hpp:86
const size_t CONTEND_SATURATION
upper limit for the contention event count
Definition: work-force.hpp:87
const size_t CONTEND_SOFT_FACTOR
base counter for a spinning wait loop
Definition: work-force.hpp:88
const milliseconds IDLE_WAIT
wait period when a worker falls idle
Definition: work-force.hpp:117
const size_t DISMISS_CYCLES
number of idle cycles after which the worker terminates
Definition: work-force.hpp:118
void activate(double degree=1.0)
Activate or scale up the worker pool.
Definition: work-force.hpp:280
Basic set of definitions and includes commonly used together (Vault).
A thin convenience wrapper to simplify thread-handling.
Definition: thread.hpp:656
static size_t COMPUTATION_CAPACITY
Nominal »full size« of a pool of concurrent workers.
Definition: work-force.hpp:115
Proc
Result instruction from Activity activation.
Definition: activity.hpp:149
Pool of worker threads for rendering.
Definition: work-force.hpp:249
const size_t CONTEND_RANDOM_STEP
stepping for randomisation of anti-contention measures
Definition: work-force.hpp:89
Base for configuration of the worker pool.
Definition: work-force.hpp:113
const double MAX_OVERPROVISIONING
safety guard to prevent catastrophic over-provisioning
Definition: work-force.hpp:83
Vault-Layer implementation namespace root.
static size_t getDefaultComputationCapacity()
default value for full computing capacity is to use all (virtual) cores.
Definition: work-force.cpp:60
Descriptor for a piece of operational logic performed by the scheduler.