initial working release

This commit is contained in:
Tessa Nordgren 2019-04-11 16:21:27 +02:00
commit 6482410063
11 changed files with 556 additions and 0 deletions

29
README.md Normal file

@ -0,0 +1,29 @@
# gtq - the go task queue
## what?
a simple scheduler for longer-running goroutines, which lets you put them
into priority buckets and get roughly fair queuing of which tasks get
processed next.
## why?
goroutines are awesome, and typically super fast. but what if you need to
queue up a lot of longer-running work in the background? and what if you
want some control over which chunks of work get executed next?
gtq provides simple task queuing and scheduling for longer-running goroutines.
in this case, "long" means on the order of 1ms or longer, as many simple
goroutines can run in a matter of ns.
## how?
We use a simple FIFO queue for each priority level, and track rolling-window
statistics on task throughput for each of those. Then we have a scheduler
which looks at those statistics, in relation to each queue's priority level,
and decides which queue to pull from next to feed the task runners.
Overhead for this process is typically < 1000ns, so it's not suitable for
typical short-lived goroutines, but those kinds of goroutines don't really
need scheduling anyway.
## usage
TODO
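in the meantime, here's a minimal sketch based on the API in this commit
(`NewTaskQueue`, `Add`, `Start`, `Stop`); details may change before real usage docs land:

```go
package main

import (
	"fmt"
	"time"

	"github.com/nergdron/gtq"
)

func main() {
	// 0 means "use runtime.GOMAXPROCS(0) workers".
	tq := gtq.NewTaskQueue(0)

	// queue some longer-running work into a few priority buckets;
	// higher numbers are higher priority.
	for i := 0; i < 100; i++ {
		n := i
		tq.Add(func() {
			time.Sleep(time.Millisecond)
			fmt.Println("finished task", n)
		}, uint(n%3))
	}

	// nil scheduler means use the built-in SimpleScheduler.
	tq.Start(nil)

	// wait for the queues to drain, then stop the scheduler.
	for tq.Length() > 0 {
		time.Sleep(10 * time.Millisecond)
	}
	tq.Stop()
}
```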

8
go.mod Normal file

@ -0,0 +1,8 @@
module github.com/nergdron/gtq
go 1.12
require (
gopkg.in/eapache/queue.v1 v1.1.0
gopkg.in/yaml.v2 v2.2.2
)

5
go.sum Normal file

@ -0,0 +1,5 @@
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/eapache/queue.v1 v1.1.0 h1:EldqoJEGtXYiVCMRo2C9mePO2UUGnYn2+qLmlQSqPdc=
gopkg.in/eapache/queue.v1 v1.1.0/go.mod h1:wNtmx1/O7kZSR9zNT1TTOJ7GLpm3Vn7srzlfylFbQwU=
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=

71
queue.go Normal file

@ -0,0 +1,71 @@
package gtq
import (
"gopkg.in/eapache/queue.v1"
"sync"
)
// NewQueue initializes and returns a new Queue.
func NewQueue() (q *Queue) {
return &Queue{
queue: queue.New(),
Done: NewStat(0),
}
}
// Queue wraps eapache/queue with locking for thread safety. Also bundles in
// Stats, used by schedulers to help pick tasks to execute.
type Queue struct {
queue *queue.Queue
mutex sync.Mutex
// Stat which tracks task removal rate from the queue.
Done *Stat
}
// Add puts an element on the end of the queue.
func (q *Queue) Add(task func()) {
q.mutex.Lock()
defer q.mutex.Unlock()
q.queue.Add(task)
}
// Get returns the element at index i in the queue, or nil if i is out of range.
// This method accepts both positive and negative index values.
// Index 0 refers to the first element, and index -1 refers to the last.
func (q *Queue) Get(i int) (task func()) {
q.mutex.Lock()
defer q.mutex.Unlock()
if (q.queue.Length() < 1 || i > q.queue.Length()-1) || (i < -1) {
return nil
}
return q.queue.Get(i).(func())
}
// Length returns the number of elements currently stored in the queue.
func (q *Queue) Length() int {
q.mutex.Lock()
defer q.mutex.Unlock()
return q.queue.Length()
}
// Peek returns the element at the head of the queue.
func (q *Queue) Peek() (task func()) {
q.mutex.Lock()
defer q.mutex.Unlock()
// use the inner queue's length directly; calling q.Length() here would try
// to re-lock the mutex and deadlock.
if q.queue.Length() < 1 {
return nil
}
return q.queue.Peek().(func())
}
// Remove removes and returns the element from the front of the queue.
func (q *Queue) Remove() (task func()) {
q.mutex.Lock()
defer q.mutex.Unlock()
if q.queue.Length() < 1 {
return nil
}
task = q.queue.Remove().(func())
q.Done.Add(1)
return task
}
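As an aside, a quick sketch of the Queue wrapper used on its own, written as a hypothetical Go example test (not part of this commit; the tests below exercise it more thoroughly):

```go
package gtq

import "fmt"

// ExampleQueue is a hypothetical sketch showing the thread-safe Queue wrapper
// used directly, outside of a TaskQueue.
func ExampleQueue() {
	q := NewQueue()
	q.Add(func() { fmt.Println("hello from a queued task") })
	for task := q.Remove(); task != nil; task = q.Remove() {
		task() // each Remove also bumps the q.Done rate counter
	}
	// Output: hello from a queued task
}
```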

87
queue_test.go Normal file

@ -0,0 +1,87 @@
package gtq
import (
"runtime"
"sync"
"testing"
"time"
)
var testQ = NewQueue()
func testAdd(n int) {
for i := 0; i < n; i++ {
testQ.Add(func() {
time.Sleep(1)
})
}
}
func testRemove(wg *sync.WaitGroup, removed chan int) {
defer wg.Done()
var i int
for testQ.Remove() != nil {
i++
}
removed <- i
}
func TestParallelQueueAdd(t *testing.T) {
max := runtime.GOMAXPROCS(0)
var wg sync.WaitGroup
for n := 0; n < max; n++ {
wg.Add(1)
go func() {
defer wg.Done()
testAdd(1000000)
}()
}
wg.Wait()
t.Log("Queue size is", testQ.Length())
targetSize := max * 1000000
if testQ.Length() != targetSize {
t.Error("Queue size should be", targetSize)
}
}
func TestParallelQueueRemove(t *testing.T) {
target := testQ.Length()
var wg sync.WaitGroup
max := runtime.GOMAXPROCS(0)
removed := make(chan int, max)
for n := 0; n < max; n++ {
wg.Add(1)
go testRemove(&wg, removed)
}
wg.Wait()
close(removed)
var total int
for count := range removed {
total += count
}
if total != target {
t.Error("removed", total, "but expected", target)
}
t.Log("removed", total)
}
func BenchmarkParallelQueue(b *testing.B) {
testQ = NewQueue()
b.ResetTimer()
max := runtime.GOMAXPROCS(0)
var wg1, wg2 sync.WaitGroup
removed := make(chan int, max)
for i := 0; i < max; i++ {
wg1.Add(1)
wg2.Add(1)
go func() {
testAdd(b.N / (max * 2))
wg1.Done()
}()
go func() {
testRemove(&wg2, removed)
}()
}
wg1.Wait()
wg2.Wait()
}

39
scheduler.go Normal file

@ -0,0 +1,39 @@
package gtq
import (
"sort"
)
// Scheduler is a function run by an internal goroutine; it examines the Queues
// and chooses which tasks to run next. It returns true if tasks were scheduled,
// or false if the caller should back off briefly before trying again.
type Scheduler func(tq *TaskQueue) bool
// SimpleScheduler is the simplest possible implementation: it drains the queues
// in order, from the highest priority level down to the lowest.
func SimpleScheduler(tq *TaskQueue) bool {
pc := tq.PriorityCounts()
// sort priorities
prios := make([]uint, 0, len(pc))
for k := range pc {
prios = append(prios, k)
}
// sort descending so the highest priority queue is drained first.
sort.Sort(sort.Reverse(UIntSlice(prios)))
var queued uint
for _, prio := range prios {
q, ok := tq.queues.Load(prio)
if !ok {
continue
}
for q.(*Queue).Length() > 0 {
task := q.(*Queue).Remove()
tq.nextTask <- task
queued++
}
}
// report success only if we queued enough tasks to keep all the workers busy.
return queued >= (tq.numJobs * 2)
}
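Since Scheduler is just a function type, alternative policies can be plugged in via TaskQueue.Start. A hypothetical sketch (not part of this commit; it has to live inside the gtq package because it touches the unexported queues and nextTask fields) that forwards one task per pass from the highest non-empty priority:

```go
// drainOneScheduler is a hypothetical alternative Scheduler: it forwards at
// most one task per pass, always from the highest non-empty priority level,
// so lower priorities only run once the higher ones are empty.
// Use it as tq.Start(drainOneScheduler).
func drainOneScheduler(tq *TaskQueue) bool {
	var best *Queue
	var bestPrio uint
	tq.queues.Range(func(prio, q interface{}) bool {
		qu := q.(*Queue)
		if qu.Length() > 0 && (best == nil || prio.(uint) > bestPrio) {
			best, bestPrio = qu, prio.(uint)
		}
		return true
	})
	if best == nil {
		return false // nothing queued at any priority
	}
	if task := best.Remove(); task != nil {
		tq.nextTask <- task
		return true
	}
	return false
}
```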

118
stats.go Normal file

@ -0,0 +1,118 @@
package gtq
import (
"gopkg.in/yaml.v2"
"sync"
"time"
)
// Counter is an incrementing statistical value, with a Start time for rate
// calculation.
type Counter struct {
Start time.Time
Value uint64
}
// Stat tracks a counter for a queue priority level, grouped into timeslice buckets.
type Stat struct {
timeSlices []Counter
currentSlice int
window time.Duration
mutex sync.Mutex
}
// NewStat initializes and returns a new Stat. window determines
// the size of the rolling statistics timeframe that is captured,
// which in turn determines how finely grained stats collection and task
// scheduling can be. window defaults to 1 second.
func NewStat(window time.Duration) (s *Stat) {
s = &Stat{window: window}
if s.window == 0 {
s.window = time.Second
}
// default to 10 timeslice buckets, maybe make configurable later.
s.timeSlices = make([]Counter, 10)
return s
}
// Add increments the current timeslice's counter by val.
func (s *Stat) Add(val uint64) {
s.mutex.Lock()
defer s.mutex.Unlock()
// roll over to next timeslice if slice duration has been exceeded.
sliceLength := s.window / time.Duration(len(s.timeSlices))
now := time.Now()
elapsed := now.Sub(s.timeSlices[s.currentSlice].Start)
if elapsed > sliceLength {
s.currentSlice++
s.currentSlice = s.currentSlice % len(s.timeSlices)
s.timeSlices[s.currentSlice].Start = now
// reset the new slice's counter; val is added once, below.
s.timeSlices[s.currentSlice].Value = 0
}
s.timeSlices[s.currentSlice].Value += val
}
// Total returns the current rolling total for this Stat.
func (s *Stat) Total() (t uint64) {
for _, slice := range s.timeSlices {
t += slice.Value
}
return t
}
// Start returns the start time for this statistics window.
func (s *Stat) Start() (start time.Time) {
// walk the ring from the oldest slice (the one just after currentSlice) to
// the newest, and return the first start time that has actually been set.
n := len(s.timeSlices)
for i := 1; i <= n; i++ {
idx := (s.currentSlice + i) % n
if s.timeSlices[idx].Start.After(time.Time{}) {
return s.timeSlices[idx].Start
}
}
return start
}
// Duration returns the size of the current statistics window in this Stat.
// This will fluctuate between 90-100% of the window size set when the Stat
// was created, as internal buckets roll over.
func (s *Stat) Duration() time.Duration {
return time.Now().Sub(s.Start())
}
// Rate returns the flow rate of this counter, which is the current rolling
// value / the current statistics duration.
func (s *Stat) Rate() (r float64) {
return float64(s.Total()) / (float64(s.Duration()) / float64(time.Second))
}
// QueueStats is a report of formatted Stat data about a specific priority
// queue captured at a point in time.
type QueueStats struct {
Start time.Time
Duration time.Duration
Count uint64
Rate float64
}
func (qs *QueueStats) String() string {
b, _ := yaml.Marshal(qs)
return string(b)
}
// Stats returns the current time usage statistics for all active task
// priority levels.
func (tq *TaskQueue) Stats() (stats map[uint]*QueueStats) {
stats = make(map[uint]*QueueStats)
tq.mutex.Lock()
defer tq.mutex.Unlock()
tq.queues.Range(func(prio, q interface{}) bool {
stat := q.(*Queue).Done
qs := &QueueStats{
Start: stat.Start(),
Duration: stat.Duration(),
Count: stat.Total(),
Rate: stat.Rate(),
}
stats[prio.(uint)] = qs
return true
})
return stats
}
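The per-priority QueueStats report isn't exercised by the tests below; a hypothetical sketch (not part of this commit) of reading it from a running TaskQueue:

```go
package gtq

import "fmt"

// printStats is a hypothetical helper: dump per-priority throughput from a
// running TaskQueue. Each *QueueStats renders itself as YAML via String().
func printStats(tq *TaskQueue) {
	for prio, qs := range tq.Stats() {
		fmt.Printf("priority %d:\n%s\n", prio, qs)
	}
}
```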

22
stats_test.go Normal file

@ -0,0 +1,22 @@
package gtq
import (
"math/rand"
"testing"
"time"
)
var testStat = NewStat(0)
func TestStat(t *testing.T) {
total := int64(10000)
perOp := int64(time.Second) / total
for i := int64(0); i < total; i++ {
testStat.Add(1)
// random sleep to introduce some flow rate variation to capture.
time.Sleep(time.Duration(rand.Float64() * float64(perOp)))
}
t.Log("completed", testStat.Total(), "ops in", testStat.Duration(),
"for a total rate of", int64(testStat.Rate()), "ops/sec",
)
}

101
taskqueue.go Normal file

@ -0,0 +1,101 @@
package gtq
import (
"runtime"
"sync"
"time"
)
// TaskQueue manages multiple Queues with different priorities.
// Task clients request tasks to run from this, and it decides which Task
// to give the client, based on a roughly even division of processing time
// across the priority numbers. Items from priority 2 will be given twice as
// much processing time as items from priority 1, for instance.
type TaskQueue struct {
queues sync.Map
// task runners read off this channel, scheduler pushes onto it.
nextTask chan func()
// we need locking when cleaning up unused priorities.
mutex sync.Mutex
running bool
numJobs uint
}
// NewTaskQueue returns an initialized TaskQueue.
// numJobs specifies how many task workers you want running simultaneously;
// if 0, it defaults to runtime.GOMAXPROCS(0).
func NewTaskQueue(numJobs uint) (tq *TaskQueue) {
if numJobs == 0 {
numJobs = uint(runtime.GOMAXPROCS(0))
}
tq = &TaskQueue{
// make sure we can buffer up at least one extra job per worker.
nextTask: make(chan func(), numJobs*2),
numJobs: numJobs,
}
return tq
}
// Add a task to the TaskQueue with a given priority.
func (tq *TaskQueue) Add(task func(), priority uint) {
q, _ := tq.queues.LoadOrStore(priority, NewQueue())
q.(*Queue).Add(task)
}
// Length returns the number of tasks at all priorities.
func (tq *TaskQueue) Length() (length uint) {
tq.mutex.Lock()
defer tq.mutex.Unlock()
tq.queues.Range(func(_, q interface{}) bool {
length += uint(q.(*Queue).Length())
return true
})
return length
}
// PriorityCounts returns a map where the key is an active priority level,
// and the value is the number of tasks left at that priority.
func (tq *TaskQueue) PriorityCounts() (pc map[uint]int) {
pc = make(map[uint]int)
tq.mutex.Lock()
defer tq.mutex.Unlock()
tq.queues.Range(func(prio, q interface{}) bool {
pc[prio.(uint)] = q.(*Queue).Length()
return true
})
return pc
}
// Start begins processing tasks.
func (tq *TaskQueue) Start(scheduler Scheduler) {
tq.mutex.Lock()
tq.running = true
tq.mutex.Unlock()
// start workers!
var j uint
for j = 0; j < tq.numJobs; j++ {
go func() {
for job := range tq.nextTask {
job()
}
}()
}
if scheduler == nil {
scheduler = SimpleScheduler
}
go func() {
for {
// read the running flag under the lock, to avoid a data race with Stop().
tq.mutex.Lock()
running := tq.running
tq.mutex.Unlock()
if !running {
return
}
if !scheduler(tq) {
time.Sleep(1 * time.Millisecond)
}
}
}()
}
// Stop pauses processing of tasks. TaskQueue state is retained; tasks already
// handed off to the workers will still run.
func (tq *TaskQueue) Stop() {
tq.mutex.Lock()
defer tq.mutex.Unlock()
tq.running = false
}

51
taskqueue_test.go Normal file

@ -0,0 +1,51 @@
package gtq
import (
"runtime"
"sync"
"testing"
"time"
)
var numTestPrios = 10
var tq *TaskQueue
func testTQAdd(n int) {
workers := runtime.GOMAXPROCS(0)
var wg sync.WaitGroup
for i := 0; i < workers; i++ {
wg.Add(1)
go func() {
for i := 0; i < n; i++ {
// simplest possible task
tq.Add(func() {
time.Sleep(0)
}, uint(i%numTestPrios))
}
wg.Done()
}()
}
wg.Wait()
}
func TestTaskQueueAdd(t *testing.T) {
tq = NewTaskQueue(0)
testTQAdd(1000000)
expected := 1000000 * runtime.GOMAXPROCS(0)
added := tq.Length()
if uint(expected) != added {
t.Error("expected to add", expected, "but added", added)
} else {
t.Log("added", added)
}
}
func BenchmarkTaskQueue(b *testing.B) {
tq = NewTaskQueue(0)
testTQAdd(b.N / (runtime.GOMAXPROCS(0)))
tq.Start(nil)
for tq.Length() > 0 {
time.Sleep(10 * time.Millisecond)
}
tq.Stop()
}

25
uintslice.go Normal file

@ -0,0 +1,25 @@
package gtq
// UIntSlice just subtypes []uint to fulfill sort.Interface, as demonstrated
// for other types in the sort package.
type UIntSlice []uint
// Len helps fulfill sort.Interface.
func (list UIntSlice) Len() int {
return len(list)
}
// Less helps fulfill sort.Interface.
func (list UIntSlice) Less(i, j int) bool {
return list[i] < list[j]
}
// Swap helps fulfill sort.Interface.
func (list UIntSlice) Swap(i, j int) {
list[i], list[j] = list[j], list[i]
}
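As a design note, the helper type works fine, but on Go 1.8+ (this module targets 1.12) the same ordering could also be expressed inline with sort.Slice; a small sketch for comparison:

```go
package main

import (
	"fmt"
	"sort"
)

func main() {
	// ascending sort of priority levels using sort.Slice instead of a
	// dedicated UIntSlice helper type; functionally equivalent.
	prios := []uint{3, 1, 2}
	sort.Slice(prios, func(i, j int) bool { return prios[i] < prios[j] })
	fmt.Println(prios) // [1 2 3]
}
```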