Implement percentileAgg

This commit is contained in:
Alexander Zobnin
2021-05-27 10:13:48 +03:00
parent ac220a76e8
commit 62d569cd53
4 changed files with 99 additions and 41 deletions

View File

@@ -10,17 +10,36 @@ import (
const RANGE_VARIABLE_VALUE = "range_series" const RANGE_VARIABLE_VALUE = "range_series"
var errFunctionNotSupported = func(name string) error { var (
errFunctionNotSupported = func(name string) error {
return fmt.Errorf("function not supported: %s", name) return fmt.Errorf("function not supported: %s", name)
} }
errParsingFunctionParam = func(err error) error {
var errParsingFunctionParam = func(err error) error {
return fmt.Errorf("failed to parse function param: %s", err) return fmt.Errorf("failed to parse function param: %s", err)
} }
)
type DataProcessingFunc = func(series timeseries.TimeSeries, params ...string) (timeseries.TimeSeries, error) func MustString(p QueryFunctionParam) (string, error) {
if pStr, ok := p.(string); ok {
return pStr, nil
}
return "", fmt.Errorf("failed to convert value to string: %v", p)
}
type AggDataProcessingFunc = func(series []*timeseries.TimeSeriesData, params ...string) ([]*timeseries.TimeSeriesData, error) func MustFloat64(p QueryFunctionParam) (float64, error) {
if pFloat, ok := p.(float64); ok {
return pFloat, nil
} else if pStr, ok := p.(string); ok {
if pFloat, err := strconv.ParseFloat(pStr, 64); err == nil {
return pFloat, nil
}
}
return 0, fmt.Errorf("failed to convert value to float: %v", p)
}
type DataProcessingFunc = func(series timeseries.TimeSeries, params ...interface{}) (timeseries.TimeSeries, error)
type AggDataProcessingFunc = func(series []*timeseries.TimeSeriesData, params ...interface{}) ([]*timeseries.TimeSeriesData, error)
var seriesFuncMap map[string]DataProcessingFunc var seriesFuncMap map[string]DataProcessingFunc
@@ -38,6 +57,7 @@ func init() {
aggFuncMap = map[string]AggDataProcessingFunc{ aggFuncMap = map[string]AggDataProcessingFunc{
"aggregateBy": applyAggregateBy, "aggregateBy": applyAggregateBy,
"sumSeries": applySumSeries, "sumSeries": applySumSeries,
"percentileAgg": applyPercentileAgg,
} }
// Functions processing on the frontend // Functions processing on the frontend
@@ -74,11 +94,14 @@ func applyFunctions(series []*timeseries.TimeSeriesData, functions []QueryFuncti
return series, nil return series, nil
} }
func applyGroupBy(series timeseries.TimeSeries, params ...string) (timeseries.TimeSeries, error) { func applyGroupBy(series timeseries.TimeSeries, params ...interface{}) (timeseries.TimeSeries, error) {
pInterval := params[0] pInterval, err := MustString(params[0])
pAgg := params[1] pAgg, err := MustString(params[1])
aggFunc := getAggFunc(pAgg) if err != nil {
return nil, errParsingFunctionParam(err)
}
aggFunc := getAggFunc(pAgg)
if pInterval == RANGE_VARIABLE_VALUE { if pInterval == RANGE_VARIABLE_VALUE {
s := series.GroupByRange(aggFunc) s := series.GroupByRange(aggFunc)
return s, nil return s, nil
@@ -93,8 +116,11 @@ func applyGroupBy(series timeseries.TimeSeries, params ...string) (timeseries.Ti
return s, nil return s, nil
} }
func applyScale(series timeseries.TimeSeries, params ...string) (timeseries.TimeSeries, error) { func applyScale(series timeseries.TimeSeries, params ...interface{}) (timeseries.TimeSeries, error) {
pFactor := params[0] pFactor, err := MustString(params[0])
if err != nil {
return nil, errParsingFunctionParam(err)
}
factor, err := strconv.ParseFloat(pFactor, 64) factor, err := strconv.ParseFloat(pFactor, 64)
if err != nil { if err != nil {
return nil, errParsingFunctionParam(err) return nil, errParsingFunctionParam(err)
@@ -104,9 +130,8 @@ func applyScale(series timeseries.TimeSeries, params ...string) (timeseries.Time
return series.Transform(transformFunc), nil return series.Transform(transformFunc), nil
} }
func applyOffset(series timeseries.TimeSeries, params ...string) (timeseries.TimeSeries, error) { func applyOffset(series timeseries.TimeSeries, params ...interface{}) (timeseries.TimeSeries, error) {
pOffset := params[0] offset, err := MustFloat64(params[0])
offset, err := strconv.ParseFloat(pOffset, 64)
if err != nil { if err != nil {
return nil, errParsingFunctionParam(err) return nil, errParsingFunctionParam(err)
} }
@@ -115,9 +140,13 @@ func applyOffset(series timeseries.TimeSeries, params ...string) (timeseries.Tim
return series.Transform(transformFunc), nil return series.Transform(transformFunc), nil
} }
func applyAggregateBy(series []*timeseries.TimeSeriesData, params ...string) ([]*timeseries.TimeSeriesData, error) { func applyAggregateBy(series []*timeseries.TimeSeriesData, params ...interface{}) ([]*timeseries.TimeSeriesData, error) {
pInterval := params[0] pInterval, err := MustString(params[0])
pAgg := params[1] pAgg, err := MustString(params[1])
if err != nil {
return nil, errParsingFunctionParam(err)
}
interval, err := gtime.ParseInterval(pInterval) interval, err := gtime.ParseInterval(pInterval)
if err != nil { if err != nil {
return nil, errParsingFunctionParam(err) return nil, errParsingFunctionParam(err)
@@ -130,12 +159,31 @@ func applyAggregateBy(series []*timeseries.TimeSeriesData, params ...string) ([]
return []*timeseries.TimeSeriesData{aggregatedSeries}, nil return []*timeseries.TimeSeriesData{aggregatedSeries}, nil
} }
func applySumSeries(series []*timeseries.TimeSeriesData, params ...string) ([]*timeseries.TimeSeriesData, error) { func applySumSeries(series []*timeseries.TimeSeriesData, params ...interface{}) ([]*timeseries.TimeSeriesData, error) {
sum := timeseries.SumSeries(series) sum := timeseries.SumSeries(series)
sum.Meta.Name = "sumSeries()" sum.Meta.Name = "sumSeries()"
return []*timeseries.TimeSeriesData{sum}, nil return []*timeseries.TimeSeriesData{sum}, nil
} }
func applyPercentileAgg(series []*timeseries.TimeSeriesData, params ...interface{}) ([]*timeseries.TimeSeriesData, error) {
pInterval, err := MustString(params[0])
percentile, err := MustFloat64(params[1])
if err != nil {
return nil, errParsingFunctionParam(err)
}
interval, err := gtime.ParseInterval(pInterval)
if err != nil {
return nil, errParsingFunctionParam(err)
}
aggFunc := timeseries.AggPercentile(percentile)
aggregatedSeries := timeseries.AggregateBy(series, interval, aggFunc)
aggregatedSeries.Meta.Name = fmt.Sprintf("percentileAgg(%s, %v)", pInterval, percentile)
return []*timeseries.TimeSeriesData{aggregatedSeries}, nil
}
func getAggFunc(agg string) timeseries.AggFunc { func getAggFunc(agg string) timeseries.AggFunc {
switch agg { switch agg {
case "avg": case "avg":

View File

@@ -67,7 +67,7 @@ type QueryOptions struct {
// QueryOptions model // QueryOptions model
type QueryFunction struct { type QueryFunction struct {
Def QueryFunctionDef `json:"def"` Def QueryFunctionDef `json:"def"`
Params []string `json:"params"` Params []QueryFunctionParam `json:"params"`
Text string `json:"text"` Text string `json:"text"`
} }
@@ -76,7 +76,7 @@ type QueryFunctionDef struct {
Name string `json:"name"` Name string `json:"name"`
Category string `json:"category"` Category string `json:"category"`
Params []QueryFunctionParamDef `json:"params"` Params []QueryFunctionParamDef `json:"params"`
DefaultParams []interface{} `json:"defaultParams"` DefaultParams []QueryFunctionParam `json:"defaultParams"`
} }
type QueryFunctionParamDef struct { type QueryFunctionParamDef struct {
@@ -84,6 +84,8 @@ type QueryFunctionParamDef struct {
Type string `json:"type"` Type string `json:"type"`
} }
type QueryFunctionParam = interface{}
// ReadQuery will read and validate Settings from the DataSourceConfg // ReadQuery will read and validate Settings from the DataSourceConfg
func ReadQuery(query backend.DataQuery) (QueryModel, error) { func ReadQuery(query backend.DataQuery) (QueryModel, error) {
model := QueryModel{} model := QueryModel{}

View File

@@ -91,7 +91,7 @@ func (ds *ZabbixDatasourceInstance) getTrendValueType(query *QueryModel) string
for _, fn := range query.Functions { for _, fn := range query.Functions {
if fn.Def.Name == "trendValue" && len(fn.Params) > 0 { if fn.Def.Name == "trendValue" && len(fn.Params) > 0 {
trendValue = fn.Params[0] trendValue = fn.Params[0].(string)
} }
} }
@@ -103,7 +103,7 @@ func (ds *ZabbixDatasourceInstance) getConsolidateBy(query *QueryModel) string {
for _, fn := range query.Functions { for _, fn := range query.Functions {
if fn.Def.Name == "consolidateBy" && len(fn.Params) > 0 { if fn.Def.Name == "consolidateBy" && len(fn.Params) > 0 {
consolidateBy = fn.Params[0] consolidateBy = fn.Params[0].(string)
} }
} }
return consolidateBy return consolidateBy

View File

@@ -5,6 +5,8 @@ import (
"sort" "sort"
) )
type AgggregationFunc = func(points []TimePoint) *float64
func AggAvg(points []TimePoint) *float64 { func AggAvg(points []TimePoint) *float64 {
sum := AggSum(points) sum := AggSum(points)
avg := *sum / float64(len(points)) avg := *sum / float64(len(points))
@@ -63,6 +65,11 @@ func AggLast(points []TimePoint) *float64 {
} }
func AggMedian(points []TimePoint) *float64 { func AggMedian(points []TimePoint) *float64 {
return AggPercentile(50)(points)
}
func AggPercentile(n float64) AgggregationFunc {
return func(points []TimePoint) *float64 {
values := make([]float64, 0) values := make([]float64, 0)
for _, p := range points { for _, p := range points {
if p.Value != nil { if p.Value != nil {
@@ -74,7 +81,8 @@ func AggMedian(points []TimePoint) *float64 {
} }
values = sort.Float64Slice(values) values = sort.Float64Slice(values)
medianIndex := int(math.Floor(float64(len(values)) / 2)) percentileIndex := int(math.Floor(float64(len(values)) * n / 100))
median := values[medianIndex] percentile := values[percentileIndex]
return &median return &percentile
}
} }