102 lines
2.5 KiB
Go
102 lines
2.5 KiB
Go
package gtq
|
|
|
|
import (
|
|
"runtime"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// TaskQueue multiplexes several Queues, one per priority level.
// Task clients ask it for the next task to run, and it chooses which Task to
// hand out so that processing time is divided roughly in proportion to the
// priority numbers. For instance, items at priority 2 are given about twice
// as much processing time as items at priority 1.
type TaskQueue struct {
	// queues maps a priority (uint) to the *Queue holding that priority's tasks.
	queues sync.Map

	// nextTask is the hand-off channel: the scheduler pushes tasks onto it
	// and the task runners read from it.
	nextTask chan func()

	// mutex is required when cleaning up unused priorities. It is also held
	// by Start and Stop while they update running.
	mutex sync.Mutex

	// running is set by Start and cleared by Stop.
	running bool

	// numJobs is the number of task runner goroutines launched by Start.
	numJobs uint
}
|
|
|
|
// NewTaskQueue returns an initialized TaskQueue.
|
|
// numJobs specifies how many task workers you want running simultaneously, if
|
|
// 0, defaults to runtime.GOMAXPROCS(0).
|
|
func NewTaskQueue(numJobs uint) (tq *TaskQueue) {
|
|
if numJobs == 0 {
|
|
numJobs = uint(runtime.GOMAXPROCS(0))
|
|
}
|
|
tq = &TaskQueue{
|
|
// make sure we can buffer up at least one extra job per worker.
|
|
nextTask: make(chan func(), numJobs*2),
|
|
numJobs: numJobs,
|
|
}
|
|
return tq
|
|
}
|
|
|
|
// Add a task to the TaskQueue with a given priority.
|
|
func (tq *TaskQueue) Add(task func(), priority uint) {
|
|
q, _ := tq.queues.LoadOrStore(priority, NewQueue())
|
|
q.(*Queue).Add(task)
|
|
}
|
|
|
|
// Length returns the number of tasks at all priorities.
|
|
func (tq *TaskQueue) Length() (length uint) {
|
|
tq.mutex.Lock()
|
|
defer tq.mutex.Unlock()
|
|
tq.queues.Range(func(_, q interface{}) bool {
|
|
length += uint(q.(*Queue).Length())
|
|
return true
|
|
})
|
|
return length
|
|
}
|
|
|
|
// PriorityCounts returns a map where the key is an active priority level,
|
|
// and the value is the number of tasks left at that priority.
|
|
func (tq *TaskQueue) PriorityCounts() (pc map[uint]int) {
|
|
pc = make(map[uint]int)
|
|
tq.mutex.Lock()
|
|
defer tq.mutex.Unlock()
|
|
tq.queues.Range(func(prio, q interface{}) bool {
|
|
pc[prio.(uint)] = q.(*Queue).Length()
|
|
return true
|
|
})
|
|
return pc
|
|
}
|
|
|
|
// Start begins processing tasks.
|
|
func (tq *TaskQueue) Start(scheduler Scheduler) {
|
|
tq.mutex.Lock()
|
|
tq.running = true
|
|
tq.mutex.Unlock()
|
|
// start workers!
|
|
var j uint
|
|
for j = 0; j < tq.numJobs; j++ {
|
|
go func() {
|
|
for job := range tq.nextTask {
|
|
job()
|
|
}
|
|
}()
|
|
}
|
|
if scheduler == nil {
|
|
scheduler = TimeSliceScheduler
|
|
}
|
|
go func() {
|
|
for tq.running {
|
|
workQueued := scheduler(tq)
|
|
if !workQueued {
|
|
time.Sleep(1 * time.Millisecond)
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
// Stop pauses processing of tasks. TaskQueue state is retained.
|
|
func (tq *TaskQueue) Stop() {
|
|
tq.mutex.Lock()
|
|
defer tq.mutex.Unlock()
|
|
tq.running = false
|
|
}
|