119 lines
3.1 KiB
Go
119 lines
3.1 KiB
Go
|
package gtq
|
||
|
|
||
|
import (
|
||
|
"gopkg.in/yaml.v2"
|
||
|
"sync"
|
||
|
"time"
|
||
|
)
|
||
|
|
||
|
// Counter is an incrementing statistical value, with a Start time for rate
|
||
|
// calculation.
|
||
|
type Counter struct {
|
||
|
Start time.Time
|
||
|
Value uint64
|
||
|
}
|
||
|
|
||
|
// Stat for a queue priority level, grouped into timeslice buckets.
|
||
|
type Stat struct {
|
||
|
timeSlices []Counter
|
||
|
currentSlice int
|
||
|
window time.Duration
|
||
|
mutex sync.Mutex
|
||
|
}
|
||
|
|
||
|
// NewStat initializes and returns a new Stat. window determines
|
||
|
// the size of the rolling statistics timeframe that is captured,
|
||
|
// which in turn determines how finely grained stats collection and task
|
||
|
// scheduling can be. window defaults to 1 second.
|
||
|
func NewStat(window time.Duration) (s *Stat) {
|
||
|
s = &Stat{window: window}
|
||
|
if s.window == 0 {
|
||
|
s.window = time.Second
|
||
|
}
|
||
|
// default to 10 timeslice buckets, maybe make configurable later.
|
||
|
s.timeSlices = make([]Counter, 10)
|
||
|
return s
|
||
|
}
|
||
|
|
||
|
// Add increments the specified counter by the amount in val. window specifies
|
||
|
func (s *Stat) Add(val uint64) {
|
||
|
s.mutex.Lock()
|
||
|
defer s.mutex.Unlock()
|
||
|
// roll over to next timeslice if slice duration has been exceeded.
|
||
|
sliceLength := s.window / time.Duration(len(s.timeSlices))
|
||
|
now := time.Now()
|
||
|
elapsed := now.Sub(s.timeSlices[s.currentSlice].Start)
|
||
|
if elapsed > sliceLength {
|
||
|
s.currentSlice++
|
||
|
s.currentSlice = s.currentSlice % len(s.timeSlices)
|
||
|
s.timeSlices[s.currentSlice].Start = now
|
||
|
s.timeSlices[s.currentSlice].Value = val
|
||
|
}
|
||
|
s.timeSlices[s.currentSlice].Value += val
|
||
|
}
|
||
|
|
||
|
// Total returns the current rolling total for this Stat.
|
||
|
func (s *Stat) Total() (t uint64) {
|
||
|
for _, slice := range s.timeSlices {
|
||
|
t += slice.Value
|
||
|
}
|
||
|
return t
|
||
|
}
|
||
|
|
||
|
// Start returns the start time for this statistics window.
|
||
|
func (s *Stat) Start() (start time.Time) {
|
||
|
for i := s.currentSlice; i < 10; i++ {
|
||
|
if s.timeSlices[i%10].Start.After(time.Time{}) {
|
||
|
return s.timeSlices[i%10].Start
|
||
|
}
|
||
|
}
|
||
|
return start
|
||
|
}
|
||
|
|
||
|
// Duration returns the size of the current statistics window in this Stat.
|
||
|
// This will fluctuate between 90-100% of the window size set when the state
|
||
|
// was created, as internal buckets roll over.
|
||
|
func (s *Stat) Duration() time.Duration {
|
||
|
return time.Now().Sub(s.Start())
|
||
|
}
|
||
|
|
||
|
// Rate returns the flow rate of this counter, which is the current rolling
|
||
|
// value / the current statistics duration.
|
||
|
func (s *Stat) Rate() (r float64) {
|
||
|
return float64(s.Total()) / (float64(s.Duration()) / float64(time.Second))
|
||
|
}
|
||
|
|
||
|
// QueueStats is a report of formatted Stat data about a specific priority
|
||
|
// queue captured at a point in time.
|
||
|
type QueueStats struct {
|
||
|
Start time.Time
|
||
|
time.Duration
|
||
|
Count uint64
|
||
|
Rate float64
|
||
|
}
|
||
|
|
||
|
func (qs *QueueStats) String() string {
|
||
|
b, _ := yaml.Marshal(qs)
|
||
|
return string(b)
|
||
|
}
|
||
|
|
||
|
// Stats returns the current time usage statistics for all active task
|
||
|
// priority levels.
|
||
|
func (tq *TaskQueue) Stats() (stats map[uint]*QueueStats) {
|
||
|
stats = make(map[uint]*QueueStats)
|
||
|
tq.mutex.Lock()
|
||
|
defer tq.mutex.Unlock()
|
||
|
tq.queues.Range(func(prio, q interface{}) bool {
|
||
|
stat := q.(*Queue).Done
|
||
|
qs := &QueueStats{
|
||
|
Start: stat.Start(),
|
||
|
Duration: stat.Duration(),
|
||
|
Count: stat.Total(),
|
||
|
Rate: stat.Rate(),
|
||
|
}
|
||
|
stats[prio.(uint)] = qs
|
||
|
return true
|
||
|
})
|
||
|
return stats
|
||
|
}
|