Implement top/bottom

This commit is contained in:
Alexander Zobnin
2021-05-27 10:51:33 +03:00
parent dceb4bd924
commit fe0192258e
2 changed files with 65 additions and 0 deletions

View File

@@ -45,6 +45,8 @@ var seriesFuncMap map[string]DataProcessingFunc
var aggFuncMap map[string]AggDataProcessingFunc
var filterFuncMap map[string]AggDataProcessingFunc
var frontendFuncMap map[string]bool
func init() {
@@ -61,6 +63,11 @@ func init() {
"percentileAgg": applyPercentileAgg,
}
filterFuncMap = map[string]AggDataProcessingFunc{
"top": applyTop,
"bottom": applyBottom,
}
// Functions processing on the frontend
frontendFuncMap = map[string]bool{
"setAlias": true,
@@ -85,6 +92,12 @@ func applyFunctions(series []*timeseries.TimeSeriesData, functions []QueryFuncti
return nil, err
}
series = result
} else if applyFilterFunc, ok := filterFuncMap[f.Def.Name]; ok {
result, err := applyFilterFunc(series, f.Params...)
if err != nil {
return nil, err
}
series = result
} else if _, ok := frontendFuncMap[f.Def.Name]; ok {
continue
} else {
@@ -207,6 +220,30 @@ func applyPercentileAgg(series []*timeseries.TimeSeriesData, params ...interface
return []*timeseries.TimeSeriesData{aggregatedSeries}, nil
}
func applyTop(series []*timeseries.TimeSeriesData, params ...interface{}) ([]*timeseries.TimeSeriesData, error) {
n, err := MustFloat64(params[0])
pAgg, err := MustString(params[1])
if err != nil {
return nil, errParsingFunctionParam(err)
}
aggFunc := getAggFunc(pAgg)
filteredSeries := timeseries.Filter(series, int(n), "top", aggFunc)
return filteredSeries, nil
}
func applyBottom(series []*timeseries.TimeSeriesData, params ...interface{}) ([]*timeseries.TimeSeriesData, error) {
n, err := MustFloat64(params[0])
pAgg, err := MustString(params[1])
if err != nil {
return nil, errParsingFunctionParam(err)
}
aggFunc := getAggFunc(pAgg)
filteredSeries := timeseries.Filter(series, int(n), "bottom", aggFunc)
return filteredSeries, nil
}
func getAggFunc(agg string) timeseries.AggFunc {
switch agg {
case "avg":

View File

@@ -94,6 +94,34 @@ func (ts TimeSeries) Transform(transformFunc TransformFunc) TimeSeries {
return ts
}
func Filter(series []*TimeSeriesData, n int, order string, aggFunc AggFunc) []*TimeSeriesData {
aggregatedSeries := make([]TimeSeries, len(series))
for i, s := range series {
aggregatedSeries[i] = s.TS.GroupByRange(aggFunc)
}
// Sort by aggregated value
sort.Slice(series, func(i, j int) bool {
if len(aggregatedSeries[i]) > 0 && len(aggregatedSeries[j]) > 0 {
return *aggregatedSeries[i][0].Value < *aggregatedSeries[j][0].Value
} else if len(aggregatedSeries[j]) > 0 {
return true
}
return false
})
filteredSeries := make([]*TimeSeriesData, n)
for i := 0; i < n; i++ {
if order == "top" {
filteredSeries[i] = series[len(aggregatedSeries)-1-i]
} else if order == "bottom" {
filteredSeries[i] = series[i]
}
}
return filteredSeries
}
func AggregateBy(series []*TimeSeriesData, interval time.Duration, aggFunc AggFunc) *TimeSeriesData {
aggregatedSeries := NewTimeSeries()