From fe0192258e641c4fbb902ae76d86d6d58d89cfd0 Mon Sep 17 00:00:00 2001 From: Alexander Zobnin Date: Thu, 27 May 2021 10:51:33 +0300 Subject: [PATCH] Implement top/bottom --- pkg/datasource/functions.go | 37 ++++++++++++++++++++++++++++++++++++ pkg/timeseries/timeseries.go | 28 +++++++++++++++++++++++++++ 2 files changed, 65 insertions(+) diff --git a/pkg/datasource/functions.go b/pkg/datasource/functions.go index 2af60a5..23fcbb9 100644 --- a/pkg/datasource/functions.go +++ b/pkg/datasource/functions.go @@ -45,6 +45,8 @@ var seriesFuncMap map[string]DataProcessingFunc var aggFuncMap map[string]AggDataProcessingFunc +var filterFuncMap map[string]AggDataProcessingFunc + var frontendFuncMap map[string]bool func init() { @@ -61,6 +63,11 @@ func init() { "percentileAgg": applyPercentileAgg, } + filterFuncMap = map[string]AggDataProcessingFunc{ + "top": applyTop, + "bottom": applyBottom, + } + // Functions processing on the frontend frontendFuncMap = map[string]bool{ "setAlias": true, @@ -85,6 +92,12 @@ func applyFunctions(series []*timeseries.TimeSeriesData, functions []QueryFuncti return nil, err } series = result + } else if applyFilterFunc, ok := filterFuncMap[f.Def.Name]; ok { + result, err := applyFilterFunc(series, f.Params...) + if err != nil { + return nil, err + } + series = result } else if _, ok := frontendFuncMap[f.Def.Name]; ok { continue } else { @@ -207,6 +220,30 @@ func applyPercentileAgg(series []*timeseries.TimeSeriesData, params ...interface return []*timeseries.TimeSeriesData{aggregatedSeries}, nil } +func applyTop(series []*timeseries.TimeSeriesData, params ...interface{}) ([]*timeseries.TimeSeriesData, error) { + n, err := MustFloat64(params[0]) + pAgg, err := MustString(params[1]) + if err != nil { + return nil, errParsingFunctionParam(err) + } + + aggFunc := getAggFunc(pAgg) + filteredSeries := timeseries.Filter(series, int(n), "top", aggFunc) + return filteredSeries, nil +} + +func applyBottom(series []*timeseries.TimeSeriesData, params ...interface{}) ([]*timeseries.TimeSeriesData, error) { + n, err := MustFloat64(params[0]) + pAgg, err := MustString(params[1]) + if err != nil { + return nil, errParsingFunctionParam(err) + } + + aggFunc := getAggFunc(pAgg) + filteredSeries := timeseries.Filter(series, int(n), "bottom", aggFunc) + return filteredSeries, nil +} + func getAggFunc(agg string) timeseries.AggFunc { switch agg { case "avg": diff --git a/pkg/timeseries/timeseries.go b/pkg/timeseries/timeseries.go index 923f27d..3a9f9a0 100644 --- a/pkg/timeseries/timeseries.go +++ b/pkg/timeseries/timeseries.go @@ -94,6 +94,34 @@ func (ts TimeSeries) Transform(transformFunc TransformFunc) TimeSeries { return ts } +func Filter(series []*TimeSeriesData, n int, order string, aggFunc AggFunc) []*TimeSeriesData { + aggregatedSeries := make([]TimeSeries, len(series)) + for i, s := range series { + aggregatedSeries[i] = s.TS.GroupByRange(aggFunc) + } + + // Sort by aggregated value + sort.Slice(series, func(i, j int) bool { + if len(aggregatedSeries[i]) > 0 && len(aggregatedSeries[j]) > 0 { + return *aggregatedSeries[i][0].Value < *aggregatedSeries[j][0].Value + } else if len(aggregatedSeries[j]) > 0 { + return true + } + return false + }) + + filteredSeries := make([]*TimeSeriesData, n) + for i := 0; i < n; i++ { + if order == "top" { + filteredSeries[i] = series[len(aggregatedSeries)-1-i] + } else if order == "bottom" { + filteredSeries[i] = series[i] + } + } + + return filteredSeries +} + func AggregateBy(series []*TimeSeriesData, interval time.Duration, aggFunc AggFunc) *TimeSeriesData { aggregatedSeries := NewTimeSeries()