// gtq/taskqueue.go
package gtq
import (
"runtime"
"sync"
"time"
)
// TaskQueue manages multiple Queues with different priorities.
// Task clients request tasks to run from this, and it decides which Task
// to give the client, based on a roughly even time division based on the
// priority numbers. Items from priority 2 will be given twice as much
// processing time as items from priority 1, for instance.
type TaskQueue struct {
	// queues maps a priority (uint) to the *Queue of tasks at that priority.
	queues sync.Map
	// task runners read off this channel, scheduler pushes onto it.
	nextTask chan func()
	// we need locking when cleaning up unused priorities.
	mutex sync.Mutex
	// running is set under mutex by Start/Stop and tells the scheduler
	// goroutine when to exit. NOTE(review): Start's loop reads it without
	// the lock — a data race under the race detector; see Start.
	running bool
	// numJobs is the number of worker goroutines Start launches.
	numJobs uint
}
// NewTaskQueue returns an initialized TaskQueue.
// numJobs specifies how many task workers you want running simultaneously; if
// 0, it defaults to runtime.GOMAXPROCS(0).
func NewTaskQueue(numJobs uint) (tq *TaskQueue) {
	workers := numJobs
	if workers == 0 {
		workers = uint(runtime.GOMAXPROCS(0))
	}
	return &TaskQueue{
		// Buffer room for at least one extra queued job per worker.
		nextTask: make(chan func(), workers*2),
		numJobs:  workers,
	}
}
// Add a task to the TaskQueue with a given priority.
func (tq *TaskQueue) Add(task func(), priority uint) {
	// Lazily create the per-priority queue on first use.
	entry, _ := tq.queues.LoadOrStore(priority, NewQueue())
	queue := entry.(*Queue)
	queue.Add(task)
}
// Length returns the number of tasks at all priorities.
func (tq *TaskQueue) Length() uint {
	tq.mutex.Lock()
	defer tq.mutex.Unlock()
	var total uint
	tq.queues.Range(func(_, v interface{}) bool {
		total += uint(v.(*Queue).Length())
		return true
	})
	return total
}
// PriorityCounts returns a map where the key is an active priority level,
// and the value is the number of tasks left at that priority.
func (tq *TaskQueue) PriorityCounts() map[uint]int {
	counts := make(map[uint]int)
	tq.mutex.Lock()
	defer tq.mutex.Unlock()
	tq.queues.Range(func(k, v interface{}) bool {
		counts[k.(uint)] = v.(*Queue).Length()
		return true
	})
	return counts
}
// Start begins processing tasks.
func (tq *TaskQueue) Start(scheduler Scheduler) {
tq.mutex.Lock()
tq.running = true
tq.mutex.Unlock()
// start workers!
var j uint
for j = 0; j < tq.numJobs; j++ {
go func() {
for job := range tq.nextTask {
job()
}
}()
}
if scheduler == nil {
scheduler = TimeSliceScheduler
2019-04-11 14:21:27 +00:00
}
go func() {
for tq.running {
workQueued := scheduler(tq)
if !workQueued {
time.Sleep(1 * time.Millisecond)
}
}
}()
}
// Stop pauses processing of tasks. TaskQueue state is retained.
func (tq *TaskQueue) Stop() {
	tq.mutex.Lock()
	tq.running = false
	tq.mutex.Unlock()
}