Lumiera  0.pre.03
»edit your freedom«
load-controller.hpp
Go to the documentation of this file.
1 /*
2  LOAD-CONTROLLER.hpp - coordinator for scheduler resource usage
3 
4  Copyright (C) Lumiera.org
5  2023, Hermann Vosseler <Ichthyostega@web.de>
6 
7  This program is free software; you can redistribute it and/or
8  modify it under the terms of the GNU General Public License as
9  published by the Free Software Foundation; either version 2 of
10  the License, or (at your option) any later version.
11 
12  This program is distributed in the hope that it will be useful,
13  but WITHOUT ANY WARRANTY; without even the implied warranty of
14  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15  GNU General Public License for more details.
16 
17  You should have received a copy of the GNU General Public License
18  along with this program; if not, write to the Free Software
19  Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20 
21 */
22 
23 
85 #ifndef SRC_VAULT_GEAR_LOAD_CONTROLLER_H_
86 #define SRC_VAULT_GEAR_LOAD_CONTROLLER_H_
87 
88 
89 #include "lib/error.hpp"
90 //#include "vault/gear/block-flow.hpp"
91 //#include "vault/gear/activity-lang.hpp"
92 //#include "lib/symbol.hpp"
93 #include "lib/nocopy.hpp"
94 #include "lib/util.hpp"
95 #include "lib/format-cout.hpp"
96 
97 //#include <string>
98 #include <cmath>
99 #include <atomic>
100 #include <chrono>
101 #include <utility>
102 #include <functional>
103 
104 
105 namespace vault{
106 namespace gear {
107 
108 // using util::isnil;
109 // using std::string;
110 
111  using util::max;
112  using util::limited;
113  using lib::time::Time;
114  using lib::time::FSecs;
115  using lib::time::TimeVar;
116  using lib::time::TimeValue;
117  using lib::time::Duration;
118  using lib::time::Offset;
119  using std::chrono_literals::operator ""ms;
120  using std::chrono_literals::operator ""us;
121  using std::function;
122  using std::atomic_int64_t;
123  using std::memory_order_relaxed;
124 
125  namespace { // Scheduler default config
126 
127  inline TimeValue
128  _uTicks (std::chrono::microseconds us)
129  {
130  return TimeValue{us.count()};
131  }
132 
133 
134  Duration SLEEP_HORIZON{_uTicks (20ms)};
135  Duration WORK_HORIZON {_uTicks ( 5ms)};
136  Duration NEAR_HORIZON {_uTicks (50us)};
137  Duration STANDARD_LAG {_uTicks(200us)};
138 
139  const double LAG_SAMPLE_DAMPING = 2;
140  }
141 
142 
143 
154  {
155  public:
156  struct Wiring
157  {
158  function<size_t()> maxCapacity {[]{ return 1; }};
159  function<size_t()> currWorkForceSize{[]{ return 0; }};
160  function<void(uint)> stepUpWorkForce{[](uint){/*NOP*/}};
162  };
163 
164  explicit
165  LoadController (Wiring&& wiring)
166  : wiring_{std::move (wiring)}
167  { }
168 
171  { }
172 
173  private:
174  const Wiring wiring_;
175 
176  TimeVar tendedHead_{Time::ANYTIME};
177 
178 
179  atomic_int64_t sampledLag_{0};
180 
189  void
190  markLagSample (Time head, Time now)
191  { // negative when free capacity
192  double lag = _raw(std::clamp<TimeVar> (now - (head.isRegular()? head:now)
193  , -SLEEP_HORIZON
194  , WORK_HORIZON));
195  const double alpha = LAG_SAMPLE_DAMPING / (1 + wiring_.maxCapacity());
196  int64_t average = sampledLag_.load (memory_order_relaxed);
197  int64_t newAverage;
198  do newAverage = std::floor (lag*alpha + (1-alpha)*average);
199  while (not sampledLag_.compare_exchange_weak (average, newAverage, memory_order_relaxed));
200  }
201 
202  public:
211  int64_t
212  averageLag() const
213  {
214  return sampledLag_.load (memory_order_relaxed);
215  }
216 
223  int64_t
224  setCurrentAverageLag (int64_t lag)
225  {
226  return sampledLag_.exchange(lag, memory_order_relaxed);
227  }
228 
236  double
238  {
239  double lag = averageLag();
240  lag -= _raw(STANDARD_LAG);
241  lag /= _raw(WORK_HORIZON);
242  lag *= 10;
243  double lagFactor = lag<0? 1/(1-lag): 1+lag;
244  double loadFactor = wiring_.currWorkForceSize() / double(wiring_.maxCapacity());
245  return loadFactor * lagFactor;
246  }
247 
249  void
251  {
253  //
254  auto lag = averageLag();
255  if (lag > _raw(WORK_HORIZON))
256  wiring_.stepUpWorkForce(+4);
257  else
258  if (averageLag() > 2*_raw(STANDARD_LAG))
259  wiring_.stepUpWorkForce(+1);
260  }
261 
263  void
265  {
267  }
268 
272  void
273  ensureCapacity (Time startHorizon)
274  {
275  if (startHorizon > 2* SLEEP_HORIZON)
276  return;
277  if (averageLag() > 2*_raw(STANDARD_LAG))
278  wiring_.stepUpWorkForce(+1);
279  }
280 
285  bool
286  tendedNext (Time nextHead) const
287  {
288  return not nextHead.isRegular() // note: empty queue reports Time::NEVER
289  or nextHead == tendedHead_;
290  }
291 
299  void
300  tendNext (Time nextHead)
301  {
302  tendedHead_ = nextHead;
303  }
304 
305 
306 
314  };
315 
317  static Capacity
319  {
320  if (off > SLEEP_HORIZON) return IDLEWAIT;
321  if (off > WORK_HORIZON) return WORKTIME;
322  if (off > NEAR_HORIZON) return NEARTIME;
323  if (off > Time::ZERO) return SPINTIME;
324  else return DISPATCH;
325  }
326 
327 
330  Capacity
332  {
333  auto horizon = classifyTimeHorizon (Offset{head - now});
334  return horizon > SPINTIME
335  and not tendedNext(head)? TENDNEXT
336  : horizon==IDLEWAIT ? WORKTIME // re-randomise sleeper cycles
337  : horizon;
338  }
339 
342  Capacity
344  {
345  markLagSample (head,now);
346  return classifyTimeHorizon (Offset{head - now})
347  > NEARTIME ? IDLEWAIT
348  : markOutgoingCapacity(head,now);
349  }
350 
351 
352 
368  Offset
370  {
371  auto scatter = [&](Duration horizon)
372  {
373  gavl_time_t wrap = hash_value(now) % _raw(horizon);
374  ENSURE (0 <= wrap and wrap < _raw(horizon));
375  return TimeValue{wrap};
376  };
377 
378  TimeVar headDistance = max (tendedHead_-now, Time::ZERO);
379 
380  switch (capacity) {
381  case DISPATCH:
382  return Offset::ZERO;
383  case SPINTIME:
384  return Offset::ZERO;
385  case TENDNEXT:
386  return Offset{headDistance};
387  case NEARTIME:
388  return Offset{headDistance + scatter(Offset{limited (NEAR_HORIZON,headDistance,WORK_HORIZON)})};
389  case WORKTIME:
390  case IDLEWAIT:
391  return Offset{headDistance + scatter(SLEEP_HORIZON)};
392  default:
393  NOTREACHED ("uncovered work capacity classification.");
394  }
395  }
396  };
397 
398 
399 
400 }}// namespace vault::gear
401 #endif /*SRC_VAULT_GEAR_LOAD_CONTROLLER_H_*/
static const Time ANYTIME
border condition marker value. ANYTIME <= any time value
Definition: timevalue.hpp:322
bool tendedNext(Time nextHead) const
did we already tend for the indicated next relevant head time?
int64_t setCurrentAverageLag(int64_t lag)
a mutable time value, behaving like a plain number, allowing copy and re-accessing ...
Definition: timevalue.hpp:241
Duration NEAR_HORIZON
what counts as "imminent" (e.g. for spin-waiting)
void markLagSample(Time head, Time now)
Automatically use custom string conversion in C++ stream output.
const double LAG_SAMPLE_DAMPING
smoothing factor for exponential moving average of lag;
Any copy and copy construction prohibited.
Definition: nocopy.hpp:46
Capacity markIncomingCapacity(Time head, Time now)
decide how this thread's capacity shall be used when returning from idle wait and asking for work ...
Lumiera's internal time value datatype.
Definition: timevalue.hpp:308
Controller to coordinate resource usage related to the Scheduler.
void ensureCapacity(Time startHorizon)
Hook to check and possibly scale up WorkForce to handle one additional job.
Mix-Ins to allow or prohibit various degrees of copying and cloning.
capacity for active processing required
bool isRegular() const
Definition: timevalue.hpp:780
Offset scatteredDelayTime(Time now, Capacity capacity)
Generate a time offset to relocate currently unused capacity to a time range where it's likely to be ...
Tiny helper functions and shortcuts to be used everywhere Consider this header to be effectively incl...
Duration STANDARD_LAG
Experience shows that on average scheduling happens with 200µs delay.
boost::rational< int64_t > FSecs
rational representation of fractional seconds
Definition: timevalue.hpp:229
void tendNext(Time nextHead)
Mark the indicated next head time as tended.
Lumiera error handling (C++ interface).
void markWorkerExit()
statistics update on scaling down the WorkForce
Duration SLEEP_HORIZON
schedules beyond that horizon justify going idle
Offset measures a distance in time.
Definition: timevalue.hpp:367
awaiting imminent activities
Duration is the internal Lumiera time metric.
Definition: timevalue.hpp:477
NUM constexpr limited(NB lowerBound, NUM val, NB upperBound)
force a numeric to be within bounds, inclusively
Definition: util.hpp:101
static Capacity classifyTimeHorizon(Offset off)
classification of time horizon for scheduling
Capacity markOutgoingCapacity(Time head, Time now)
decide how this thread's capacity shall be used after it returned from being actively employed ...
Duration WORK_HORIZON
the scope of activity currently in the works
basic constant internal time value.
Definition: timevalue.hpp:142
Capacity
Allocation of capacity to time horizon of expected work.
Vault-Layer implementation namespace root.
void updateState(Time)
periodic call to build integrated state indicators
typical stable work task rhythm expected