package gtq

import (
	"sync"
	"time"

	"gopkg.in/yaml.v2"
)

// Counter is an incrementing statistical value, with a Start time for rate
// calculation.
type Counter struct {
	// Start is the time at which this counter began accumulating.
	Start time.Time
	// Value is the amount accumulated since Start.
	Value uint64
}

// Stat for a queue priority level, grouped into timeslice buckets.
//
// The buckets form a ring: currentSlice indexes the bucket receiving new
// samples, and the bucket after it (wrapping around) is the next one to be
// overwritten. A Stat must be created with NewStat; Add divides by the
// number of buckets, which only NewStat allocates.
type Stat struct {
	// timeSlices is the ring of buckets.
	timeSlices []Counter
	// currentSlice is the index of the bucket currently being filled.
	currentSlice int
	// window is the total timeframe covered by all buckets together.
	window time.Duration
	// mutex guards timeSlices and currentSlice.
	mutex sync.Mutex
}

// NewStat initializes and returns a new Stat. window determines
// the size of the rolling statistics timeframe that is captured,
// which in turn determines how finely grained stats collection and task
// scheduling can be. window defaults to 1 second when it is zero or
// negative.
func NewStat(window time.Duration) (s *Stat) {
	// A non-positive window would yield a non-positive slice length and
	// force a rollover on every Add, so fall back to the default instead.
	if window <= 0 {
		window = time.Second
	}
	return &Stat{
		window: window,
		// default to 10 timeslice buckets, maybe make configurable later.
		timeSlices: make([]Counter, 10),
	}
}

// Add increments the current timeslice counter by the amount in val. If the
// current timeslice is older than window divided by the number of buckets,
// the Stat first rolls over to the next bucket, resetting it so that val is
// counted exactly once.
func (s *Stat) Add(val uint64) {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	// roll over to next timeslice if slice duration has been exceeded.
	sliceLength := s.window / time.Duration(len(s.timeSlices))
	now := time.Now()
	cur := &s.timeSlices[s.currentSlice]
	if now.Sub(cur.Start) > sliceLength {
		s.currentSlice = (s.currentSlice + 1) % len(s.timeSlices)
		cur = &s.timeSlices[s.currentSlice]
		cur.Start = now
		// Start the recycled bucket from zero; val is added below.
		cur.Value = 0
	}
	cur.Value += val
}

// Total returns the current rolling total for this Stat, i.e. the sum of the
// values held in all timeslice buckets.
func (s *Stat) Total() (t uint64) {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	for i := range s.timeSlices {
		t += s.timeSlices[i].Value
	}
	return t
}

// Start returns the start time for this statistics window: the Start of the
// oldest bucket that has been used. It returns the zero time.Time when no
// bucket has been started yet.
func (s *Stat) Start() (start time.Time) {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	// Walk the ring oldest-first: the bucket after currentSlice is the next
	// one to be overwritten, and currentSlice itself is the newest. Buckets
	// that were never started still have a zero Start and are skipped.
	n := len(s.timeSlices)
	for k := 1; k <= n; k++ {
		if c := s.timeSlices[(s.currentSlice+k)%n]; !c.Start.IsZero() {
			return c.Start
		}
	}
	return start
}

// Duration returns the size of the current statistics window in this Stat.
// This will fluctuate between 90-100% of the window size set when the state // was created, as internal buckets roll over. func (s *Stat) Duration() time.Duration { return time.Now().Sub(s.Start()) } // Rate returns the flow rate of this counter, which is the current rolling // value / the current statistics duration. func (s *Stat) Rate() (r float64) { return float64(s.Total()) / (float64(s.Duration()) / float64(time.Second)) } // QueueStats is a report of formatted Stat data about a specific priority // queue captured at a point in time. type QueueStats struct { Start time.Time time.Duration Count uint64 Rate float64 } func (qs *QueueStats) String() string { b, _ := yaml.Marshal(qs) return string(b) } // Stats returns the current time usage statistics for all active task // priority levels. func (tq *TaskQueue) Stats() (stats map[uint]*QueueStats) { stats = make(map[uint]*QueueStats) tq.mutex.Lock() defer tq.mutex.Unlock() tq.queues.Range(func(prio, q interface{}) bool { stat := q.(*Queue).Done qs := &QueueStats{ Start: stat.Start(), Duration: stat.Duration(), Count: stat.Total(), Rate: stat.Rate(), } stats[prio.(uint)] = qs return true }) return stats }