Added a smarter base scheduler that actually does a bit of schedule time calculation.

This commit is contained in:
Tessa Nordgren 2019-04-24 14:29:26 -07:00
parent 6482410063
commit ac0cef7242
3 changed files with 78 additions and 5 deletions

View File

@ -1,7 +1,9 @@
package gtq
import (
"fmt"
"sort"
"time"
)
// Scheduler is an internal goroutine which examines the Queues and
@ -18,7 +20,7 @@ func SimpleScheduler(tq *TaskQueue) bool {
for k := range pc {
prios = append(prios, k)
}
sort.Sort(UIntSlice(prios))
sort.Sort(sort.Reverse(UIntSlice(prios)))
var queued uint
for _, prio := range prios {
@ -32,8 +34,69 @@ func SimpleScheduler(tq *TaskQueue) bool {
queued++
}
}
if queued >= (tq.numJobs * 2) {
if queued > 0 {
return true
}
return false
}
// TimeSliceScheduler attempts to give tasks a fair share of the Stats()
// collection window, based on their priority level.
func TimeSliceScheduler(tq *TaskQueue) bool {
pc := tq.PriorityCounts()
// sort priorities
prios := make([]uint, 0, len(pc))
var prioritiesTotal uint
for k := range pc {
prios = append(prios, k)
prioritiesTotal += k
}
sort.Sort(sort.Reverse(UIntSlice(prios)))
stats := tq.Stats()
var window time.Duration
for _, stat := range stats {
window += stat.Duration
}
timeslice := window / time.Duration(prioritiesTotal)
var queued uint
for _, prio := range prios {
// calculate this priority level's fair share, and add outstanding items
// from it if it hasn't exceeded that share.
prioSlice := timeslice * time.Duration(prio)
if stats[prio].Duration < prioSlice {
q, ok := tq.queues.Load(prio)
if !ok {
continue
}
for q.(*Queue).Length() > 0 {
task := q.(*Queue).Remove()
tq.nextTask <- task
queued++
}
}
}
if queued > 0 {
return true
}
// second pass in case none of the priority levels ended up queueing
// anything, queue everything from the highest priority queue with items.
if tq.Length() > 0 {
for _, prio := range prios {
q, ok := tq.queues.Load(prio)
if !ok {
continue
}
for q.(*Queue).Length() > 0 {
task := q.(*Queue).Remove()
tq.nextTask <- task
queued++
}
if queued > 0 {
return true
}
}
}
return false
}

View File

@ -81,7 +81,7 @@ func (tq *TaskQueue) Start(scheduler Scheduler) {
}()
}
if scheduler == nil {
scheduler = SimpleScheduler
scheduler = TimeSliceScheduler
}
go func() {
for tq.running {

View File

@ -40,10 +40,20 @@ func TestTaskQueueAdd(t *testing.T) {
}
}
func BenchmarkTaskQueue(b *testing.B) {
func BenchmarkSimpleScheduler(b *testing.B) {
tq = NewTaskQueue(0)
testTQAdd(b.N / (runtime.GOMAXPROCS(0)))
tq.Start(nil)
tq.Start(SimpleScheduler)
for tq.Length() > 0 {
time.Sleep(10 * time.Millisecond)
}
tq.Stop()
}
func BenchmarkTimeSliceScheduler(b *testing.B) {
tq = NewTaskQueue(0)
testTQAdd(b.N / (runtime.GOMAXPROCS(0)))
tq.Start(TimeSliceScheduler)
for tq.Length() > 0 {
time.Sleep(10 * time.Millisecond)
}