Lumiera  0.pre.03
»edit your freedom«
scheduler-service-test.cpp
1 /*
2  SchedulerService(Test) - component integration test for the scheduler
3 
4  Copyright (C) Lumiera.org
5  2023, Hermann Vosseler <Ichthyostega@web.de>
6 
7  This program is free software; you can redistribute it and/or
8  modify it under the terms of the GNU General Public License as
9  published by the Free Software Foundation; either version 2 of
10  the License, or (at your option) any later version.
11 
12  This program is distributed in the hope that it will be useful,
13  but WITHOUT ANY WARRANTY; without even the implied warranty of
14  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15  GNU General Public License for more details.
16 
17  You should have received a copy of the GNU General Public License
18  along with this program; if not, write to the Free Software
19  Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20 
21 * *****************************************************/
22 
28 #include "lib/test/run.hpp"
29 #include "test-chain-load.hpp"
30 #include "activity-detector.hpp"
31 #include "vault/gear/scheduler.hpp"
32 #include "lib/time/timevalue.hpp"
33 #include "lib/format-cout.hpp"
34 #include "lib/format-string.hpp"
35 #include "lib/test/transiently.hpp"
37 #include "lib/util.hpp"
38 
39 #include <thread>
40 
41 using test::Test;
42 
43 
44 namespace vault{
45 namespace gear {
46 namespace test {
47 
48  using util::max;
49  using util::_Fmt;
50  using lib::time::Time;
51  using std::this_thread::sleep_for;
52 
53  namespace {
54  Time t100us = Time{FSecs{1, 10'000}};
55  Time t200us = t100us + t100us;
56  Time t500us = t200us + t200us + t100us;
57  Time t1ms = Time{1,0};
58 
59  const uint TYPICAL_TIME_FOR_ONE_SCHEDULE_us = 3;
60  }
61 
62 
63 
64 
65  /*************************************************************************//**
66  * @test Scheduler component integration test: use the service API for
67  * state control and to add Jobs and watch processing patterns.
68  * @see SchedulerActivity_test
69  * @see SchedulerInvocation_test
70  * @see SchedulerCommutator_test
71  * @see SchedulerLoadControl_test
72  */
73  class SchedulerService_test : public Test
74  {
75 
76  virtual void
77  run (Arg)
78  {
79  simpleUsage();
80  verify_StartStop();
81  verify_LoadFactor();
82  invokeWorkFunction();
83  scheduleRenderJob();
84  processSchedule();
85  }
86 
87 
90  void
91  simpleUsage()
92  {
93  BlockFlowAlloc bFlow;
94  EngineObserver watch;
95  Scheduler scheduler{bFlow, watch};
96  CHECK (scheduler.empty());
97 
98  auto task = onetimeCrunch(4ms);
99  CHECK (1 == task.remainingInvocations());
100 
101  Job job{ task
102  , InvocationInstanceID()
103  , Time::ANYTIME
104  };
105  scheduler.defineSchedule(job)
106  .startOffset(6ms)
107  .lifeWindow(2ms)
108  .post();
109  CHECK (not scheduler.empty());
110 
111  sleep_for (3ms); // not invoked yet
112  CHECK (1 == task.remainingInvocations());
113 
114  sleep_for (20ms);
115  CHECK (0 == task.remainingInvocations());
116  } // task has been invoked
117 
118 
119 
120 
128  static void
129  postNewTask (Scheduler& scheduler, Activity& chain, Time start)
130  {
131  ActivationEvent actEvent{chain, start, start + Time{50,0}}; // add dummy deadline +50ms
132  scheduler.layer2_.postChain (actEvent, scheduler.layer1_);
133  }
134 
135 
136 
139  void
140  verify_StartStop()
141  {
142  BlockFlowAlloc bFlow;
143  EngineObserver watch;
144  Scheduler scheduler{bFlow, watch};
145  CHECK (scheduler.empty());
146 
147  Activity dummy{Activity::FEED};
148  auto postIt = [&] { postNewTask (scheduler, dummy, RealClock::now()+t200us); };
149 
150  scheduler.ignite();
151  CHECK (not scheduler.empty());// repeated »tick« task enlisted....
152 
153  postIt();
154  CHECK (not scheduler.empty());
155 
156  scheduler.terminateProcessing();
157  CHECK (scheduler.empty());
158 
159  postIt();
160  postIt();
161  scheduler.ignite();
162  CHECK (not scheduler.empty());
163  //... and just walk away => scheduler unwinds cleanly from destructor
164  }// Note: BlockFlow and WorkForce unwinding is covered in dedicated tests
165 
166 
167 
183  void
184  verify_LoadFactor()
185  {
186  MARK_TEST_FUN
187  BlockFlowAlloc bFlow;
188  EngineObserver watch;
189  Scheduler scheduler{bFlow, watch};
190  CHECK (scheduler.empty());
191 
192  // use a single FEED as content
193  Activity dummy{Activity::FEED};
194 
195  auto anchor = RealClock::now();
196  auto offset = [&](Time when =RealClock::now()){ return _raw(when) - _raw(anchor); };
197 
198  auto createLoad = [&](Offset start, uint cnt)
199  { // use internal API (this test is declared as friend)
200  for (uint i=0; i<cnt; ++i) // flood the queue
201  postNewTask (scheduler, dummy, anchor + start + TimeValue{i});
202  };
203 
204 
205  auto LOAD_PEAK_DURATION_us = 2000;
206  auto fatPackage = LOAD_PEAK_DURATION_us/TYPICAL_TIME_FOR_ONE_SCHEDULE_us;
207 
208  createLoad (Offset{Time{ 5,0}}, fatPackage);
209  createLoad (Offset{Time{15,0}}, fatPackage);
210 
211  scheduler.ignite();
212  cout << "Timing : start-up required.."<<offset()<<" µs"<<endl;
213 
214  // now watch change of load and look out for two peaks....
215  uint peak1_s =0;
216  uint peak1_dur=0;
217  double peak1_max=0;
218  uint peak2_s =0;
219  uint peak2_dur=0;
220  double peak2_max=0;
221 
222  uint phase=0;
223  _Fmt row{"%6d | Load: %5.3f Head:%5d Lag:%6d\n"};
224 
225  while (not scheduler.empty()) // should fall empty at end
226  {
227  sleep_for(50us);
228  double load = scheduler.getLoadIndicator();
229 
230  switch (phase) {
231  case 0:
232  if (load > 1.0)
233  {
234  ++phase;
235  peak1_s = offset();
236  }
237  break;
238  case 1:
239  peak1_max = max (load, peak1_max);
240  if (load < 1.0)
241  {
242  ++phase;
243  peak1_dur = offset() - peak1_s;
244  }
245  break;
246  case 2:
247  if (load > 1.0)
248  {
249  ++phase;
250  peak2_s = offset();
251  }
252  break;
253  case 3:
254  peak2_max = max (load, peak2_max);
255  if (load < 1.0)
256  {
257  ++phase;
258  peak2_dur = offset() - peak2_s;
259  }
260  break;
261  }
262  cout << row % offset() % load
263  % offset(scheduler.layer1_.headTime())
264  % scheduler.loadControl_.averageLag();
265  }
266  uint done = offset();
267 
268  //--------Summary-Table------------------------------
269  _Fmt peak{"\nPeak %d ....... %5d +%dµs %34tmax=%3.1f"};
270  cout << "-------+-------------+----------+----------"
271  << "\n\n"
272  << peak % 1 % peak1_s % peak1_dur % peak1_max
273  << peak % 2 % peak2_s % peak2_dur % peak2_max
274  << "\nTick ....... "<<done
275  <<endl;
276 
277  CHECK (phase == 4);
278  CHECK (peak1_s > 5000); // first peak was scheduled at 5ms
279  CHECK (peak1_s < 10000);
280  CHECK (peak2_s > 15000); // second peak was scheduled at 15ms
281  CHECK (peak2_s < 20000);
282  CHECK (peak1_max > 2.0);
283  CHECK (peak2_max > 2.0);
284 
285  CHECK (done > 50000); // »Tick« period is 50ms
286  // and this tick should determine end of timeline
287 
288  cout << "\nwaiting for shutdown of WorkForce";
289  while (scheduler.workForce_.size() > 0)
290  {
291  sleep_for(10ms);
292  cout << "." << std::flush;
293  }
294  uint shutdown = offset();
295  cout << "\nShutdown after "<<shutdown / 1.0e6<<"sec"<<endl;
296  CHECK (shutdown > 2.0e6);
297  }
298 
299 
300 
328  void
329  invokeWorkFunction()
330  {
331  MARK_TEST_FUN
332  BlockFlowAlloc bFlow;
333  EngineObserver watch;
334  Scheduler scheduler{bFlow, watch};
335 
336  ActivityDetector detector;
337  Activity& probe = detector.buildActivationProbe ("testProbe");
338 
339  TimeVar start;
340  int64_t delay_us;
341  int64_t slip_us;
342  activity::Proc res;
343 
344  auto post = [&](Time start)
345  { // this test class is declared friend to get a backdoor into Scheduler internals...
346  scheduler.layer2_.acquireGoomingToken();
347  postNewTask (scheduler, probe, start);
348  };
349 
350  auto pullWork = [&] {
351  delay_us = lib::test::benchmarkTime([&]{ res = scheduler.doWork(); });
352  slip_us = _raw(detector.invokeTime(probe)) - _raw(start);
353  cout << "res:"<<res<<" delay="<<delay_us<<"µs slip="<<slip_us<<"µs"<<endl;
354  };
355 
356 
357  auto wasClose = [](TimeValue a, TimeValue b)
358  { // 500µs are considered "close"
359  return Duration{Offset{a,b}} < Duration{FSecs{1,2000}};
360  };
361  auto wasInvoked = [&](Time start)
362  {
363  Time invoked = detector.invokeTime (probe);
364  return invoked >= start
365  and wasClose (invoked, start);
366  };
367 
368 
369  cout << "pullWork() on empty queue..."<<endl;
370  pullWork(); // Call the work-Function on empty Scheduler queue
371  CHECK (activity::WAIT == res); // the result instructs this thread to go to sleep immediately
372 
373 
374  cout << "Due at pullWork()..."<<endl;
375  TimeVar now = RealClock::now();
376  start = now + t100us; // Set a schedule 100ms ahead of "now"
377  post (start);
378  CHECK (not scheduler.empty()); // was enqueued
379  CHECK (not wasInvoked(start)); // ...but not activated yet
380 
381  sleep_for (100us); // wait beyond the planned start point (typically waits ~150µs or more)
382  pullWork();
383  CHECK (wasInvoked(start));
384  CHECK (slip_us < 300); // Note: typically there is a slip of 100..200µs, because sleep waits longer
385  CHECK (scheduler.empty()); // The scheduler is empty now and this thread will go to sleep,
386  CHECK (delay_us < 20200); // however the sleep-cycle is first re-shuffled by a wait between 0 ... 20ms
387  CHECK (activity::PASS == res); // this thread is instructed to check back once
388  pullWork();
389  CHECK (activity::WAIT == res); // ...yet since the queue is still empty, it is sent immediately to sleep
390  CHECK (delay_us < 40);
391 
392 
393  cout << "next some time ahead => up-front delay"<<endl;
394  now = RealClock::now();
395  start = now + t500us; // Set a schedule significantly into the future...
396  post (start);
397  CHECK (not scheduler.empty());
398 
399  pullWork(); // ...and invoke the work-Function immediately "now"
400  CHECK (activity::PASS == res); // Result: this thread was kept in sleep in the work-Function
401  CHECK (not wasInvoked(start)); // but the next dispatch did not happen yet; we are instructed to re-invoke immediately
402  CHECK (delay_us > 500); // this proves that there was a delay to wait for the next schedule
403  CHECK (delay_us < 1000);
404  pullWork(); // if we now re-invoke the work-Function as instructed...
405  CHECK (wasInvoked(start)); // then the next schedule is already slightly overdue and immediately invoked
406  CHECK (scheduler.empty()); // the queue is empty and thus this thread will be sent to sleep
407  CHECK (delay_us < 20200); // but beforehand the sleep-cycle is re-shuffled by a wait between 0 ... 20ms
408  CHECK (slip_us < 300);
409  CHECK (activity::PASS == res); // instruction to check back once
410  pullWork();
411  CHECK (activity::WAIT == res); // but next call will send this thread to sleep right away
412  CHECK (delay_us < 40);
413 
414 
415  cout << "follow-up with some distance => follow-up delay"<<endl;
416  now = RealClock::now();
417  start = now + t100us;
418  post (start); // This time the schedule is set to be "soon"
419  post (start+t1ms); // But another schedule is placed 1ms behind
420  sleep_for (100us); // wait for "soon" to pass...
421  pullWork();
422  CHECK (wasInvoked(start)); // Result: the first invocation happened immediately
423  CHECK (slip_us < 300);
424  CHECK (delay_us > 900); // yet this thread was afterwards kept in sleep to await the next task;
425  CHECK (activity::PASS == res); // returns instruction to re-invoke immediately
426  CHECK (not scheduler.empty()); // since there is still work in the queue
427 
428  start += t1ms; // (just re-adjust the reference point to calculate slip_us)
429  pullWork(); // re-invoke immediately as instructed
430  CHECK (wasInvoked(start)); // Result: also the next Activity has been dispatched
431  CHECK (slip_us < 400); // not much slip
432  CHECK (delay_us < 20200); // ...and the post-delay is used to re-shuffle the sleep cycle as usual
433  CHECK (activity::PASS == res); // since queue is empty, we will call back once...
434  CHECK (scheduler.empty());
435  pullWork();
436  CHECK (activity::WAIT == res); // and then go to sleep.
437 
438 
439  cout << "already tended-next => re-target capacity"<<endl;
440  now = RealClock::now();
441  start = now + t500us; // Set the next schedule with some distance...
442  post (start);
443 
444  // Access scheduler internals (as friend)
445  CHECK (start == scheduler.layer1_.headTime()); // next schedule indeed appears as next-head
446  CHECK (not scheduler.loadControl_.tendedNext(start)); // but this next time was not yet marked as "tended"
447 
448  scheduler.loadControl_.tendNext(start); // manipulate scheduler to mark next-head as "tended"
449  CHECK ( scheduler.loadControl_.tendedNext(start));
450 
451  CHECK (start == scheduler.layer1_.headTime()); // other state still the same
452  CHECK (not scheduler.empty());
453 
454  pullWork();
455  CHECK (not wasInvoked(start)); // since next-head was marked as "tended"...
456  CHECK (not scheduler.empty()); // ...this thread is not used to dispatch it
457  CHECK (delay_us < 6000); // rather it is re-focussed as free capacity within WORK_HORIZON
458  }
459 
460 
461 
462 
474  void
475  scheduleRenderJob()
476  {
477  BlockFlowAlloc bFlow;
478  EngineObserver watch;
479  Scheduler scheduler{bFlow, watch};
480 
481  // prevent scale-up of the Scheuler's WorkForce
483 
484  Time nominal{7,7};
485  Time start{0,1};
486  Time dead{0,10};
487 
488  ActivityDetector detector;
489  Job testJob{detector.buildMockJob("testJob", nominal, 1337)};
490 
491  CHECK (scheduler.empty());
492 
493  // use the public Render-Job builder API
494  scheduler.defineSchedule(testJob)
495  .startOffset(400us)
496  .lifeWindow (2ms)
497  .post();
498 
499  CHECK (not scheduler.empty());
500 
501  // cause the new entry to migrate to the priority queue...
502  scheduler.layer2_.maybeFeed(scheduler.layer1_);
503 
504  // investigate the generated ActivationEvent at queue head
505  auto entry = scheduler.layer1_.peekHead();
506  auto now = RealClock::now();
507 
508  CHECK (entry.activity->is(Activity::POST));
509  CHECK (entry.activity->next->is(Activity::GATE));
510  CHECK (entry.activity->next->next->is(Activity::WORKSTART));
511  CHECK (entry.activity->next->next->next->is(Activity::INVOKE));
512  CHECK (entry.startTime() - now < _uTicks( 400us));
513  CHECK (entry.deathTime() - now < _uTicks(2400us));
514  CHECK (entry.manifestation == 0);
515  CHECK (entry.isCompulsory == false);
516 
517 
518  sleep_for(400us); // wait to be sure the new entry has reached maturity
519  detector.incrementSeq(); // mark this point in the detector-log...
520 
521  // Explicitly invoke the work-Function (normally done by the workers)
522  CHECK (activity::PASS == scheduler.doWork());
523 
524  CHECK (detector.verifySeqIncrement(1)
525  .beforeInvocation("testJob").arg("7.007", 1337));
526 
527 // cout << detector.showLog()<<endl; // HINT: use this for investigation...
528  }
529 
530 
531 
532 
533 
545  void
547  {
549  TestChainLoad<16> testLoad{64};
551  .buildTopology();
552 
553  auto stats = testLoad.computeGraphStatistics();
554  cout << _Fmt{"Test-Load: Nodes: %d Levels: %d ∅Node/Level: %3.1f Forks: %d Joins: %d"}
555  % stats.nodes
556  % stats.levels
557  % stats.indicators[STAT_NODE].pL
558  % stats.indicators[STAT_FORK].cnt
559  % stats.indicators[STAT_JOIN].cnt
560  << endl;
561 
562  // while building the calculation-plan graph
563  // node hashes were computed, observing dependencies
564  size_t expectedHash = testLoad.getHash();
565 
566  // some jobs/nodes are marked with a weight-step
567  // these can be instructed to spend some CPU time
568  auto LOAD_BASE = 500us;
569  testLoad.performGraphSynchronously(LOAD_BASE);
570  CHECK (testLoad.getHash() == expectedHash);
571 
572  double referenceTime = testLoad.calcRuntimeReference(LOAD_BASE);
573  cout << "refTime(singleThr): "<<referenceTime/1000<<"ms"<<endl;
574 
575 
576  // Perform through Scheduler----------
577  BlockFlowAlloc bFlow;
578  EngineObserver watch;
579  Scheduler scheduler{bFlow, watch};
580 
581  double performanceTime =
582  testLoad.setupSchedule(scheduler)
583  .withLoadTimeBase(LOAD_BASE)
584  .withJobDeadline(30ms)
585  .launch_and_wait();
586 
587  cout << "runTime(Scheduler): "<<performanceTime/1000<<"ms"<<endl;
588 
589  // invocation through Scheduler has reproduced all node hashes
590  CHECK (testLoad.getHash() == expectedHash);
591 
592  // due to the massive load burst at end, Scheduler falls behind plan
593  CHECK (performanceTime < 2*referenceTime);
594  // typical values: refTime ≡ ~35ms, runTime ≡ ~45ms
595  }
596  };
597 
598 
600  LAUNCHER (SchedulerService_test, "unit engine");
601 
602 
603 
604 }}} // namespace vault::gear::test
const StatKey STAT_NODE
all nodes
signal start of some processing and transition grooming mode ⟼ *work mode
Definition: activity.hpp:242
Automatically use custom string conversion in C++ stream output.
#define TRANSIENTLY(_OO_)
Macro to simplify capturing assignments.
AnyPair entry(Query< TY > const &query, typename WrapReturn< TY >::Wrapper &obj)
helper to simplify creating mock table entries, wrapped correctly
Definition: run.hpp:49
Front-end for printf-style string template interpolation.
TestChainLoad && buildTopology()
Use current configuration and seed to (re)build Node connectivity.
Generate synthetic computation load for Scheduler performance tests.
Functions to perform (multithreaded) timing measurement on a given functor.
A Generator for synthetic Render Jobs for Scheduler load testing.
A front-end for using printf-style formatting.
Lumiera&#39;s internal time value datatype.
Definition: timevalue.hpp:308
post a message providing a chain of further time-bound Activities
Definition: activity.hpp:246
Abstract Base Class for all testcases.
Definition: run.hpp:62
»Scheduler-Service« : coordinate render activities.
Definition: scheduler.hpp:222
Service for coordination and dispatch of render activities.
Diagnostic context to record and evaluate activations within the Scheduler.
#define MARK_TEST_FUN
Macro to mark the current test function in STDOUT.
Simple test class runner.
Tiny helper functions and shortcuts to be used everywhere Consider this header to be effectively incl...
TestChainLoad && configureShape_chain_loadBursts()
preconfigured topology: single graph with massive »load bursts«
const StatKey STAT_JOIN
joining node
uint incrementSeq()
increment the internal invocation sequence number
probe window + count-down; activate next Activity, else re-schedule
Definition: activity.hpp:245
Diagnostic setup to instrument and observe Activity activations.
Statistic computeGraphStatistics()
Operator on TestChainLoad to evaluate current graph connectivity.
Test helper to perform temporary manipulations within a test scope.
dispatch a JobFunctor into a worker thread
Definition: activity.hpp:241
const StatKey STAT_FORK
forking node
static size_t COMPUTATION_CAPACITY
Nominal »full size« of a pool of concurrent workers.
Definition: work-force.hpp:115
Individual frame rendering task, forwarding to a closure.
Definition: job.h:277
a family of time value like entities and their relationships.
Pool of worker threads for rendering.
Definition: work-force.hpp:249
ActivityMatch & arg(ARGS const &...args)
qualifier: additionally match the function arguments
Vault-Layer implementation namespace root.
Collector and aggregator for performance data.