gtq/scheduler.go

99 lines
2.2 KiB
Go
Raw Permalink Normal View History

2019-04-11 14:21:27 +00:00
package gtq
import (
"sort"
"time"
2019-04-11 14:21:27 +00:00
)
// Scheduler is the signature of a scheduling function: it examines the
// queues of tq and chooses which tasks to run next. It returns true if
// tasks were scheduled, or false if there's nothing left to schedule.
type Scheduler func(tq *TaskQueue) bool
// SimpleScheduler is the simplest possible implementation, which just takes
// tasks off the highest priority queue.
func SimpleScheduler(tq *TaskQueue) bool {
pc := tq.PriorityCounts()
// sort priorities
prios := make([]uint, 0, len(pc))
for k := range pc {
prios = append(prios, k)
}
sort.Sort(sort.Reverse(UIntSlice(prios)))
2019-04-11 14:21:27 +00:00
var queued uint
for _, prio := range prios {
q, ok := tq.queues.Load(prio)
if !ok {
continue
}
for q.(*Queue).Length() > 0 {
task := q.(*Queue).Remove()
tq.nextTask <- task
queued++
}
}
return (queued > 0)
2019-04-11 14:21:27 +00:00
}
// TimeSliceScheduler attempts to give tasks a fair share of the Stats()
// collection window, based on their priority level.
//
// The window is the sum of the Duration of every entry returned by
// tq.Stats(). It is divided by the sum of all priority values to obtain a
// unit time slice; a priority level's fair share is that unit multiplied by
// the priority value.
//
// First pass: walking priorities from highest to lowest, every level whose
// recorded Duration is below its fair share has all of its queued tasks moved
// onto tq.nextTask.
//
// Second pass: if the first pass queued nothing but tq.Length() reports
// outstanding work, all tasks of the highest priority level that has any are
// moved onto tq.nextTask.
//
// It returns true if at least one task was handed over, false otherwise.
func TimeSliceScheduler(tq *TaskQueue) bool {
	pc := tq.PriorityCounts()

	// Collect the known priority levels, highest first, and sum their values.
	prios := make([]uint, 0, len(pc))
	var prioritiesTotal uint
	for k := range pc {
		prios = append(prios, k)
		prioritiesTotal += k
	}
	sort.Sort(sort.Reverse(UIntSlice(prios)))

	// drain moves every task queued at prio onto tq.nextTask and reports how
	// many were moved. A priority without a stored queue yields zero.
	drain := func(prio uint) uint {
		v, ok := tq.queues.Load(prio)
		if !ok {
			return 0
		}
		q := v.(*Queue)
		var moved uint
		for q.Length() > 0 {
			tq.nextTask <- q.Remove()
			moved++
		}
		return moved
	}

	stats := tq.Stats()
	var window time.Duration
	for _, stat := range stats {
		window += stat.Duration
	}

	var queued uint
	// With no priorities, or only priority 0, the divisor would be zero and
	// the division would panic. No level can be below a zero-sized share
	// anyway, so skip straight to the second pass.
	if prioritiesTotal > 0 {
		timeslice := window / time.Duration(prioritiesTotal)
		for _, prio := range prios {
			// Queue this level's outstanding items only if it hasn't
			// already used up its fair share of the window.
			prioSlice := timeslice * time.Duration(prio)
			if stats[prio].Duration < prioSlice {
				queued += drain(prio)
			}
		}
	}
	if queued > 0 {
		return true
	}

	// Second pass: none of the priority levels ended up queueing anything,
	// so queue everything from the highest priority queue with items.
	if tq.Length() > 0 {
		for _, prio := range prios {
			if drain(prio) > 0 {
				return true
			}
		}
	}
	return false
}