commit 6482410063806e57d1ff0e58462162f17fa83e7a
Author: Tessa Nordgren
Date:   Thu Apr 11 16:21:27 2019 +0200

    initial working release

diff --git a/README.md b/README.md
new file mode 100644
index 0000000..65e4629
--- /dev/null
+++ b/README.md
@@ -0,0 +1,29 @@
+# gtq - the go task queue
+
+## what?
+a simple goroutine scheduler for longer-running goroutines, allowing you
+to put them into priority buckets and get roughly fair queuing of which
+tasks will be processed next.
+
+## why?
+goroutines are awesome, and typically super fast. but what if you need to
+queue up a lot of longer-running work in the background? and what if you
+want some control over which chunks of work get executed next?
+
+gtq provides simple task queuing and scheduling for longer-running
+goroutines. in this case, "long" means on the order of 1ms or longer, as
+many simple goroutines can run in a matter of ns.
+
+## how?
+We use fast ring-buffer queues (via eapache/queue) for each priority level,
+and track task completion statistics for each queue in a rolling window.
+A scheduler then looks at those statistics, in relation to each queue's
+priority level, and decides which queue to pull from next and feed to the
+task runners.
+
+Overhead for this process is typically < 1000ns per task, so it's not
+suitable for typical short-lived goroutines, but those kinds of goroutines
+don't really need scheduling anyway.
+
+## usage
+TODO
diff --git a/go.mod b/go.mod
new file mode 100644
index 0000000..8a5f97d
--- /dev/null
+++ b/go.mod
@@ -0,0 +1,8 @@
+module github.com/nergdron/gtq
+
+go 1.12
+
+require (
+	gopkg.in/eapache/queue.v1 v1.1.0
+	gopkg.in/yaml.v2 v2.2.2
+)
diff --git a/go.sum b/go.sum
new file mode 100644
index 0000000..ce9e261
--- /dev/null
+++ b/go.sum
@@ -0,0 +1,5 @@
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/eapache/queue.v1 v1.1.0 h1:EldqoJEGtXYiVCMRo2C9mePO2UUGnYn2+qLmlQSqPdc=
+gopkg.in/eapache/queue.v1 v1.1.0/go.mod h1:wNtmx1/O7kZSR9zNT1TTOJ7GLpm3Vn7srzlfylFbQwU=
+gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
+gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
diff --git a/queue.go b/queue.go
new file mode 100644
index 0000000..d2d944d
--- /dev/null
+++ b/queue.go
@@ -0,0 +1,71 @@
+package gtq
+
+import (
+	"gopkg.in/eapache/queue.v1"
+	"sync"
+)
+
+// NewQueue initializes and returns a new Queue.
+func NewQueue() (q *Queue) {
+	return &Queue{
+		queue: queue.New(),
+		Done:  NewStat(0),
+	}
+}
+
+// Queue wraps eapache/queue with locking for thread safety. It also bundles
+// in a Stat, used by schedulers to help pick which tasks to execute.
+type Queue struct {
+	queue *queue.Queue
+	mutex sync.Mutex
+	// Done tracks the task removal rate from the queue.
+	Done *Stat
+}
+
+// Add puts an element on the end of the queue.
+func (q *Queue) Add(task func()) {
+	q.mutex.Lock()
+	defer q.mutex.Unlock()
+	q.queue.Add(task)
+}
+
+// Get returns the element at index i in the queue, or nil if i is out of
+// range. This method accepts both positive and negative index values.
+// Index 0 refers to the first element, and index -1 refers to the last.
+func (q *Queue) Get(i int) (task func()) {
+	q.mutex.Lock()
+	defer q.mutex.Unlock()
+	// valid indices run from -Length to Length-1; this also rejects any
+	// index when the queue is empty.
+	length := q.queue.Length()
+	if i >= length || i < -length {
+		return nil
+	}
+	return q.queue.Get(i).(func())
+}
+
+// Length returns the number of elements currently stored in the queue.
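+// Length takes the queue mutex like every other method, so it is safe to
+// call concurrently with Add and Remove. A hypothetical drain-wait loop
+// (BenchmarkTaskQueue does the equivalent against a whole TaskQueue):
+//
+//	for q.Length() > 0 {
+//		time.Sleep(10 * time.Millisecond)
+//	}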
+func (q *Queue) Length() int {
+	q.mutex.Lock()
+	defer q.mutex.Unlock()
+	return q.queue.Length()
+}
+
+// Peek returns the element at the head of the queue, or nil if the queue
+// is empty.
+func (q *Queue) Peek() (task func()) {
+	q.mutex.Lock()
+	defer q.mutex.Unlock()
+	// check q.queue.Length() directly here: calling q.Length() would try
+	// to take the mutex again and deadlock.
+	if q.queue.Length() < 1 {
+		return nil
+	}
+	return q.queue.Peek().(func())
+}
+
+// Remove removes and returns the element from the front of the queue, or
+// nil if the queue is empty.
+func (q *Queue) Remove() (task func()) {
+	q.mutex.Lock()
+	defer q.mutex.Unlock()
+	if q.queue.Length() < 1 {
+		return nil
+	}
+	task = q.queue.Remove().(func())
+	q.Done.Add(1)
+	return task
+}
diff --git a/queue_test.go b/queue_test.go
new file mode 100644
index 0000000..63dc120
--- /dev/null
+++ b/queue_test.go
@@ -0,0 +1,87 @@
+package gtq
+
+import (
+	"runtime"
+	"sync"
+	"testing"
+	"time"
+)
+
+var testQ = NewQueue()
+
+func testAdd(n int) {
+	for i := 0; i < n; i++ {
+		testQ.Add(func() {
+			// sleep 1ns: the cheapest possible stand-in for real work.
+			time.Sleep(1)
+		})
+	}
+}
+
+func testRemove(wg *sync.WaitGroup, removed chan int) {
+	defer wg.Done()
+	var i int
+	for testQ.Remove() != nil {
+		i++
+	}
+	removed <- i
+}
+
+func TestParallelQueueAdd(t *testing.T) {
+	max := runtime.GOMAXPROCS(0)
+	var wg sync.WaitGroup
+	for n := 0; n < max; n++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			testAdd(1000000)
+		}()
+	}
+	wg.Wait()
+	t.Log("Queue size is", testQ.Length())
+	targetSize := max * 1000000
+	if testQ.Length() != targetSize {
+		t.Error("Queue size should be", targetSize)
+	}
+}
+
+func TestParallelQueueRemove(t *testing.T) {
+	// drains the queue left populated by TestParallelQueueAdd.
+	target := testQ.Length()
+	var wg sync.WaitGroup
+	max := runtime.GOMAXPROCS(0)
+	removed := make(chan int, max)
+	for n := 0; n < max; n++ {
+		wg.Add(1)
+		go testRemove(&wg, removed)
+	}
+	wg.Wait()
+	close(removed)
+	var total int
+	for count := range removed {
+		total += count
+	}
+	if total != target {
+		t.Error("removed", total, "but expected", target)
+	}
+	t.Log("removed", total)
+}
+
+func BenchmarkParallelQueue(b *testing.B) {
+	testQ = NewQueue()
+	b.ResetTimer()
+	max := runtime.GOMAXPROCS(0)
+	var wg1, wg2 sync.WaitGroup
+	removed := make(chan int, max)
+	for i := 0; i < max; i++ {
+		wg1.Add(1)
+		wg2.Add(1)
+		go func() {
+			testAdd(b.N / (max * 2))
+			wg1.Done()
+		}()
+		go func() {
+			testRemove(&wg2, removed)
+		}()
+	}
+	wg1.Wait()
+	wg2.Wait()
+}
diff --git a/scheduler.go b/scheduler.go
new file mode 100644
index 0000000..d8d2a58
--- /dev/null
+++ b/scheduler.go
@@ -0,0 +1,39 @@
+package gtq
+
+import (
+	"sort"
+)
+
+// Scheduler is a function, run on an internal goroutine, which examines the
+// Queues and chooses which tasks to run next. It returns true if tasks were
+// scheduled, or false if there's nothing left to schedule.
+type Scheduler func(tq *TaskQueue) bool
+
+// SimpleScheduler is the simplest possible implementation, which just takes
+// tasks off the highest-priority queue first.
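+//
+// A minimal usage sketch (TaskQueue.Start substitutes SimpleScheduler
+// automatically when handed a nil Scheduler):
+//
+//	tq := NewTaskQueue(0)
+//	tq.Start(SimpleScheduler) // equivalent to tq.Start(nil)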
+func SimpleScheduler(tq *TaskQueue) bool {
+	pc := tq.PriorityCounts()
+	// sort active priorities from highest to lowest.
+	prios := make([]uint, 0, len(pc))
+	for k := range pc {
+		prios = append(prios, k)
+	}
+	sort.Sort(sort.Reverse(UIntSlice(prios)))
+
+	var queued uint
+	for _, prio := range prios {
+		q, ok := tq.queues.Load(prio)
+		if !ok {
+			continue
+		}
+		for q.(*Queue).Length() > 0 {
+			task := q.(*Queue).Remove()
+			tq.nextTask <- task
+			queued++
+			if queued >= (tq.numJobs * 2) {
+				// the runner buffer is topped up; return so the next
+				// pass re-reads priorities instead of draining
+				// everything here.
+				return true
+			}
+		}
+	}
+	return queued > 0
+}
diff --git a/stats.go b/stats.go
new file mode 100644
index 0000000..988bd0a
--- /dev/null
+++ b/stats.go
@@ -0,0 +1,118 @@
+package gtq
+
+import (
+	"gopkg.in/yaml.v2"
+	"sync"
+	"time"
+)
+
+// Counter is an incrementing statistical value, with a Start time for rate
+// calculation.
+type Counter struct {
+	Start time.Time
+	Value uint64
+}
+
+// Stat tracks a rolling count for a queue priority level, grouped into
+// timeslice buckets.
+type Stat struct {
+	timeSlices   []Counter
+	currentSlice int
+	window       time.Duration
+	mutex        sync.Mutex
+}
+
+// NewStat initializes and returns a new Stat. window determines the size of
+// the rolling statistics timeframe that is captured, which in turn
+// determines how finely grained stats collection and task scheduling can
+// be. A window of 0 defaults to 1 second.
+func NewStat(window time.Duration) (s *Stat) {
+	s = &Stat{window: window}
+	if s.window == 0 {
+		s.window = time.Second
+	}
+	// default to 10 timeslice buckets, maybe make configurable later.
+	s.timeSlices = make([]Counter, 10)
+	return s
+}
+
+// Add increments the current timeslice counter by val.
+func (s *Stat) Add(val uint64) {
+	s.mutex.Lock()
+	defer s.mutex.Unlock()
+	// roll over to the next timeslice if the slice duration has been
+	// exceeded, resetting that slice before counting into it. slices roll
+	// one at a time, so counts from a long-idle period age out gradually.
+	sliceLength := s.window / time.Duration(len(s.timeSlices))
+	now := time.Now()
+	elapsed := now.Sub(s.timeSlices[s.currentSlice].Start)
+	if elapsed > sliceLength {
+		s.currentSlice = (s.currentSlice + 1) % len(s.timeSlices)
+		s.timeSlices[s.currentSlice].Start = now
+		s.timeSlices[s.currentSlice].Value = 0
+	}
+	s.timeSlices[s.currentSlice].Value += val
+}
+
+// Total returns the current rolling total for this Stat.
+func (s *Stat) Total() (t uint64) {
+	s.mutex.Lock()
+	defer s.mutex.Unlock()
+	for _, slice := range s.timeSlices {
+		t += slice.Value
+	}
+	return t
+}
+
+// Start returns the start time for this statistics window, i.e. the Start
+// of the oldest populated timeslice.
+func (s *Stat) Start() (start time.Time) {
+	s.mutex.Lock()
+	defer s.mutex.Unlock()
+	// walk the ring from the slice after the current one (the oldest)
+	// around to the current slice, returning the first populated Start.
+	n := len(s.timeSlices)
+	for j := 1; j <= n; j++ {
+		i := (s.currentSlice + j) % n
+		if !s.timeSlices[i].Start.IsZero() {
+			return s.timeSlices[i].Start
+		}
+	}
+	return start
+}
+
+// Duration returns the size of the current statistics window in this Stat.
+// This will fluctuate between 90-100% of the window size set when the Stat
+// was created, as internal buckets roll over.
+func (s *Stat) Duration() time.Duration {
+	return time.Since(s.Start())
+}
+
+// Rate returns the flow rate of this counter: the current rolling total
+// divided by the current statistics duration, in events per second.
+func (s *Stat) Rate() (r float64) {
+	return float64(s.Total()) / (float64(s.Duration()) / float64(time.Second))
+}
+
+// QueueStats is a report of formatted Stat data about a specific priority
+// queue, captured at a point in time.
+type QueueStats struct {
+	Start    time.Time
+	Duration time.Duration
+	Count    uint64
+	Rate     float64
+}
+
+func (qs *QueueStats) String() string {
+	b, _ := yaml.Marshal(qs)
+	return string(b)
+}
+
+// Stats returns the current time usage statistics for all active task
+// priority levels.
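+//
+// A hypothetical report loop; each QueueStats serializes itself as YAML
+// via its String method:
+//
+//	for prio, qs := range tq.Stats() {
+//		fmt.Printf("priority %d:\n%s\n", prio, qs)
+//	}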
+func (tq *TaskQueue) Stats() (stats map[uint]*QueueStats) {
+	stats = make(map[uint]*QueueStats)
+	tq.mutex.Lock()
+	defer tq.mutex.Unlock()
+	tq.queues.Range(func(prio, q interface{}) bool {
+		stat := q.(*Queue).Done
+		qs := &QueueStats{
+			Start:    stat.Start(),
+			Duration: stat.Duration(),
+			Count:    stat.Total(),
+			Rate:     stat.Rate(),
+		}
+		stats[prio.(uint)] = qs
+		return true
+	})
+	return stats
+}
diff --git a/stats_test.go b/stats_test.go
new file mode 100644
index 0000000..94869af
--- /dev/null
+++ b/stats_test.go
@@ -0,0 +1,22 @@
+package gtq
+
+import (
+	"math/rand"
+	"testing"
+	"time"
+)
+
+var testStat = NewStat(0)
+
+func TestStat(t *testing.T) {
+	total := int64(10000)
+	perOp := int64(time.Second) / total
+	for i := int64(0); i < total; i++ {
+		testStat.Add(1)
+		// random sleep to introduce some flow rate variation to capture.
+		time.Sleep(time.Duration(rand.Float64() * float64(perOp)))
+	}
+	t.Log("completed", testStat.Total(), "ops in", testStat.Duration(),
+		"for a total rate of", int64(testStat.Rate()), "ops/sec",
+	)
+}
diff --git a/taskqueue.go b/taskqueue.go
new file mode 100644
index 0000000..c7f1a1e
--- /dev/null
+++ b/taskqueue.go
@@ -0,0 +1,101 @@
+package gtq
+
+import (
+	"runtime"
+	"sync"
+	"time"
+)
+
+// TaskQueue manages multiple Queues with different priorities. Task clients
+// request tasks to run from it, and it decides which task to hand out,
+// dividing processing time roughly according to the priority numbers.
+// Items from priority 2 will be given twice as much processing time as
+// items from priority 1, for instance.
+type TaskQueue struct {
+	queues sync.Map
+	// task runners read off this channel, the scheduler pushes onto it.
+	nextTask chan func()
+	// guards the running flag and aggregate reads over the queues map.
+	mutex   sync.Mutex
+	running bool
+	numJobs uint
+}
+
+// NewTaskQueue returns an initialized TaskQueue. numJobs specifies how many
+// task workers you want running simultaneously; if 0, it defaults to
+// runtime.GOMAXPROCS(0).
+func NewTaskQueue(numJobs uint) (tq *TaskQueue) {
+	if numJobs == 0 {
+		numJobs = uint(runtime.GOMAXPROCS(0))
+	}
+	tq = &TaskQueue{
+		// make sure we can buffer up at least one extra job per worker.
+		nextTask: make(chan func(), numJobs*2),
+		numJobs:  numJobs,
+	}
+	return tq
+}
+
+// Add a task to the TaskQueue with a given priority.
+func (tq *TaskQueue) Add(task func(), priority uint) {
+	q, _ := tq.queues.LoadOrStore(priority, NewQueue())
+	q.(*Queue).Add(task)
+}
+
+// Length returns the number of tasks at all priorities.
+func (tq *TaskQueue) Length() (length uint) {
+	tq.mutex.Lock()
+	defer tq.mutex.Unlock()
+	tq.queues.Range(func(_, q interface{}) bool {
+		length += uint(q.(*Queue).Length())
+		return true
+	})
+	return length
+}
+
+// PriorityCounts returns a map where the key is an active priority level,
+// and the value is the number of tasks left at that priority.
+func (tq *TaskQueue) PriorityCounts() (pc map[uint]int) {
+	pc = make(map[uint]int)
+	tq.mutex.Lock()
+	defer tq.mutex.Unlock()
+	tq.queues.Range(func(prio, q interface{}) bool {
+		pc[prio.(uint)] = q.(*Queue).Length()
+		return true
+	})
+	return pc
+}
+
+// Start begins processing tasks. Calling Start on a TaskQueue that is
+// already running is a no-op.
+func (tq *TaskQueue) Start(scheduler Scheduler) {
+	tq.mutex.Lock()
+	if tq.running {
+		tq.mutex.Unlock()
+		return
+	}
+	tq.running = true
+	tq.mutex.Unlock()
+	// start workers!
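+	// each worker blocks reading the shared nextTask channel; the
+	// scheduler goroutine below keeps that channel fed.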
+	var j uint
+	for j = 0; j < tq.numJobs; j++ {
+		go func() {
+			for job := range tq.nextTask {
+				job()
+			}
+		}()
+	}
+	if scheduler == nil {
+		scheduler = SimpleScheduler
+	}
+	go func() {
+		for {
+			// read tq.running under the mutex to avoid racing with
+			// Stop().
+			tq.mutex.Lock()
+			running := tq.running
+			tq.mutex.Unlock()
+			if !running {
+				return
+			}
+			if !scheduler(tq) {
+				time.Sleep(1 * time.Millisecond)
+			}
+		}
+	}()
+}
+
+// Stop pauses scheduling of new tasks; workers finish whatever has already
+// been handed to them. TaskQueue state is retained.
+func (tq *TaskQueue) Stop() {
+	tq.mutex.Lock()
+	defer tq.mutex.Unlock()
+	tq.running = false
+}
diff --git a/taskqueue_test.go b/taskqueue_test.go
new file mode 100644
index 0000000..dd0df52
--- /dev/null
+++ b/taskqueue_test.go
@@ -0,0 +1,51 @@
+package gtq
+
+import (
+	"runtime"
+	"sync"
+	"testing"
+	"time"
+)
+
+var numTestPrios = 10
+var tq *TaskQueue
+
+func testTQAdd(n int) {
+	workers := runtime.GOMAXPROCS(0)
+	var wg sync.WaitGroup
+	for w := 0; w < workers; w++ {
+		wg.Add(1)
+		go func() {
+			for i := 0; i < n; i++ {
+				// simplest possible task, spread across priorities.
+				tq.Add(func() {
+					time.Sleep(0)
+				}, uint(i%numTestPrios))
+			}
+			wg.Done()
+		}()
+	}
+	wg.Wait()
+}
+
+func TestTaskQueueAdd(t *testing.T) {
+	tq = NewTaskQueue(0)
+	testTQAdd(1000000)
+	expected := 1000000 * runtime.GOMAXPROCS(0)
+	added := tq.Length()
+	if uint(expected) != added {
+		t.Error("expected to add", expected, "but added", added)
+	} else {
+		t.Log("added", added)
+	}
+}
+
+func BenchmarkTaskQueue(b *testing.B) {
+	tq = NewTaskQueue(0)
+	testTQAdd(b.N / runtime.GOMAXPROCS(0))
+	b.ResetTimer()
+	tq.Start(nil)
+	// wait for the workers to drain every priority queue.
+	for tq.Length() > 0 {
+		time.Sleep(10 * time.Millisecond)
+	}
+	tq.Stop()
+}
diff --git a/uintslice.go b/uintslice.go
new file mode 100644
index 0000000..95f87ff
--- /dev/null
+++ b/uintslice.go
@@ -0,0 +1,25 @@
+package gtq
+
+// UIntSlice just subtypes []uint to fulfill sort.Interface, as demonstrated
+// for other types in the sort package.
+type UIntSlice []uint
+
+// Len helps fulfill sort.Interface.
+func (list UIntSlice) Len() int {
+	return len(list)
+}
+
+// Less helps fulfill sort.Interface. It must be a strict ordering, so
+// equal elements compare false.
+func (list UIntSlice) Less(i, j int) bool {
+	return list[i] < list[j]
+}
+
+// Swap helps fulfill sort.Interface.
+func (list UIntSlice) Swap(i, j int) {
+	list[i], list[j] = list[j], list[i]
+}
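+
+// A tiny ordering sketch, mirroring how SimpleScheduler ranks priorities
+// (highest first):
+//
+//	prios := UIntSlice{1, 5, 3}
+//	sort.Sort(sort.Reverse(prios)) // prios is now [5 3 1]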