diff --git a/scheduler.go b/scheduler.go index d8d2a58..2e58b1a 100644 --- a/scheduler.go +++ b/scheduler.go @@ -1,7 +1,9 @@ package gtq import ( + "fmt" "sort" + "time" ) // Scheduler is an internal goroutine which examines the Queues and @@ -18,7 +20,7 @@ func SimpleScheduler(tq *TaskQueue) bool { for k := range pc { prios = append(prios, k) } - sort.Sort(UIntSlice(prios)) + sort.Sort(sort.Reverse(UIntSlice(prios))) var queued uint for _, prio := range prios { @@ -32,8 +34,69 @@ func SimpleScheduler(tq *TaskQueue) bool { queued++ } } - if queued >= (tq.numJobs * 2) { + if queued > 0 { return true } return false } + +// TimeSliceScheduler attempts to give tasks a fair share of the Stats() +// collection window, based on their priority level. +func TimeSliceScheduler(tq *TaskQueue) bool { + pc := tq.PriorityCounts() + // sort priorities + prios := make([]uint, 0, len(pc)) + var prioritiesTotal uint + for k := range pc { + prios = append(prios, k) + prioritiesTotal += k + } + sort.Sort(sort.Reverse(UIntSlice(prios))) + + stats := tq.Stats() + var window time.Duration + for _, stat := range stats { + window += stat.Duration + } + timeslice := window / time.Duration(prioritiesTotal) + + var queued uint + for _, prio := range prios { + // calculate this priority level's fair share, and add outstanding items + // from it if it hasn't exceeded that share. + prioSlice := timeslice * time.Duration(prio) + if stats[prio].Duration < prioSlice { + q, ok := tq.queues.Load(prio) + if !ok { + continue + } + for q.(*Queue).Length() > 0 { + task := q.(*Queue).Remove() + tq.nextTask <- task + queued++ + } + } + } + if queued > 0 { + return true + } + // second pass in case none of the priority levels ended up queueing + // anything, queue everything from the highest priority queue with items. + if tq.Length() > 0 { + for _, prio := range prios { + q, ok := tq.queues.Load(prio) + if !ok { + continue + } + for q.(*Queue).Length() > 0 { + task := q.(*Queue).Remove() + tq.nextTask <- task + queued++ + } + if queued > 0 { + return true + } + } + } + return false +} diff --git a/taskqueue.go b/taskqueue.go index c7f1a1e..a53cb7c 100644 --- a/taskqueue.go +++ b/taskqueue.go @@ -81,7 +81,7 @@ func (tq *TaskQueue) Start(scheduler Scheduler) { }() } if scheduler == nil { - scheduler = SimpleScheduler + scheduler = TimeSliceScheduler } go func() { for tq.running { diff --git a/taskqueue_test.go b/taskqueue_test.go index dd0df52..1e64de4 100644 --- a/taskqueue_test.go +++ b/taskqueue_test.go @@ -40,10 +40,20 @@ func TestTaskQueueAdd(t *testing.T) { } } -func BenchmarkTaskQueue(b *testing.B) { +func BenchmarkSimpleScheduler(b *testing.B) { tq = NewTaskQueue(0) testTQAdd(b.N / (runtime.GOMAXPROCS(0))) - tq.Start(nil) + tq.Start(SimpleScheduler) + for tq.Length() > 0 { + time.Sleep(10 * time.Millisecond) + } + tq.Stop() +} + +func BenchmarkTimeSliceScheduler(b *testing.B) { + tq = NewTaskQueue(0) + testTQAdd(b.N / (runtime.GOMAXPROCS(0))) + tq.Start(TimeSliceScheduler) for tq.Length() > 0 { time.Sleep(10 * time.Millisecond) }