Lumiera  0.pre.03
»edit your freedom«
scheduler.hpp
Go to the documentation of this file.
1 /*
2  SCHEDULER.hpp - coordination of render activities under timing and dependency constraints
3 
4  Copyright (C) Lumiera.org
5  2023, Hermann Vosseler <Ichthyostega@web.de>
6 
7  This program is free software; you can redistribute it and/or
8  modify it under the terms of the GNU General Public License as
9  published by the Free Software Foundation; either version 2 of
10  the License, or (at your option) any later version.
11 
12  This program is distributed in the hope that it will be useful,
13  but WITHOUT ANY WARRANTY; without even the implied warranty of
14  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15  GNU General Public License for more details.
16 
17  You should have received a copy of the GNU General Public License
18  along with this program; if not, write to the Free Software
19  Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20 
21 */
22 
23 
102 #ifndef SRC_VAULT_GEAR_SCHEDULER_H_
103 #define SRC_VAULT_GEAR_SCHEDULER_H_
104 
105 
106 #include "lib/error.hpp"
107 #include "vault/gear/block-flow.hpp"
108 #include "vault/gear/work-force.hpp"
114 #include "vault/real-clock.hpp"
115 //#include "lib/symbol.hpp"
116 #include "lib/nocopy.hpp"
117 //#include "lib/util.hpp"
118 
119 //#include <string>
120 #include <optional>
121 #include <utility>
122 
123 
124 namespace vault{
125 namespace gear {
126 
127 // using util::isnil;
128 // using std::string;
129  using std::move;
130  using lib::time::Time;
131  using lib::time::FSecs;
132  using lib::time::Offset;
133 
134  namespace test { // declared friend for test access
135  class SchedulerService_test;
136  }
137 
138 namespace { // Scheduler default config
139 
// NOTE: the symbol index at the end of this listing also places DUTY_CYCLE_TOLERANCE (line 143)
//       and FUTURE_PLANNING_LIMIT (line 144) in this namespace; those two lines are elided here.
140 const auto IDLE_WAIT = 20ms; ///< sleep-recheck cycle for workers deemed idle
141 const size_t DISMISS_CYCLES = 100; ///< number of wait cycles before an idle worker terminates completely
142 Offset DUTY_CYCLE_PERIOD{FSecs(1,20)}; ///< period of the regular scheduler »tick« for state maintenance (FSecs is a rational, i.e. 1/20 sec)
145 }
146 
147 
148  class Scheduler;
149 
152 {
// Builder record describing when and how a single Job shall be scheduled.
// (The class header, listing lines 150-151, is elided from this listing; instances are
//  created by Scheduler::defineSchedule(), which returns ScheduleSpec{*this, job}.)
// Settings are collected through the chained setters below and handed to the Scheduler by post().
153 Job job_;
// start/deadline default to the marker values ANYTIME / NEVER; Scheduler::sanityCheck()
// rejects both markers, so start time and deadline must be set before post().
154 TimeVar start_{Time::ANYTIME};
155 TimeVar death_{Time::NEVER};
156 ManifestationID manID_{};
157 bool isCompulsory_{false};
158 
159 Scheduler* theScheduler_; // non-owning back-link to the Scheduler that created this spec
160 std::optional<activity::Term> term_; // Activity term, built on demand by maybeBuildTerm()
161 
162 public:
163 ScheduleSpec (Scheduler& sched, Job job)
164 : job_{job}
165 , theScheduler_{&sched}
166 , term_{std::nullopt}
167 { }
168 
// Each setter below ends with `return move(*this)`: the spec is moved out into the return
// value, so a builder chain has to be continued on the returned object.
// (The return-type lines 169/176/183/190/197 are elided -- presumably ScheduleSpec by value.)

// set the start time relative to the current wall-clock time (RealClock::now())
170 startOffset (microseconds afterNow)
171 {
172 start_ = RealClock::now() + _uTicks(afterNow);
173 return move(*this);
174 }
175 
// set an absolute start time
177 startTime (Time fixedTime)
178 {
179 start_ = fixedTime;
180 return move(*this);
181 }
182 
// set the deadline relative to the start time *as currently set*;
// thus the start time must be defined before calling this
184 lifeWindow (microseconds afterStart)
185 {
186 death_ = start_ + _uTicks(afterStart);
187 return move(*this);
188 }
189 
// tag this schedule with a ManifestationID (passed along to the Scheduler by post())
191 manifestation (ManifestationID manID)
192 {
193 manID_ = manID;
194 return move(*this);
195 }
196 
// mark this schedule as compulsory (flag passed along to the Scheduler by post())
198 compulsory (bool indeed =true)
199 {
200 isCompulsory_ = indeed;
201 return move(*this);
202 }
203 
204 
// build Activity chain and hand-over to the Scheduler
206 ScheduleSpec post();
207 
// wire a notification from predecessor term to successor term (both terms are built on demand)
208 ScheduleSpec linkToSuccessor (ScheduleSpec&, bool unlimitedTime =false);
209 ScheduleSpec linkToPredecessor(ScheduleSpec&, bool unlimitedTime =false);
210 private:
// create term_ from job_, start_ and death_ unless it already exists
211 void maybeBuildTerm();
212 };
213 
214 
215 
216  /******************************************************/
/**
 * »Scheduler-Service« : coordinate render activities.
 * Combines the queue (layer1_), the dispatch/grooming layer (layer2_), the worker pool,
 * the Activity language and the load controller.
 * @note several signature lines of this class are elided from this listing; the names given
 *       in the comments below are taken from the symbol index at the end of the listing.
 */
222 class Scheduler
224 {
// Worker-pool configuration (the struct header -- presumably `Setup`, as used for
// WorkForce<Setup> and in the constructor -- sits on the elided listing lines 225-226).
// Forwards the worker callbacks to the owning Scheduler.
227 {
228 Scheduler& scheduler;
229 activity::Proc doWork() { return scheduler.doWork(); }
230 void finalHook (bool _) { scheduler.handleWorkerTermination(_);}
231 };
232 
233 
234 SchedulerInvocation layer1_; // Layer-1: the scheduling queue
235 SchedulerCommutator layer2_; // Layer-2: grooming token, posting and dispatch
236 WorkForce<Setup> workForce_; // pool of worker threads
237 
238 ActivityLang activityLang_; // builds Activity terms, using the allocator given to the ctor
239 LoadController loadControl_; // wired to the WorkForce through connectMonitoring()
240 EngineObserver& engineObserver_; // receives the WorkTiming events
241 
242 
243 public:
244 Scheduler (BlockFlowAlloc& activityAllocator
245 ,EngineObserver& engineObserver)
246 : layer1_{}
247 , layer2_{}
248 , workForce_{Setup{IDLE_WAIT, DISMISS_CYCLES, *this}}
249 , activityLang_{activityAllocator}
250 , loadControl_{connectMonitoring()}
251 , engineObserver_{engineObserver}
252 { }
253 
254 
// true when the layer-1 queue reports no content
255 bool
256 empty() const
257 {
258 return layer1_.empty();
259 }
260 
// ignite() -- signature line 268 elided.
// Spark the engine self-regulation cycle and power up WorkForce:
// runs one duty cycle with forced continuation (so a follow-up »tick« is posted even when
// the queue is empty) and then activates the workers if the queue is non-empty.
267 void
269 {
270 TRACE (engine, "Ignite Scheduler Dispatch.");
271 bool force_continued_run{true};
272 handleDutyCycle (RealClock::now(), force_continued_run);
273 if (not empty())
274 workForce_.activate();
275 }
276 
277 
// terminateProcessing() -- signature line 287 elided.
// Bring down processing destructively as fast as possible:
// waits for the worker pool to shut down, then forcibly clears the schedule.
286 void
288 {
289 TRACE (engine, "Forcibly terminate Scheduler Dispatch.");
290 workForce_.awaitShutdown();
291 layer1_.discardSchedule();
292 }
293 
294 
// Load indicator accessor (its name, listing line 301, is elided and not covered by the index):
// returns loadControl_.effectiveLoad().
300 double
302 {
303 return loadControl_.effectiveLoad();
304 }
305 
306 
// Set the Scheduler to work on a new CalcStream.
// Full signature per the symbol index (parameter line 328 is elided here):
//   seedCalcStream(Job planningJob, ManifestationID manID=ManifestationID(),
//                  FrameRate expectedAdditionalLoad=FrameRate(25))
// Enables the given manifestation, announces the expected load and places the
// first planning job at the current time.
326 void
327 seedCalcStream (Job planningJob
329 ,FrameRate expectedAdditionalLoad = FrameRate(25))
330 {
331 auto guard = layer2_.requireGroomingTokenHere(); // allow mutation
332 layer1_.activate (manID);
333 activityLang_.announceLoad (expectedAdditionalLoad);
334 continueMetaJob (RealClock::now(), planningJob, manID);
335 }
336 
337 
// continueMetaJob(Time nextStart, ...) -- first signature line 342 elided.
// Place a follow-up job-planning job into the timeline: the meta-Job is posted as
// compulsory, with a deadline DUTY_CYCLE_TOLERANCE after nextStart.
341 void
343 ,Job planningJob
344 ,ManifestationID manID = ManifestationID())
345 {
346 bool isCompulsory = true;
347 Time deadline = nextStart + DUTY_CYCLE_TOLERANCE;
348 // place the meta-Job into the timeline...
349 postChain ({activityLang_.buildMetaJob(planningJob, nextStart, deadline)
350 .post()
351 , nextStart
352 , deadline
353 , manID
354 , isCompulsory});
355 }
356 
357 
// defineSchedule(Job job) -- signature lines 365-366 elided.
// Render Job builder: start definition of a schedule to invoke the given Job.
367 {
368 return ScheduleSpec{*this, job};
369 }
370 
371 
372 
// The worker-Functor, called by the active Workers from the WorkForce (via Setup::doWork)
377 activity::Proc doWork();
378 
379 
380 private:
381 void postChain (ActivationEvent); // enqueue for time-bound execution, possibly dispatch immediately
382 void sanityCheck (ActivationEvent const&); // throws on empty event or invalid start / deadline
383 void handleDutyCycle (Time now, bool =false); // »Tick-hook« : maintain sane running status
384 void handleWorkerTermination (bool isFailure); // callback whenever a worker-thread is about to exit
385 void maybeScaleWorkForce (Time startHorizon); // hook invoked whenever a new task is passed in
386 
387 void triggerEmergency(); // trip the emergency brake
388 
389 
// connectMonitoring() -- signature lines 391-392 and the declaration of `setup` (line 394)
// are elided; per the symbol index it returns LoadController::Wiring.
// Installs the callbacks through which the load controller queries and scales the WorkForce.
393 {
395 setup.maxCapacity = []{ return work::Config::COMPUTATION_CAPACITY; };
396 setup.currWorkForceSize = [this]{ return workForce_.size(); };
397 setup.stepUpWorkForce = [this](uint steps){ workForce_.incScale(steps); };
398 return setup;
399 }
400 
// getSchedTime() -- signature line 403 elided.
// access high-resolution-clock, rounded to µ-Ticks
402 Time
404 {
405 return RealClock::now();
406 }
407 
// the execution context defined further below accesses the private layers of the Scheduler
409 class ExecutionCtx;
410 friend class ExecutionCtx;
411 
// ScheduleSpec calls the private postChain() and uses activityLang_
413 friend class ScheduleSpec;
414 
417 };
418 
419 
420 
421 
422 
423 
426 : public EngineEvent
427 {
// work-timing event for performance observation.
// (The class header `WorkTiming`, listing lines 424-425, is elided from this listing.)
// Instances are created solely through the factory functions start() / stop().
// inherit the EngineEvent constructors (declared before `public:`, hence not publicly accessible)
429 using EngineEvent::EngineEvent;
430 
// event tags; only declared here -- their definitions are not part of this listing
431 static Symbol WORKSTART;
432 static Symbol WORKSTOP;
433 
434 public:
// build an event marking the begin / the end of actual processing at the given time
435 static WorkTiming start (Time now) { return WorkTiming{WORKSTART, Payload{now}}; }
436 static WorkTiming stop (Time now) { return WorkTiming{WORKSTOP, Payload{now}}; }
437 };
438 
449 {
// Execution context handed to ActivityLang::dispatchChain() by Scheduler::doWork().
// (The class header, listing lines 439-448, is elided; Scheduler declares `ExecutionCtx`
//  as nested friend class, which allows the access to its private layers used below.)
450 Scheduler& scheduler_;
451 public:
452 
// the event which started the current dispatch; used as template for chained events in post()
453 ActivationEvent rootEvent;
454 
455 ExecutionCtx(Scheduler& self, ActivationEvent toDispatch)
456 : scheduler_{self}
457 , rootEvent{toDispatch}
458 { }
459 
460 
461 /* ==== Implementation of the Concept ExecutionCtx ==== */
462 
// λ-post: enqueue for time-bound execution, within given ExecutionCtx.
// (return-type line elided; per the symbol index this returns activity::Proc)
// The new event is a copy of ctx.rootEvent, refined to the given chain, start and deadline;
// it is sanity-checked and then passed directly to layer-2.
472 post (Time when, Time dead, Activity* chain, ExecutionCtx& ctx)
473 {
474 REQUIRE (chain);
475 ActivationEvent chainEvent = ctx.rootEvent;
476 chainEvent.refineTo (chain, when, dead);
477 scheduler_.sanityCheck (chainEvent);
478 return scheduler_.layer2_.postChain (chainEvent, scheduler_.layer1_);
479 }
480 
// λ-work : transition Management-Mode -> Work-Mode.
// Relinquishes the grooming token, then reports the start time to the EngineObserver.
487 void
488 work (Time now, size_t qualifier)
489 {
490 scheduler_.layer2_.dropGroomingToken();
491 scheduler_.engineObserver_.dispatchEvent(qualifier, WorkTiming::start(now));
492 }
493 
// λ-done : signal end time of actual processing.
495 void
496 done (Time now, size_t qualifier)
497 {
498 scheduler_.engineObserver_.dispatchEvent(qualifier, WorkTiming::stop(now));
499 }
500 
// λ-tick : scheduler management duty cycle.
// (return-type line elided; per the symbol index this returns activity::Proc)
503 tick (Time now)
504 {
505 scheduler_.handleDutyCycle (now);
506 return activity::PASS;
507 }
508 
// getSchedTime() -- signature line 511 elided.
// access high-resolution-clock, rounded to µ-Ticks (delegates to the Scheduler)
510 Time
512 {
513 return scheduler_.getSchedTime();
514 }
515 };
516 
517 
/**
 * Scheduler::doWork() -- signature line 519 is elided from this listing.
 * The worker-Functor: called by the active Workers from the WorkForce.
 * Delegates to layer-2, passing the queue, the load controller, a dispatch functor
 * and a clock functor.
 */
518 inline activity::Proc
520 {
521 return layer2_.dispatchCapacity (layer1_
522 ,loadControl_
523 ,[this](ActivationEvent toDispatch)
524 {
// each dispatched event gets its own execution context, rooted at that event
525 ExecutionCtx ctx{*this, toDispatch};
526 return ActivityLang::dispatchChain (toDispatch, ctx);
527 }
528 ,[this]{ return getSchedTime(); }
529 );
530 }
531 
532 
533 
534 
535 
/**
 * ScheduleSpec::post() -- signature line 546 is elided from this listing.
 * Build Activity chain and hand-over to the Scheduler.
 * @return this spec, moved out into the return value
 * @note Scheduler::postChain() performs a sanity check and throws when start time
 *       or deadline were left at their marker defaults.
 */
545 inline ScheduleSpec
547 { // protect allocation
548 // auto guard = theScheduler_->layer2_.requireGroomingTokenHere();//////////////////////////////////////TODO can we avoid that?
549 maybeBuildTerm();
550 //set up new schedule by retrieving the Activity-chain...
551 theScheduler_->postChain ({term_->post(), start_
552 , death_
553 , manID_
554 , isCompulsory_});
555 return move(*this);
556 }
557 
/**
 * Signature line 563 is elided from this listing -- presumably ScheduleSpec::maybeBuildTerm(),
 * the only `void` member of ScheduleSpec operating on term_.
 * Creates the Activity term for job_ on first use.
 * @note the term is built only once, from the values of start_ and death_ at that moment;
 *       an existing term is not rebuilt when start or deadline are changed afterwards.
 */
562 inline void
564 {
565 if (term_) return;
566 term_ = move(
567 theScheduler_->activityLang_
568 .buildCalculationJob (job_, start_,death_));
569 }
570 
/**
 * Register the given spec as successor: this term appends a notification to the successor's term.
 * Both terms are built on demand before linking.
 * @param succSpec       schedule to be notified
 * @param unlimitedTime  passed through to Term::appendNotificationTo()
 * @return this spec, moved out into the return value
 */
571 inline ScheduleSpec
572 ScheduleSpec::linkToSuccessor (ScheduleSpec& succSpec, bool unlimitedTime)
573 {
574 this->maybeBuildTerm();
575 succSpec.maybeBuildTerm();
576 term_->appendNotificationTo (*succSpec.term_, unlimitedTime);
577 return move(*this);
578 }
579 
/**
 * Register the given spec as predecessor: the predecessor's term appends a notification to this term.
 * Both terms are built on demand before linking.
 * @param predSpec       schedule which shall notify this one
 * @param unlimitedTime  passed through to Term::appendNotificationTo()
 * @return this spec, moved out into the return value
 */
580 inline ScheduleSpec
581 ScheduleSpec::linkToPredecessor (ScheduleSpec& predSpec, bool unlimitedTime)
582 {
583 predSpec.maybeBuildTerm();
584 this->maybeBuildTerm();
585 predSpec.term_->appendNotificationTo (*term_, unlimitedTime);
586 return move(*this);
587 }
588 
589 
590 
/**
 * Validate an event before it enters the Scheduler.
 * @throws error::Logic  when the event is empty
 * @throws error::Fatal  when the start time is still the marker Time::ANYTIME,
 *                       when the deadline is still the marker Time::NEVER, or when the
 *                       deadline lies more than FUTURE_PLANNING_LIMIT after the current
 *                       scheduler time
 */
591 inline void
592 Scheduler::sanityCheck (ActivationEvent const& event)
593 {
594 if (not event)
595 throw error::Logic ("Empty event passed into Scheduler entrance");
596 if (event.startTime() == Time::ANYTIME)
597 throw error::Fatal ("Attempt to schedule an Activity without valid start time");
598 if (event.deathTime() == Time::NEVER)
599 throw error::Fatal ("Attempt to schedule an Activity without valid deadline");
// distance from the current scheduler time to the deadline
600 Time now{getSchedTime()};
601 Offset toDeadline{now, event.deathTime()};
602 if (toDeadline > FUTURE_PLANNING_LIMIT)
603 throw error::Fatal (util::_Fmt{"Attempt to schedule Activity %s "
604 "with a deadline by %s into the future"}
605 % *event.activity
606 % toDeadline);
607 }
608 
609 
/**
 * Scheduler::postChain(ActivationEvent) -- signature line 617 is elided from this listing.
 * Enqueue for time-bound execution, possibly dispatch immediately.
 * The event is sanity-checked first (which may throw); then the WorkForce scaling hook
 * runs for the event's start time, before the event is handed to layer-2.
 */
616 inline void
618 {
619 sanityCheck (actEvent);
620 maybeScaleWorkForce (actEvent.startTime());
621 layer2_.postChain (actEvent, layer1_);
622 }
623 
624 
625 
626 
/**
 * »Tick-hook« : code to maintain sane running status.
 * Holds the grooming token (scope guard) for the duration of the call.
 * @param now                current time, used for all checks and as base for the next tick
 * @param forceContinuation  when set, a follow-up tick is posted even if the queue is empty,
 *                           and placed WORK_HORIZON ahead instead of DUTY_CYCLE_PERIOD
 * @note returns early, right after triggerEmergency(), when a compulsory Activity at the
 *       queue head has missed its deadline; in that case no follow-up tick is posted.
 */
643 inline void
644 Scheduler::handleDutyCycle (Time now, bool forceContinuation)
645 {
646 auto guard = layer2_.requireGroomingTokenHere();
647 
648 // consolidate queue content
649 layer1_.feedPrioritisation();
650 // clean-up of outdated tasks here
// (the loop stops at a head entry which is out-of-time, so that entry is retained for the check below)
651 while (layer1_.isOutdated (now) and not layer1_.isOutOfTime(now))
652 layer1_.pullHead();
653 // protect against missing the deadline of a compulsory task
654 if (layer1_.isOutOfTime (now))
655 {
656 triggerEmergency();
657 return; // leave everything as-is
658 }
659 
660 // clean-up of obsolete Activities
661 activityLang_.discardBefore (now);
662 
663 loadControl_.updateState (now);
664 
665 if (not empty() or forceContinuation)
666 {// prepare next duty cycle »tick«
667 Time nextTick = now + (forceContinuation? WORK_HORIZON : DUTY_CYCLE_PERIOD);
668 Time deadline = nextTick + DUTY_CYCLE_TOLERANCE;
669 Activity& tickActivity = activityLang_.createTick (deadline);
// the tick is posted with default ManifestationID and marked as compulsory (last argument)
670 ActivationEvent tickEvent{tickActivity, nextTick, deadline, ManifestationID(), true};
671 layer2_.postChain (tickEvent, layer1_);
672 } // *deliberately* use low-level entrance
673 } // to avoid ignite() cycles and derailed load-regulation
674 
/**
 * Scheduler::handleWorkerTermination(bool isFailure) -- signature line 680 is elided from this listing.
 * Callback invoked whenever a worker-thread is about to exit (wired through Setup::finalHook).
 * A failure trips the emergency handling; a regular exit is only recorded by the load controller.
 */
679 inline void
681 {
682 if (isFailure)
683 triggerEmergency();
684 else
685 loadControl_.markWorkerExit();
686 }
687 
/**
 * Scheduler::maybeScaleWorkForce(Time startHorizon) -- signature line 697 is elided from this listing.
 * Hook invoked whenever a new task is passed in (called from postChain()).
 * With an empty queue the Scheduler is (re)started through ignite(); otherwise the
 * load controller is asked to ensure capacity for the given start horizon.
 */
696 inline void
698 {
699 if (empty())
700 ignite();
701 else
702 loadControl_.ensureCapacity (startHorizon);
703 }
704 
/**
 * Scheduler::triggerEmergency() -- signature line 715 is elided from this listing.
 * Trip the emergency brake and unwind processing while retaining all state.
 * @note not implemented yet: the body consists solely of the UNIMPLEMENTED marker.
 */
714 inline void
716 {
717 UNIMPLEMENTED ("scheduler overrun -- trigger Emergency");
718 }
719 
720 
721 
722 }} // namespace vault::gear
723 #endif /*SRC_VAULT_GEAR_SCHEDULER_H_*/
static const Time ANYTIME
border condition marker value. ANYTIME <= any time value
Definition: timevalue.hpp:322
a mutable time value, behaving like a plain number, allowing copy and re-accessing ...
Definition: timevalue.hpp:241
activity::Proc postChain(ActivationEvent event, SchedulerInvocation &layer1)
This is the primary entrance point to the Scheduler.
Scheduler resource usage coordination.
Record to describe an Activity, to happen within the Scheduler's control flow.
Definition: activity.hpp:235
Types marked with this mix-in may be moved and move-assigned.
Definition: nocopy.hpp:72
void seedCalcStream(Job planningJob, ManifestationID manID=ManifestationID(), FrameRate expectedAdditionalLoad=FrameRate(25))
Set the Scheduler to work on a new CalcStream.
Definition: scheduler.hpp:327
Low-level Render Engine event — abstracted storage base.
void handleWorkerTermination(bool isFailure)
Callback invoked whenever a worker-thread is about to exit.
Definition: scheduler.hpp:680
void discardSchedule()
forcibly clear out the schedule
void dropGroomingToken() noexcept
relinquish the right for internal state transitions.
Definition: Setup.py:1
Memory management scheme for activities and parameter data passed through the Scheduler within the Lu...
Offset DUTY_CYCLE_PERIOD
period of the regular scheduler »tick« for state maintenance.
Definition: scheduler.hpp:142
Definition: run.hpp:49
Framerate specified as frames per second.
Definition: timevalue.hpp:664
activity::Proc tick(Time now)
λ-tick : scheduler management duty cycle.
Definition: scheduler.hpp:503
const size_t DISMISS_CYCLES
number of wait cycles before an idle worker terminates completely
Definition: scheduler.hpp:141
Any copy and copy construction prohibited.
Definition: nocopy.hpp:46
void announceLoad(FrameRate fps)
Scheduler Layer-2 : execution of Scheduler Activities.
Offset FUTURE_PLANNING_LIMIT
limit timespan of deadline into the future (~360 MiB max)
Definition: scheduler.hpp:144
Render Engine performance data collection service.
void postChain(ActivationEvent)
Enqueue for time-bound execution, possibly dispatch immediately.
Definition: scheduler.hpp:617
ScheduleSpec post()
build Activity chain and hand-over to the Scheduler.
Definition: scheduler.hpp:546
void ignite()
Spark the engine self-regulation cycle and power up WorkForce.
Definition: scheduler.hpp:268
Time getSchedTime()
access high-resolution-clock, rounded to µ-Ticks
Definition: scheduler.hpp:403
A front-end for using printf-style formatting.
LoadController::Wiring connectMonitoring()
Definition: scheduler.hpp:392
ScheduleSpec defineSchedule(Job job)
Render Job builder: start definition of a schedule to invoke the given Job.
Definition: scheduler.hpp:366
Lumiera's internal time value datatype.
Definition: timevalue.hpp:308
Offset DUTY_CYCLE_TOLERANCE
maximum slip tolerated on duty-cycle start before triggering Scheduler-emergency
Definition: scheduler.hpp:143
activity::Proc post(Time when, Time dead, Activity *chain, ExecutionCtx &ctx)
λ-post: enqueue for time-bound execution, within given ExecutionCtx.
Definition: scheduler.hpp:472
void done(Time now, size_t qualifier)
λ-done : signal end time of actual processing.
Definition: scheduler.hpp:496
Controller to coordinate resource usage related to the Scheduler.
void activate(ManifestationID manID)
Enable entries marked with a specific ManifestationID to be processed.
Derived specific exceptions within Lumiera's exception hierarchy.
Definition: error.hpp:199
Term builder and execution framework to perform chains of scheduler Activities.
»Scheduler-Service« : coordinate render activities.
Definition: scheduler.hpp:222
Layer-1 of the Scheduler: queueing and prioritisation of activities.
Token or Atom with distinct identity.
Definition: symbol.hpp:126
void maybeScaleWorkForce(Time startHorizon)
Hook invoked whenever a new task is passed in.
Definition: scheduler.hpp:697
void ensureCapacity(Time startHorizon)
Hook to check and possibly scale up WorkForce to handle one additional job.
Mix-Ins to allow or prohibit various degrees of copying and cloning.
bool isOutdated(Time now) const
determine if Activity at scheduler is outdated and should be discarded
Marker for current (and obsolete) manifestations of a CalcStream processed by the Render-Engine...
Definition: activity.hpp:93
const auto IDLE_WAIT
sleep-recheck cycle for workers deemed idle
Definition: scheduler.hpp:140
work-timing event for performance observation
Definition: scheduler.hpp:425
A language framework to define and interconnect scheduler activity verbs.
void handleDutyCycle(Time now, bool=false)
»Tick-hook« : code to maintain sane running status.
Definition: scheduler.hpp:644
void triggerEmergency()
Trip the emergency brake and unwind processing while retaining all state.
Definition: scheduler.hpp:715
boost::rational< int64_t > FSecs
rational representation of fractional seconds
Definition: timevalue.hpp:229
Layer-2 of the Scheduler: coordination and interaction of activities.
void continueMetaJob(Time nextStart, Job planningJob, ManifestationID manID=ManifestationID())
Place a follow-up job-planning job into the timeline.
Definition: scheduler.hpp:342
activity::Proc doWork()
The worker-Functor: called by the active Workers from the WorkForce to pull / perform the actual rend...
Definition: scheduler.hpp:519
Lumiera error handling (C++ interface).
void markWorkerExit()
statistics update on scaling down the WorkForce
void activate(double degree=1.0)
Activate or scale up the worker pool.
Definition: work-force.hpp:280
static const Time NEVER
border condition marker value. NEVER >= any time value
Definition: timevalue.hpp:323
bool isOutOfTime(Time now) const
detect a compulsory Activity at scheduler head with missed deadline
Offset measures a distance in time.
Definition: timevalue.hpp:367
auto setup(FUN &&workFun)
Helper: setup a Worker-Pool configuration for the test.
Time getSchedTime()
access high-resolution-clock, rounded to µ-Ticks
Definition: scheduler.hpp:511
void discardBefore(Time deadline)
ScopedGroomingGuard requireGroomingTokenHere()
a scope guard to force acquisition of the GroomingToken
static activity::Proc dispatchChain(Activity *chain, EXE &executionCtx)
Execution Framework: dispatch performance of a chain of Activities.
A pool of workers for multithreaded rendering.
static size_t COMPUTATION_CAPACITY
Nominal »full size« of a pool of concurrent workers.
Definition: work-force.hpp:115
Proc
Result instruction from Activity activation.
Definition: activity.hpp:149
ActivationEvent pullHead()
Retrieve from the scheduling queue the entry with earliest start time.
Activity & createTick(Time deadline)
Individual frame rendering task, forwarding to a closure.
Definition: job.h:277
Duration WORK_HORIZON
the scope of activity currently in the works
void feedPrioritisation()
Pick up all new events from the entrance queue and enqueue them to be retrieved ordered by start time...
Front-end for simplified access to the current wall clock time.
Pool of worker threads for rendering.
Definition: work-force.hpp:249
void terminateProcessing()
Bring down processing destructively as fast as possible.
Definition: scheduler.hpp:287
Base for configuration of the worker pool.
Definition: work-force.hpp:113
Vault-Layer implementation namespace root.
activity::Term buildMetaJob(Job job, Time start, Time deadline)
Builder-API: initiate definition of internal/planning job.
Collector and aggregator for performance data.
void updateState(Time)
periodic call to build integrated state indicators
Scheduler Layer-1 : time based dispatch.
void work(Time now, size_t qualifier)
λ-work : transition Management-Mode -> Work-Mode.
Definition: scheduler.hpp:488
activity::Proc dispatchCapacity(SchedulerInvocation &, LoadController &, DISPATCH, CLOCK)
Implementation of the worker-Functor: