Lumiera  0.pre.03
»edit your freedom«
scheduler-commutator.hpp
Go to the documentation of this file.
1 /*
2  SCHEDULER-COMMUTATOR.hpp - coordination layer of the render engine scheduler
3 
4  Copyright (C) Lumiera.org
5  2023, Hermann Vosseler <Ichthyostega@web.de>
6 
7  This program is free software; you can redistribute it and/or
8  modify it under the terms of the GNU General Public License as
9  published by the Free Software Foundation; either version 2 of
10  the License, or (at your option) any later version.
11 
12  This program is distributed in the hope that it will be useful,
13  but WITHOUT ANY WARRANTY; without even the implied warranty of
14  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15  GNU General Public License for more details.
16 
17  You should have received a copy of the GNU General Public License
18  along with this program; if not, write to the Free Software
19  Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20 
21 */
22 
23 
70 #ifndef SRC_VAULT_GEAR_SCHEDULER_COMMUTATOR_H_
71 #define SRC_VAULT_GEAR_SCHEDULER_COMMUTATOR_H_
72 
73 
74 #include "vault/common.hpp"
75 #include "vault/gear/activity.hpp"
79 #include "lib/time/timevalue.hpp"
80 #include "lib/format-string.hpp"
81 #include "lib/nocopy.hpp"
82 
83 #include <thread>
84 #include <atomic>
85 
86 
87 namespace vault{
88 namespace gear {
89 
90  using lib::time::Offset;
91  using lib::time::FSecs;
92  using lib::time::Time;
93  using std::atomic;
94  using std::memory_order::memory_order_relaxed;
95  using std::memory_order::memory_order_acquire;
96  using std::memory_order::memory_order_release;
97  using std::chrono_literals::operator ""us;
98  using std::chrono::microseconds;
99 
100  namespace { // Configuration / Scheduling limit
101 
102  microseconds GROOMING_WAIT_CYCLE{70us};
103 
105  auto inline thisThread() { return std::this_thread::get_id(); }
106  }
107 
108 
109 
110  /*************************************************************/
120  {
121  using ThreadID = std::thread::id;
122  atomic<ThreadID> groomingToken_{};
123 
124 
125  public:
126  SchedulerCommutator() = default;
127 
136  bool
138  {
139  ThreadID expect_noThread; // expect no one else to be in...
140  return groomingToken_.compare_exchange_strong (expect_noThread, thisThread()
141  ,memory_order_acquire // success also constitutes an acquire barrier
142  ,memory_order_relaxed // failure has no synchronisation ramifications
143  );
144  }
145 
152  void
153  dropGroomingToken() noexcept
154  { // expect that this thread actually holds the Grooming-Token
155  REQUIRE (groomingToken_.load(memory_order_relaxed) == thisThread());
156  const ThreadID noThreadHoldsIt;
157  groomingToken_.store (noThreadHoldsIt, memory_order_release);
158  }
159 
164  bool
165  holdsGroomingToken (ThreadID id) noexcept
166  {
167  return id == groomingToken_.load (memory_order_relaxed);
168  }
169 
170 
171  class ScopedGroomingGuard;
174 
175 
176 
178  void
180  {
181  if (layer1.hasPendingInput()
183  or acquireGoomingToken()))
184  layer1.feedPrioritisation();
185  }
186 
194  {
196  or acquireGoomingToken())
197  {
198  layer1.feedPrioritisation();
199  while (layer1.isOutdated (now) and not layer1.isOutOfTime(now))
200  layer1.pullHead();
201  if (layer1.isDue (now))
202  {
203  if (layer1.isOutOfTime(now))
204  UNIMPLEMENTED ("how to trigger a Scheduler-Emergency from here");
205  else
206  return layer1.pullHead();
207  } }
208  return ActivationEvent();
209  }
210 
211 
212  /***********************************************************/
228  {
230  layer1.feedPrioritisation (move (event));
231  else
232  layer1.instruct (move (event));
233  return activity::PASS;
234  }
235 
236 
242  template<class DISPATCH, class CLOCK>
245 
246 
247 
248  private:
250  scatteredDelay (Time now, Time head
251  ,LoadController& loadController
252  ,LoadController::Capacity capacity);
253 
254  void
255  ensureDroppedGroomingToken()
256  {
259  }
260 
266  {
267  activity::Proc lastResult = activity::PASS;
268 
272  operator activity::Proc()
273  {
274  return activity::SKIP == lastResult? activity::PASS
275  : lastResult;
276  }
277 
278  template<class FUN>
280  performStep (FUN step)
281  {
282  if (activity::PASS == lastResult)
283  lastResult = step();
284  return move(*this);
285  }
286  };
287  };
288 
289 
290 
291 
292 
293 
312  template<class DISPATCH, class CLOCK>
313  inline activity::Proc
315  ,LoadController& loadController
316  ,DISPATCH executeActivity
317  ,CLOCK getSchedTime
318  )
319  {
320  try {
321  auto res = WorkerInstruction{}
322  .performStep([&]{
323  maybeFeed(layer1);
324  Time now = getSchedTime();
325  Time head = layer1.headTime();
326  return scatteredDelay(now, head, loadController,
327  loadController.markIncomingCapacity (head,now));
328  })
329  .performStep([&]{
330  Time now = getSchedTime();
331  auto toDispatch = findWork (layer1,now);
332  if (not toDispatch) return activity::KICK; // contention
333  return executeActivity (toDispatch);
334  })
335  .performStep([&]{
336  maybeFeed(layer1);
337  Time now = getSchedTime();
338  Time head = layer1.headTime();
339  return scatteredDelay(now, head, loadController,
340  loadController.markOutgoingCapacity (head,now));
341  });
342 
343  // ensure lock clean-up
344  if (res != activity::PASS)
345  ensureDroppedGroomingToken();
346  return res;
347  }
348  catch(...)
349  {
350  ensureDroppedGroomingToken();
351  throw;
352  }
353  }
354 
355 
370  inline activity::Proc
372  ,LoadController& loadController
373  ,LoadController::Capacity capacity)
374  {
375  auto doTargetedSleep = [&]
376  { // ensure not to block the Scheduler after management work
377  ensureDroppedGroomingToken();
378  // relocate this thread(capacity) to a time where its more useful
379  Offset targetedDelay = loadController.scatteredDelayTime (now, capacity);
380  std::this_thread::sleep_for (std::chrono::microseconds (_raw(targetedDelay)));
381  };
382  auto doTendNextHead = [&]
383  {
384  if (not loadController.tendedNext(head)
386  or acquireGoomingToken()))
387  loadController.tendNext(head);
388  };
389 
390  switch (capacity) {
392  return activity::PASS;
394  std::this_thread::yield();
395  return activity::SKIP; // prompts to abort chain but call again immediately
397  return activity::WAIT; // prompts to switch this thread into sleep mode
399  doTendNextHead();
400  doTargetedSleep(); // let this thread wait until next head time is due
401  return activity::SKIP;
402  default:
403  doTargetedSleep();
404  return activity::SKIP; // prompts to abort this processing-chain for good
405  }
406  }
407 
408 
409 
410 
411 
414  {
415  SchedulerCommutator& commutator_;
416  bool handledActively_;
417 
418  bool
419  ensureHoldsToken()
420  {
421  if (commutator_.holdsGroomingToken(thisThread()))
422  return false;
423  while (not commutator_.acquireGoomingToken())
424  std::this_thread::sleep_for (GROOMING_WAIT_CYCLE);
425  return true;
426  }
427 
428  public:
431  : commutator_(layer2)
432  , handledActively_{ensureHoldsToken()}
433  { }
434 
436  {
437  if (handledActively_ and
438  commutator_.holdsGroomingToken(thisThread()))
439  commutator_.dropGroomingToken();
440  }
441  };
442 
443 
459  {
460  return ScopedGroomingGuard(*this);
461  }
462 
463 
464 
465 }} // namespace vault::gear
466 #endif /*SRC_VAULT_GEAR_SCHEDULER_COMMUTATOR_H_*/
bool tendedNext(Time nextHead) const
did we already tend for the indicated next relevant head time?
activity::Proc postChain(ActivationEvent event, SchedulerInvocation &layer1)
This is the primary entrance point to the Scheduler.
Scheduler resource usage coordination.
activity::Proc scatteredDelay(Time now, Time head, LoadController &loadController, LoadController::Capacity capacity)
A worker asking for work constitutes free capacity, which can be redirected into a focused zone of th...
bool holdsGroomingToken(ThreadID id) noexcept
check if the indicated thread currently holds the right to conduct internal state transitions...
microseconds GROOMING_WAIT_CYCLE
wait-sleep in case a thread must forcibly acquire the Grooming-Token
void dropGroomingToken() noexcept
relinquish the right for internal state transitions.
Any copy and copy construction prohibited.
Definition: nocopy.hpp:46
Scheduler Layer-2 : execution of Scheduler Activities.
auto thisThread()
convenient short-notation, also used by SchedulerService
Types marked with this mix-in may be moved but not copied.
Definition: nocopy.hpp:58
Front-end for printf-style string template interpolation.
Capacity markIncomingCapacity(Time head, Time now)
decide how this thread&#39;s capacity shall be used when returning from idle wait and asking for work ...
ActivationEvent findWork(SchedulerInvocation &layer1, Time now)
Look into the queues and possibly retrieve work due by now.
Lumiera&#39;s internal time value datatype.
Definition: timevalue.hpp:308
Controller to coordinate resource usage related to the Scheduler.
void instruct(ActivationEvent actEvent)
Accept an ActivationEvent with an Activity for time-bound execution.
Layer-1 of the Scheduler: queueing and prioritisation of activities.
Mix-Ins to allow or prohibit various degrees of copying and cloning.
bool isOutdated(Time now) const
determine if Activity at scheduler is outdated and should be discarded
Offset scatteredDelayTime(Time now, Capacity capacity)
Generate a time offset to relocate currently unused capacity to a time range where it&#39;s likely to be ...
A language framework to define and interconnect scheduler activity verbs.
boost::rational< int64_t > FSecs
rational representation of fractional seconds
Definition: timevalue.hpp:229
void tendNext(Time nextHead)
Mark the indicated next head time as tended.
monad-like step sequence: perform sequence of steps, as long as the result remains activity::PASS ...
Basic set of definitions and includes commonly used together (Vault).
bool isOutOfTime(Time now) const
detect a compulsory Activity at scheduler head with missed deadline
Offset measures a distance in time.
Definition: timevalue.hpp:367
awaiting imminent activities
ScopedGroomingGuard requireGroomingTokenHere()
a scope guard to force acquisition of the GroomingToken
Proc
Result instruction from Activity activation.
Definition: activity.hpp:149
ActivationEvent pullHead()
Retrieve from the scheduling queue the entry with earliest start time.
Capacity markOutgoingCapacity(Time head, Time now)
decide how this thread&#39;s capacity shall be used after it returned from being actively employed ...
a family of time value like entities and their relationships.
void feedPrioritisation()
Pick up all new events from the entrance queue and enqueue them to be retrieved ordered by start time...
Capacity
Allocation of capacity to time horizon of expected work.
void maybeFeed(SchedulerInvocation &layer1)
tend to the input queue if possible
Vault-Layer implementation namespace root.
bool isDue(Time now) const
Determine if there is work to do right now.
bool acquireGoomingToken() noexcept
acquire the right to perform internal state transitions.
Scheduler Layer-1 : time based dispatch.
activity::Proc dispatchCapacity(SchedulerInvocation &, LoadController &, DISPATCH, CLOCK)
Implementation of the worker-Functor:
Descriptor for a piece of operational logic performed by the scheduler.