Implement groupBy

This commit is contained in:
Alexander Zobnin
2021-05-25 15:47:18 +03:00
parent 3f7cabcd53
commit ebd9b46096
3 changed files with 130 additions and 11 deletions

View File

@@ -2,8 +2,8 @@ package datasource
import ( import (
"fmt" "fmt"
"time"
"github.com/alexanderzobnin/grafana-zabbix/pkg/gtime"
"github.com/alexanderzobnin/grafana-zabbix/pkg/timeseries" "github.com/alexanderzobnin/grafana-zabbix/pkg/timeseries"
) )
@@ -11,20 +11,26 @@ var errFunctionNotSupported = func(name string) error {
return fmt.Errorf("function not supported: %s", name) return fmt.Errorf("function not supported: %s", name)
} }
type DataProcessingFunc = func(series timeseries.TimeSeries, params ...string) timeseries.TimeSeries type DataProcessingFunc = func(series timeseries.TimeSeries, params ...string) (timeseries.TimeSeries, error)
var funcMap map[string]DataProcessingFunc var funcMap map[string]DataProcessingFunc
func init() { func init() {
funcMap = make(map[string]DataProcessingFunc) funcMap = map[string]DataProcessingFunc{
funcMap["groupBy"] = applyGroupBy "groupBy": applyGroupBy,
}
} }
func applyFunctions(series []*timeseries.TimeSeriesData, functions []QueryFunction) ([]*timeseries.TimeSeriesData, error) { func applyFunctions(series []*timeseries.TimeSeriesData, functions []QueryFunction) ([]*timeseries.TimeSeriesData, error) {
for _, f := range functions { for _, f := range functions {
if applyFunc, ok := funcMap[f.Def.Name]; ok { if applyFunc, ok := funcMap[f.Def.Name]; ok {
for _, s := range series { for _, s := range series {
s.TS = applyFunc(s.TS, f.Params...) result, err := applyFunc(s.TS, f.Params...)
if err != nil {
return nil, err
}
s.TS = result
} }
} else { } else {
err := errFunctionNotSupported(f.Def.Name) err := errFunctionNotSupported(f.Def.Name)
@@ -34,7 +40,32 @@ func applyFunctions(series []*timeseries.TimeSeriesData, functions []QueryFuncti
return series, nil return series, nil
} }
func applyGroupBy(series timeseries.TimeSeries, params ...string) timeseries.TimeSeries { func applyGroupBy(series timeseries.TimeSeries, params ...string) (timeseries.TimeSeries, error) {
s := series.GroupBy(time.Minute, "avg") pInterval := params[0]
return s pAgg := params[1]
interval, err := gtime.ParseInterval(pInterval)
if err != nil {
return nil, err
}
aggFunc := getAggFunc(pAgg)
s := series.GroupBy(interval, aggFunc)
return s, nil
}
func getAggFunc(agg string) timeseries.AggFunc {
switch agg {
case "avg":
return timeseries.AggAvg
case "max":
return timeseries.AggMax
case "min":
return timeseries.AggMin
case "first":
return timeseries.AggFirst
case "last":
return timeseries.AggLast
default:
return timeseries.AggAvg
}
} }

View File

@@ -29,3 +29,5 @@ type TimeSeriesData struct {
type TimeSeriesMeta struct { type TimeSeriesMeta struct {
Item *zabbix.Item Item *zabbix.Item
} }
type AggFunc = func(points []TimePoint) *float64

View File

@@ -17,7 +17,7 @@ func NewTimeSeriesData() *TimeSeriesData {
} }
} }
func (tsd *TimeSeriesData) Len() int { func (tsd TimeSeriesData) Len() int {
return len(tsd.TS) return len(tsd.TS)
} }
@@ -30,10 +30,96 @@ func (tsd *TimeSeriesData) Add(point TimePoint) *TimeSeriesData {
return tsd return tsd
} }
func (ts TimeSeries) GroupBy(interval time.Duration, agg string) TimeSeries { func (ts TimeSeries) GroupBy(interval time.Duration, aggFunc AggFunc) TimeSeries {
if ts.Len() == 0 {
return ts return ts
} }
groupedSeries := NewTimeSeries()
frame := make([]TimePoint, 0)
frameTS := ts[0].GetTimeFrame(interval)
var pointFrameTs time.Time
for _, point := range ts {
pointFrameTs = point.GetTimeFrame(interval)
// Iterate over points and push it into the frame if point time stamp fit the frame
if pointFrameTs == frameTS {
frame = append(frame, point)
} else if pointFrameTs.After(frameTS) {
// If point outside frame, then we've done with current frame
groupedSeries = append(groupedSeries, TimePoint{
Time: frameTS,
Value: aggFunc(frame),
})
// Move frame window to next non-empty interval and fill empty by null
frameTS = frameTS.Add(interval)
for frameTS.Before(pointFrameTs) {
groupedSeries = append(groupedSeries, TimePoint{
Time: frameTS,
Value: nil,
})
frameTS = frameTS.Add(interval)
}
frame = []TimePoint{point}
}
}
groupedSeries = append(groupedSeries, TimePoint{
Time: frameTS,
Value: aggFunc(frame),
})
return groupedSeries
}
func AggAvg(points []TimePoint) *float64 {
var sum float64 = 0
for _, p := range points {
if p.Value != nil {
sum += *p.Value
}
}
return &sum
}
func AggMax(points []TimePoint) *float64 {
var max *float64 = nil
for _, p := range points {
if p.Value != nil {
if max == nil {
max = p.Value
} else if *p.Value > *max {
max = p.Value
}
}
}
return max
}
func AggMin(points []TimePoint) *float64 {
var min *float64 = nil
for _, p := range points {
if p.Value != nil {
if min == nil {
min = p.Value
} else if *p.Value < *min {
min = p.Value
}
}
}
return min
}
func AggFirst(points []TimePoint) *float64 {
return points[0].Value
}
func AggLast(points []TimePoint) *float64 {
return points[len(points)-1].Value
}
// Aligns point's time stamps according to provided interval. // Aligns point's time stamps according to provided interval.
func (ts TimeSeries) Align(interval time.Duration) TimeSeries { func (ts TimeSeries) Align(interval time.Duration) TimeSeries {
if interval <= 0 || ts.Len() < 2 { if interval <= 0 || ts.Len() < 2 {