Lumiera  0.pre.03
»edit your freedom«
work-force-test.cpp
Go to the documentation of this file.
1 /*
2  WorkForce(Test) - verify worker thread service
3 
4  Copyright (C) Lumiera.org
5  2023, Hermann Vosseler <Ichthyostega@web.de>
6 
7  This program is free software; you can redistribute it and/or
8  modify it under the terms of the GNU General Public License as
9  published by the Free Software Foundation; either version 2 of
10  the License, or (at your option) any later version.
11 
12  This program is distributed in the hope that it will be useful,
13  but WITHOUT ANY WARRANTY; without even the implied warranty of
14  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15  GNU General Public License for more details.
16 
17  You should have received a copy of the GNU General Public License
18  along with this program; if not, write to the Free Software
19  Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20 
21 * *****************************************************/
22 
28 #include "lib/test/run.hpp"
30 #include "lib/thread.hpp"
31 #include "lib/sync.hpp"
32 
33 #include <functional>
34 #include <thread>
35 #include <chrono>
36 #include <set>
37 
38 using test::Test;
39 
40 
41 namespace vault{
42 namespace gear {
43 namespace test {
44 
45  using std::this_thread::sleep_for;
46  using namespace std::chrono_literals;
47  using std::chrono::milliseconds;
48  using lib::Thread;
49 
50 
51  namespace {
52  using WorkFun = std::function<work::SIG_WorkFun>;
53  using FinalFun = std::function<work::SIG_FinalHook>;
54 
60  template<class FUN>
61  auto
62  setup (FUN&& workFun)
63  {
64  struct Setup
65  : work::Config
66  {
67  WorkFun doWork;
68  FinalFun finalHook = [](bool){ /*NOP*/ };
69 
70  milliseconds IDLE_WAIT = work::Config::IDLE_WAIT;
72 
73  Setup (FUN&& workFun)
74  : doWork{std::forward<FUN> (workFun)}
75  { }
76 
77  Setup&&
78  withFinalHook (FinalFun finalFun)
79  {
80  finalHook = move (finalFun);
81  return move(*this);
82  }
83 
84  Setup&&
85  withSleepPeriod (std::chrono::milliseconds millis)
86  {
87  IDLE_WAIT = millis;
88  return move(*this);
89  }
90 
91  Setup&&
92  dismissAfter (size_t cycles)
93  {
94  DISMISS_CYCLES = cycles;
95  return move(*this);
96  }
97 
98  };
99 
100  return Setup{std::forward<FUN> (workFun)};
101  }
102  }
103 
104 
105 
106 
107  /*************************************************************************/
112  class WorkForce_test : public Test
113  {
114 
115  virtual void
116  run (Arg)
117  {
118  simpleUsage();
119 
120  verify_pullWork();
121  verify_workerHalt();
122  verify_workerSleep();
123  verify_workerRetard();
124  verify_workerDismiss();
125  verify_finalHook();
126  verify_detectError();
127  verify_defaultPool();
128  verify_scalePool();
129  verify_countActive();
130  verify_dtor_blocks();
131  }
132 
133 
136  void
138  {
139  atomic<uint> check{0};
140  WorkForce wof{setup ([&]{ ++check; return activity::PASS; })};
141  // ^^^ this is the doWork-λ
142  CHECK (0 == check);
143 
144  wof.activate();
145  sleep_for(20ms);
146 
147  CHECK (0 < check); // λ invoked in the worker threads
148  }
149 
150 
151 
154  void
156  {
157  atomic<uint> check{0};
158  WorkForce wof{setup ([&]{ ++check; return activity::PASS; })};
159 
160  CHECK (0 == check);
161 
162  wof.incScale();
163  sleep_for(20ms);
164 
165  uint invocations = check;
166  CHECK (0 < invocations);
167 
168  sleep_for(2ms);
169  CHECK (invocations < check);
170 
171  invocations = check;
172  sleep_for(2ms);
173  CHECK (invocations < check);
174 
175  wof.awaitShutdown();
176 
177  invocations = check;
178  sleep_for(2ms);
179  CHECK (invocations == check);
180  }
181 
182 
183 
186  void
188  {
189  atomic<uint> check{0};
190  atomic<activity::Proc> control{activity::PASS};
191  WorkForce wof{setup ([&]{ ++check; return activity::Proc(control); })};
192 
193  wof.incScale();
194  sleep_for(1ms);
195 
196  uint invocations = check;
197  CHECK (0 < invocations);
198 
199  control = activity::HALT;
200  sleep_for(1ms);
201 
202  invocations = check;
203  sleep_for(10ms);
204  CHECK (invocations == check);
205  }
206 
207 
208 
211  void
213  {
214  atomic<uint> check{0};
215  WorkForce wof{setup ([&]{ ++check; return activity::WAIT; })
216  .withSleepPeriod (10ms)};
217 
218  wof.incScale();
219  sleep_for(1ms);
220 
221  CHECK (1 == check);
222 
223  sleep_for(10us);
224  CHECK (1 == check);
225 
226  sleep_for(12ms); // after waiting one sleep-period...
227  CHECK (2 == check); // ...functor invoked again
228  }
229 
230 
231 
234  void
236  {
237  atomic<uint> check{0};
238  { // ▽▽▽▽ regular work-cycles without delay
239  WorkForce wof{setup ([&]{ ++check; return activity::PASS; })};
240  wof.incScale();
241  sleep_for(5ms);
242  }
243  uint cyclesPASS{check};
244  check = 0;
245  { // ▽▽▽▽ signals »contention«
246  WorkForce wof{setup ([&]{ ++check; return activity::KICK; })};
247  wof.incScale();
248  sleep_for(5ms);
249  }
250  uint cyclesKICK{check};
251  CHECK (cyclesKICK < cyclesPASS);
252  CHECK (cyclesKICK < 50);
253  }
254 
255 
256 
260  void
262  {
263  atomic<uint> check{0};
264  WorkForce wof{setup ([&]{ ++check; return activity::WAIT; })
265  .withSleepPeriod (10ms)
266  .dismissAfter(5)};
267 
268  wof.incScale();
269  sleep_for(1ms);
270 
271  CHECK (1 == check);
272 
273  sleep_for(12ms);
274  CHECK (2 == check); // after one wait cycle, one further invocation
275 
276  sleep_for(100ms);
277  CHECK (5 == check); // only 5 invocations total...
278  CHECK (0 == wof.size()); // ...after that, the worker terminated
279  }
280 
281 
282 
285  void
287  {
288  atomic<uint> exited{0};
289  atomic<activity::Proc> control{activity::PASS};
290  WorkForce wof{setup([&]{ return activity::Proc(control); })
291  .withFinalHook([&](bool){ ++exited; })};
292 
293  CHECK (0 == exited);
294 
295  wof.activate();
296  sleep_for(10ms);
297  CHECK (wof.size() == work::Config::COMPUTATION_CAPACITY);
298  CHECK (0 == exited);
299 
300  control = activity::HALT;
301  sleep_for(10ms);
302  CHECK (0 == wof.size());
303  CHECK (exited == work::Config::COMPUTATION_CAPACITY);
304  }
305 
306 
307 
312  void
314  {
315  atomic<uint> check{0};
316  atomic<uint> errors{0};
317  WorkForce wof{setup ([&]{
318  if (++check == 555)
319  throw error::State("evil");
320  return activity::PASS;
321  })
322  .withFinalHook([&](bool isFailure)
323  {
324  if (isFailure)
325  ++errors;
326  })};
327  CHECK (0 == check);
328  CHECK (0 == errors);
329 
330  wof.incScale();
331  wof.incScale();
332  wof.incScale();
333 
334  sleep_for(10us);
335  CHECK (3 == wof.size());
336  CHECK (0 < check);
337  CHECK (0 == errors);
338 
339  sleep_for(200ms); // wait for the programmed disaster
340  CHECK (2 == wof.size());
341  CHECK (1 == errors);
342  }
343 
344 
345 
350  void
352  {
353  atomic<uint> check{0};
354  WorkForce wof{setup ([&]{ ++check; return activity::PASS; })};
355 
356  // after construction, the WorkForce is inactive
357  CHECK (0 == wof.size());
358  CHECK (0 == check);
359 
360  wof.activate();
361  sleep_for(20ms);
362 
363  CHECK (0 < check);
364  CHECK (wof.size() == work::Config::COMPUTATION_CAPACITY);
365  CHECK (work::Config::COMPUTATION_CAPACITY == std::thread::hardware_concurrency());
366  }
367 
368 
369 
373  void
375  {
377  class UniqueCnt
378  : public std::set<std::thread::id>
379  , public lib::Sync<>
380  {
381  public:
382  void
383  mark (std::thread::id const& tID)
384  {
385  Lock guard{this};
386  this->insert(tID);
387  }
388 
389  operator size_t() const
390  {
391  Lock guard{this};
392  return this->size();
393  }
394  }
395  uniqueCnt;
396 
397  WorkForce wof{setup ([&]{
398  uniqueCnt.mark(std::this_thread::get_id());
399  return activity::PASS;
400  })};
401 
402  CHECK (0 == uniqueCnt);
403  CHECK (0 == wof.size());
404 
405  wof.incScale();
406  sleep_for(1ms);
407  CHECK (1 == uniqueCnt);
408  CHECK (1 == wof.size());
409 
410  wof.incScale();
411  sleep_for(1ms);
412  CHECK (2 == uniqueCnt);
413  CHECK (2 == wof.size());
414 
415 
416  auto fullCnt = work::Config::COMPUTATION_CAPACITY;
417 
418  wof.activate (1.0);
419  sleep_for(5ms);
420  CHECK (fullCnt == uniqueCnt);
421  CHECK (fullCnt == wof.size());
422 
423  wof.activate (2.0);
424  sleep_for(10ms);
425  CHECK (2*fullCnt == uniqueCnt);
426  CHECK (2*fullCnt == wof.size());
427 
428  wof.awaitShutdown();
429  CHECK (0 == wof.size());
430 
431  uniqueCnt.clear();
432  sleep_for(5ms);
433  CHECK (0 == uniqueCnt);
434 
435  wof.activate (0.5);
436  sleep_for(5ms);
437  CHECK (fullCnt/2 == uniqueCnt);
438  CHECK (fullCnt/2 == wof.size());
439  }
440 
441 
442 
445  void
447  {
448  atomic<uint> check{0};
449  WorkForce wof{setup ([&]{
450  ++check;
451  if (check == 5'000 or check == 5'110)
452  return activity::HALT;
453  else
454  return activity::PASS;
455  })};
456 
457  CHECK (0 == wof.size());
458 
459  wof.incScale();
460  wof.incScale();
461  wof.incScale();
462  sleep_for(10us); // this may be fragile; must be sufficiently short
463 
464  CHECK (3 == wof.size());
465 
466  while (check < 6'000)
467  sleep_for(10ms); // .....sufficiently long to count way beyond 10'000
468  CHECK (check > 6'000);
469  CHECK (1 == wof.size());
470  }
471 
472 
473 
481  void
482  verify_dtor_blocks()
483  {
484  atomic<bool> trapped{true};
485  auto blockingWork = [&]{
486  while (trapped)
487  /* spin */;
488  return activity::PASS;
489  };
490 
491  atomic<bool> pool_scaled_up{false};
492  atomic<bool> shutdown_done{false};
493 
494  Thread operate{"controller"
495  ,[&] {
496  {// nested scope...
497  WorkForce wof{setup (blockingWork)};
498 
499  wof.activate();
500  sleep_for (10ms);
501  CHECK (wof.size() == work::Config::COMPUTATION_CAPACITY);
502  pool_scaled_up = true;
503  } // WorkForce goes out of scope => dtor called
504 
505  // when reaching this point, dtor has terminated
506  shutdown_done = true;
507  }};
508 
509  CHECK (operate); // operate-thread is in running state
510  sleep_for(100ms);
511 
512  CHECK (pool_scaled_up);
513  CHECK (not shutdown_done); // all workers are trapped in the work-functor
514  // thus the destructor can't dismantle the pool
515  trapped = false;
516  sleep_for(20ms);
517  CHECK (shutdown_done);
518  CHECK (not operate); // operate-thread has detached and terminated
519  }
520  };
521 
522 
/** Register WorkForce_test through the LAUNCHER macro.
 *  NOTE(review): the string presumably names the test groups ("unit", "engine")
 *  this test belongs to — confirm against the macro definition in lib/test/run.hpp
 */
524  LAUNCHER (WorkForce_test, "unit engine");
525 
526 
527 }}} // namespace vault::gear::test
Facility for monitor object based locking.
Definition: sync.hpp:217
Definition: Setup.py:1
Definition: run.hpp:49
const size_t DISMISS_CYCLES
number of wait cycles before an idle worker terminates completely
Definition: scheduler.hpp:141
Object Monitor based synchronisation.
Derived specific exceptions within Lumiera's exception hierarchy.
Definition: error.hpp:199
Abstract Base Class for all testcases.
Definition: run.hpp:62
const auto IDLE_WAIT
sleep-recheck cycle for workers deemed idle
Definition: scheduler.hpp:140
Simple test class runner.
Convenience front-end to simplify and codify basic thread handling.
const milliseconds IDLE_WAIT
wait period when a worker falls idle
Definition: work-force.hpp:117
const size_t DISMISS_CYCLES
number of idle cycles after which the worker terminates
Definition: work-force.hpp:118
auto setup(FUN &&workFun)
Helper: setup a Worker-Pool configuration for the test.
A thin convenience wrapper to simplify thread-handling.
Definition: thread.hpp:656
A pool of workers for multithreaded rendering.
static size_t COMPUTATION_CAPACITY
Nominal »full size« of a pool of concurrent workers.
Definition: work-force.hpp:115
Proc
Result instruction from Activity activation.
Definition: activity.hpp:149
Pool of worker threads for rendering.
Definition: work-force.hpp:249
Base for configuration of the worker pool.
Definition: work-force.hpp:113
Vault-Layer implementation namespace root.