gtq/stats.go

119 lines
3.1 KiB
Go
Raw Normal View History

2019-04-11 14:21:27 +00:00
package gtq
import (
"gopkg.in/yaml.v2"
"sync"
"time"
)
// Counter is an incrementing statistical value, with a Start time for rate
// calculation.
type Counter struct {
Start time.Time
Value uint64
}
// Stat for a queue priority level, grouped into timeslice buckets.
type Stat struct {
timeSlices []Counter
currentSlice int
window time.Duration
mutex sync.Mutex
}
// NewStat initializes and returns a new Stat. window determines
// the size of the rolling statistics timeframe that is captured,
// which in turn determines how finely grained stats collection and task
// scheduling can be. window defaults to 1 second.
func NewStat(window time.Duration) (s *Stat) {
s = &Stat{window: window}
if s.window == 0 {
s.window = time.Second
}
// default to 10 timeslice buckets, maybe make configurable later.
s.timeSlices = make([]Counter, 10)
return s
}
// Add increments the specified counter by the amount in val. window specifies
func (s *Stat) Add(val uint64) {
s.mutex.Lock()
defer s.mutex.Unlock()
// roll over to next timeslice if slice duration has been exceeded.
sliceLength := s.window / time.Duration(len(s.timeSlices))
now := time.Now()
elapsed := now.Sub(s.timeSlices[s.currentSlice].Start)
if elapsed > sliceLength {
s.currentSlice++
s.currentSlice = s.currentSlice % len(s.timeSlices)
s.timeSlices[s.currentSlice].Start = now
s.timeSlices[s.currentSlice].Value = val
}
s.timeSlices[s.currentSlice].Value += val
}
// Total returns the current rolling total for this Stat.
func (s *Stat) Total() (t uint64) {
for _, slice := range s.timeSlices {
t += slice.Value
}
return t
}
// Start returns the start time for this statistics window.
func (s *Stat) Start() (start time.Time) {
for i := s.currentSlice; i < 10; i++ {
if s.timeSlices[i%10].Start.After(time.Time{}) {
return s.timeSlices[i%10].Start
}
}
return start
}
// Duration returns the size of the current statistics window in this Stat.
// This will fluctuate between 90-100% of the window size set when the state
// was created, as internal buckets roll over.
func (s *Stat) Duration() time.Duration {
return time.Since(s.Start())
2019-04-11 14:21:27 +00:00
}
// Rate returns the flow rate of this counter, which is the current rolling
// value / the current statistics duration.
func (s *Stat) Rate() (r float64) {
return float64(s.Total()) / (float64(s.Duration()) / float64(time.Second))
}
// QueueStats is a report of formatted Stat data about a specific priority
// queue captured at a point in time.
type QueueStats struct {
Start time.Time
time.Duration
Count uint64
Rate float64
}
func (qs *QueueStats) String() string {
b, _ := yaml.Marshal(qs)
return string(b)
}
// Stats returns the current time usage statistics for all active task
// priority levels.
func (tq *TaskQueue) Stats() (stats map[uint]*QueueStats) {
stats = make(map[uint]*QueueStats)
tq.mutex.Lock()
defer tq.mutex.Unlock()
tq.queues.Range(func(prio, q interface{}) bool {
stat := q.(*Queue).Done
qs := &QueueStats{
Start: stat.Start(),
Duration: stat.Duration(),
Count: stat.Total(),
Rate: stat.Rate(),
}
stats[prio.(uint)] = qs
return true
})
return stats
}