diff --git a/pkg/datasource/functions.go b/pkg/datasource/functions.go index 0635380..4d345cf 100644 --- a/pkg/datasource/functions.go +++ b/pkg/datasource/functions.go @@ -18,21 +18,25 @@ var errParsingFunctionParam = func(err error) error { type DataProcessingFunc = func(series timeseries.TimeSeries, params ...string) (timeseries.TimeSeries, error) -type MetaDataProcessingFunc = func(series *timeseries.TimeSeriesData, params ...string) (*timeseries.TimeSeriesData, error) +type AggDataProcessingFunc = func(series []*timeseries.TimeSeriesData, params ...string) ([]*timeseries.TimeSeriesData, error) -var funcMap map[string]DataProcessingFunc +var seriesFuncMap map[string]DataProcessingFunc -var metaFuncMap map[string]MetaDataProcessingFunc +var aggFuncMap map[string]AggDataProcessingFunc var frontendFuncMap map[string]bool func init() { - funcMap = map[string]DataProcessingFunc{ + seriesFuncMap = map[string]DataProcessingFunc{ "groupBy": applyGroupBy, "scale": applyScale, "offset": applyOffset, } + aggFuncMap = map[string]AggDataProcessingFunc{ + "aggregateBy": applyAggregateBy, + } + // Functions processing on the frontend frontendFuncMap = map[string]bool{ "setAlias": true, @@ -43,7 +47,7 @@ func init() { func applyFunctions(series []*timeseries.TimeSeriesData, functions []QueryFunction) ([]*timeseries.TimeSeriesData, error) { for _, f := range functions { - if applyFunc, ok := funcMap[f.Def.Name]; ok { + if applyFunc, ok := seriesFuncMap[f.Def.Name]; ok { for _, s := range series { result, err := applyFunc(s.TS, f.Params...) if err != nil { @@ -51,6 +55,12 @@ func applyFunctions(series []*timeseries.TimeSeriesData, functions []QueryFuncti } s.TS = result } + } else if applyAggFunc, ok := aggFuncMap[f.Def.Name]; ok { + result, err := applyAggFunc(series, f.Params...) + if err != nil { + return nil, err + } + series = result } else if _, ok := frontendFuncMap[f.Def.Name]; ok { continue } else { @@ -74,6 +84,21 @@ func applyGroupBy(series timeseries.TimeSeries, params ...string) (timeseries.Ti return s, nil } +func applyAggregateBy(series []*timeseries.TimeSeriesData, params ...string) ([]*timeseries.TimeSeriesData, error) { + pInterval := params[0] + pAgg := params[1] + interval, err := gtime.ParseInterval(pInterval) + if err != nil { + return nil, errParsingFunctionParam(err) + } + + aggFunc := getAggFunc(pAgg) + aggregatedSeries := timeseries.AggregateBy(series, interval, aggFunc) + aggregatedSeries.Meta.Name = fmt.Sprintf("aggregateBy(%s, %s)", pInterval, pAgg) + + return []*timeseries.TimeSeriesData{aggregatedSeries}, nil +} + func applyScale(series timeseries.TimeSeries, params ...string) (timeseries.TimeSeries, error) { pFactor := params[0] factor, err := strconv.ParseFloat(pFactor, 64) diff --git a/pkg/timeseries/timeseries.go b/pkg/timeseries/timeseries.go index 8eabba0..576463c 100644 --- a/pkg/timeseries/timeseries.go +++ b/pkg/timeseries/timeseries.go @@ -30,6 +30,7 @@ func (tsd *TimeSeriesData) Add(point TimePoint) *TimeSeriesData { return tsd } +// GroupBy groups points in given interval by applying provided `aggFunc`. Source time series should be sorted by time. func (ts TimeSeries) GroupBy(interval time.Duration, aggFunc AggFunc) TimeSeries { if ts.Len() == 0 { return ts @@ -74,6 +75,29 @@ func (ts TimeSeries) GroupBy(interval time.Duration, aggFunc AggFunc) TimeSeries return groupedSeries } +func AggregateBy(series []*TimeSeriesData, interval time.Duration, aggFunc AggFunc) *TimeSeriesData { + aggregatedSeries := NewTimeSeries() + + // Combine all points into one time series + for _, s := range series { + aggregatedSeries = append(aggregatedSeries, s.TS...) + } + + // GroupBy works correctly only with sorted time series + aggregatedSeries.Sort() + + aggregatedSeries = aggregatedSeries.GroupBy(interval, aggFunc) + aggregatedSeriesData := NewTimeSeriesData() + aggregatedSeriesData.TS = aggregatedSeries + return aggregatedSeriesData +} + +func (ts TimeSeries) Sort() { + sort.Slice(ts, func(i, j int) bool { + return ts[i].Time.Before(ts[j].Time) + }) +} + func (ts TimeSeries) Transform(transformFunc TransformFunc) TimeSeries { for i, p := range ts { ts[i] = transformFunc(p) diff --git a/src/datasource-zabbix/metricFunctions.ts b/src/datasource-zabbix/metricFunctions.ts index 2c1a4fc..9ded141 100644 --- a/src/datasource-zabbix/metricFunctions.ts +++ b/src/datasource-zabbix/metricFunctions.ts @@ -200,7 +200,7 @@ addFuncDef({ category: 'Aggregate', params: [ { name: 'interval', type: 'string' }, - { name: 'function', type: 'string', options: ['avg', 'min', 'max', 'sum', 'count', 'median'] } + { name: 'function', type: 'string', options: ['avg', 'min', 'max', 'sum', 'count', 'median', 'first', 'last'] } ], defaultParams: ['1m', 'avg'], });