diff --git a/pkg/datasource/functions.go b/pkg/datasource/functions.go index 62074de..aec2176 100644 --- a/pkg/datasource/functions.go +++ b/pkg/datasource/functions.go @@ -2,8 +2,8 @@ package datasource import ( "fmt" - "time" + "github.com/alexanderzobnin/grafana-zabbix/pkg/gtime" "github.com/alexanderzobnin/grafana-zabbix/pkg/timeseries" ) @@ -11,20 +11,26 @@ var errFunctionNotSupported = func(name string) error { return fmt.Errorf("function not supported: %s", name) } -type DataProcessingFunc = func(series timeseries.TimeSeries, params ...string) timeseries.TimeSeries +type DataProcessingFunc = func(series timeseries.TimeSeries, params ...string) (timeseries.TimeSeries, error) var funcMap map[string]DataProcessingFunc func init() { - funcMap = make(map[string]DataProcessingFunc) - funcMap["groupBy"] = applyGroupBy + funcMap = map[string]DataProcessingFunc{ + "groupBy": applyGroupBy, + } + } func applyFunctions(series []*timeseries.TimeSeriesData, functions []QueryFunction) ([]*timeseries.TimeSeriesData, error) { for _, f := range functions { if applyFunc, ok := funcMap[f.Def.Name]; ok { for _, s := range series { - s.TS = applyFunc(s.TS, f.Params...) + result, err := applyFunc(s.TS, f.Params...) + if err != nil { + return nil, err + } + s.TS = result } } else { err := errFunctionNotSupported(f.Def.Name) @@ -34,7 +40,32 @@ func applyFunctions(series []*timeseries.TimeSeriesData, functions []QueryFuncti return series, nil } -func applyGroupBy(series timeseries.TimeSeries, params ...string) timeseries.TimeSeries { - s := series.GroupBy(time.Minute, "avg") - return s +func applyGroupBy(series timeseries.TimeSeries, params ...string) (timeseries.TimeSeries, error) { + pInterval := params[0] + pAgg := params[1] + interval, err := gtime.ParseInterval(pInterval) + if err != nil { + return nil, err + } + + aggFunc := getAggFunc(pAgg) + s := series.GroupBy(interval, aggFunc) + return s, nil +} + +func getAggFunc(agg string) timeseries.AggFunc { + switch agg { + case "avg": + return timeseries.AggAvg + case "max": + return timeseries.AggMax + case "min": + return timeseries.AggMin + case "first": + return timeseries.AggFirst + case "last": + return timeseries.AggLast + default: + return timeseries.AggAvg + } } diff --git a/pkg/timeseries/models.go b/pkg/timeseries/models.go index 65b50ad..4a3b3c7 100644 --- a/pkg/timeseries/models.go +++ b/pkg/timeseries/models.go @@ -29,3 +29,5 @@ type TimeSeriesData struct { type TimeSeriesMeta struct { Item *zabbix.Item } + +type AggFunc = func(points []TimePoint) *float64 diff --git a/pkg/timeseries/timeseries.go b/pkg/timeseries/timeseries.go index a0e58d5..401b6ae 100644 --- a/pkg/timeseries/timeseries.go +++ b/pkg/timeseries/timeseries.go @@ -17,7 +17,7 @@ func NewTimeSeriesData() *TimeSeriesData { } } -func (tsd *TimeSeriesData) Len() int { +func (tsd TimeSeriesData) Len() int { return len(tsd.TS) } @@ -30,8 +30,94 @@ func (tsd *TimeSeriesData) Add(point TimePoint) *TimeSeriesData { return tsd } -func (ts TimeSeries) GroupBy(interval time.Duration, agg string) TimeSeries { - return ts +func (ts TimeSeries) GroupBy(interval time.Duration, aggFunc AggFunc) TimeSeries { + if ts.Len() == 0 { + return ts + } + + groupedSeries := NewTimeSeries() + frame := make([]TimePoint, 0) + frameTS := ts[0].GetTimeFrame(interval) + var pointFrameTs time.Time + + for _, point := range ts { + pointFrameTs = point.GetTimeFrame(interval) + + // Iterate over points and push it into the frame if point time stamp fit the frame + if pointFrameTs == frameTS { + frame = append(frame, point) + } else if pointFrameTs.After(frameTS) { + // If point outside frame, then we've done with current frame + groupedSeries = append(groupedSeries, TimePoint{ + Time: frameTS, + Value: aggFunc(frame), + }) + + // Move frame window to next non-empty interval and fill empty by null + frameTS = frameTS.Add(interval) + for frameTS.Before(pointFrameTs) { + groupedSeries = append(groupedSeries, TimePoint{ + Time: frameTS, + Value: nil, + }) + frameTS = frameTS.Add(interval) + } + frame = []TimePoint{point} + } + } + + groupedSeries = append(groupedSeries, TimePoint{ + Time: frameTS, + Value: aggFunc(frame), + }) + + return groupedSeries +} + +func AggAvg(points []TimePoint) *float64 { + var sum float64 = 0 + for _, p := range points { + if p.Value != nil { + sum += *p.Value + } + } + return &sum +} + +func AggMax(points []TimePoint) *float64 { + var max *float64 = nil + for _, p := range points { + if p.Value != nil { + if max == nil { + max = p.Value + } else if *p.Value > *max { + max = p.Value + } + } + } + return max +} + +func AggMin(points []TimePoint) *float64 { + var min *float64 = nil + for _, p := range points { + if p.Value != nil { + if min == nil { + min = p.Value + } else if *p.Value < *min { + min = p.Value + } + } + } + return min +} + +func AggFirst(points []TimePoint) *float64 { + return points[0].Value +} + +func AggLast(points []TimePoint) *float64 { + return points[len(points)-1].Value } // Aligns point's time stamps according to provided interval.