gtq/scheduler.go
Tessa Nordgren 5d5af6e926
Some checks failed
continuous-integration/drone/push Build is failing
minor improvements from static code analysis
2022-02-18 12:09:52 -08:00

99 lines
2.2 KiB
Go

package gtq
import (
"sort"
"time"
)
// Scheduler is the function type implemented by scheduling strategies. A
// Scheduler examines the Queues of tq and chooses which tasks to run next.
// It returns true if tasks were scheduled, or false if there's nothing left
// to schedule.
//
// NOTE(review): the original comment describes a Scheduler as running in an
// internal goroutine; the caller is not visible in this file — confirm.
type Scheduler func(tq *TaskQueue) bool
// SimpleScheduler is the simplest possible implementation. It walks the
// priority levels reported by tq.PriorityCounts() from highest to lowest and,
// for every level that has a queue registered in tq.queues, sends each task
// of that queue to tq.nextTask until the queue reports a length of zero.
//
// It returns true if at least one task was sent, and false otherwise.
func SimpleScheduler(tq *TaskQueue) bool {
	counts := tq.PriorityCounts()

	// Gather the known priority levels and order them highest first.
	levels := make([]uint, 0, len(counts))
	for level := range counts {
		levels = append(levels, level)
	}
	sort.Sort(sort.Reverse(UIntSlice(levels)))

	scheduled := false
	for _, level := range levels {
		entry, found := tq.queues.Load(level)
		if found {
			queue := entry.(*Queue)
			// Hand over tasks one at a time until this level is empty.
			for queue.Length() > 0 {
				tq.nextTask <- queue.Remove()
				scheduled = true
			}
		}
	}
	return scheduled
}
// TimeSliceScheduler attempts to give tasks a fair share of the Stats()
// collection window, based on their priority level.
//
// The window is the sum of the Duration of every entry returned by
// tq.Stats(). It is divided by the sum of all priority values reported by
// tq.PriorityCounts() to obtain the timeslice of a single priority point. A
// priority level whose recorded Duration is below priority*timeslice has its
// queue drained into tq.nextTask. If that first pass schedules nothing and
// tq.Length() reports outstanding work, the highest priority queue that
// still holds tasks is drained instead.
//
// It returns true if at least one task was sent to tq.nextTask, and false
// otherwise.
func TimeSliceScheduler(tq *TaskQueue) bool {
	pc := tq.PriorityCounts()

	// Collect the priority levels, highest first, together with their sum.
	prios := make([]uint, 0, len(pc))
	var prioritiesTotal uint
	for k := range pc {
		prios = append(prios, k)
		prioritiesTotal += k
	}
	sort.Sort(sort.Reverse(UIntSlice(prios)))

	// The collection window is the total time recorded across all stats.
	stats := tq.Stats()
	var window time.Duration
	for _, stat := range stats {
		window += stat.Duration
	}

	// prioritiesTotal is zero when there are no priority levels at all, or
	// when the only level is priority 0. Dividing by it would panic, so the
	// timeslice stays at zero in that case: no level can then be below its
	// share, and the fallback pass below takes over.
	var timeslice time.Duration
	if prioritiesTotal > 0 {
		timeslice = window / time.Duration(prioritiesTotal)
	}

	// drain sends every task of the queue registered for prio to
	// tq.nextTask and reports whether at least one task was sent. A
	// priority without a registered queue is skipped.
	drain := func(prio uint) bool {
		entry, ok := tq.queues.Load(prio)
		if !ok {
			return false
		}
		queue := entry.(*Queue)
		sent := false
		for queue.Length() > 0 {
			tq.nextTask <- queue.Remove()
			sent = true
		}
		return sent
	}

	scheduled := false
	for _, prio := range prios {
		// calculate this priority level's fair share, and add outstanding
		// items from it if it hasn't exceeded that share. Priority 0 always
		// has a zero share, so it is only ever served by the fallback pass.
		prioSlice := timeslice * time.Duration(prio)
		if stats[prio].Duration < prioSlice {
			if drain(prio) {
				scheduled = true
			}
		}
	}
	if scheduled {
		return true
	}

	// second pass in case none of the priority levels ended up queueing
	// anything, queue everything from the highest priority queue with items.
	if tq.Length() > 0 {
		for _, prio := range prios {
			if drain(prio) {
				return true
			}
		}
	}
	return false
}