diff --git a/pkg/datasource/functions.go b/pkg/datasource/functions.go
index 60fef62..20b0381 100644
--- a/pkg/datasource/functions.go
+++ b/pkg/datasource/functions.go
@@ -10,17 +10,36 @@ import (
 
 const RANGE_VARIABLE_VALUE = "range_series"
 
-var errFunctionNotSupported = func(name string) error {
-	return fmt.Errorf("function not supported: %s", name)
+var (
+	errFunctionNotSupported = func(name string) error {
+		return fmt.Errorf("function not supported: %s", name)
+	}
+	errParsingFunctionParam = func(err error) error {
+		return fmt.Errorf("failed to parse function param: %s", err)
+	}
+)
+
+func MustString(p QueryFunctionParam) (string, error) {
+	if pStr, ok := p.(string); ok {
+		return pStr, nil
+	}
+	return "", fmt.Errorf("failed to convert value to string: %v", p)
 }
 
-var errParsingFunctionParam = func(err error) error {
-	return fmt.Errorf("failed to parse function param: %s", err)
+func MustFloat64(p QueryFunctionParam) (float64, error) {
+	if pFloat, ok := p.(float64); ok {
+		return pFloat, nil
+	} else if pStr, ok := p.(string); ok {
+		if pFloat, err := strconv.ParseFloat(pStr, 64); err == nil {
+			return pFloat, nil
+		}
+	}
+	return 0, fmt.Errorf("failed to convert value to float: %v", p)
 }
 
-type DataProcessingFunc = func(series timeseries.TimeSeries, params ...string) (timeseries.TimeSeries, error)
+type DataProcessingFunc = func(series timeseries.TimeSeries, params ...interface{}) (timeseries.TimeSeries, error)
 
-type AggDataProcessingFunc = func(series []*timeseries.TimeSeriesData, params ...string) ([]*timeseries.TimeSeriesData, error)
+type AggDataProcessingFunc = func(series []*timeseries.TimeSeriesData, params ...interface{}) ([]*timeseries.TimeSeriesData, error)
 
 var seriesFuncMap map[string]DataProcessingFunc
@@ -36,8 +55,9 @@ func init() {
 	}
 
 	aggFuncMap = map[string]AggDataProcessingFunc{
-		"aggregateBy": applyAggregateBy,
-		"sumSeries":   applySumSeries,
+		"aggregateBy":   applyAggregateBy,
+		"sumSeries":     applySumSeries,
+		"percentileAgg": applyPercentileAgg,
 	}
 
 	// Functions processing on the frontend
@@ -74,11 +94,14 @@ func applyFunctions(series []*timeseries.TimeSeriesData, functions []QueryFunction) ([]*timeseries.TimeSeriesData, error) {
 	return series, nil
 }
 
-func applyGroupBy(series timeseries.TimeSeries, params ...string) (timeseries.TimeSeries, error) {
-	pInterval := params[0]
-	pAgg := params[1]
-	aggFunc := getAggFunc(pAgg)
+func applyGroupBy(series timeseries.TimeSeries, params ...interface{}) (timeseries.TimeSeries, error) {
+	pInterval, err := MustString(params[0])
+	pAgg, err := MustString(params[1])
+	if err != nil {
+		return nil, errParsingFunctionParam(err)
+	}
 
+	aggFunc := getAggFunc(pAgg)
 	if pInterval == RANGE_VARIABLE_VALUE {
 		s := series.GroupByRange(aggFunc)
 		return s, nil
@@ -93,8 +116,11 @@ func applyGroupBy(series timeseries.TimeSeries, params ...string) (timeseries.TimeSeries, error) {
 	return s, nil
 }
 
-func applyScale(series timeseries.TimeSeries, params ...string) (timeseries.TimeSeries, error) {
-	pFactor := params[0]
+func applyScale(series timeseries.TimeSeries, params ...interface{}) (timeseries.TimeSeries, error) {
+	pFactor, err := MustString(params[0])
+	if err != nil {
+		return nil, errParsingFunctionParam(err)
+	}
 	factor, err := strconv.ParseFloat(pFactor, 64)
 	if err != nil {
 		return nil, errParsingFunctionParam(err)
@@ -104,9 +130,8 @@ func applyScale(series timeseries.TimeSeries, params ...string) (timeseries.TimeSeries, error) {
 	return series.Transform(transformFunc), nil
 }
 
-func applyOffset(series timeseries.TimeSeries, params ...string) (timeseries.TimeSeries, error) {
-	pOffset := params[0]
-	offset, err := strconv.ParseFloat(pOffset, 64)
+func applyOffset(series timeseries.TimeSeries, params ...interface{}) (timeseries.TimeSeries, error) {
+	offset, err := MustFloat64(params[0])
 	if err != nil {
 		return nil, errParsingFunctionParam(err)
 	}
@@ -115,9 +140,13 @@ func applyOffset(series timeseries.TimeSeries, params ...string) (timeseries.TimeSeries, error) {
 	return series.Transform(transformFunc), nil
 }
 
-func applyAggregateBy(series []*timeseries.TimeSeriesData, params ...string) ([]*timeseries.TimeSeriesData, error) {
-	pInterval := params[0]
-	pAgg := params[1]
+func applyAggregateBy(series []*timeseries.TimeSeriesData, params ...interface{}) ([]*timeseries.TimeSeriesData, error) {
+	pInterval, err := MustString(params[0])
+	pAgg, err := MustString(params[1])
+	if err != nil {
+		return nil, errParsingFunctionParam(err)
+	}
+
 	interval, err := gtime.ParseInterval(pInterval)
 	if err != nil {
 		return nil, errParsingFunctionParam(err)
@@ -130,12 +159,31 @@ func applyAggregateBy(series []*timeseries.TimeSeriesData, params ...string) ([]*timeseries.TimeSeriesData, error) {
 	return []*timeseries.TimeSeriesData{aggregatedSeries}, nil
 }
 
-func applySumSeries(series []*timeseries.TimeSeriesData, params ...string) ([]*timeseries.TimeSeriesData, error) {
+func applySumSeries(series []*timeseries.TimeSeriesData, params ...interface{}) ([]*timeseries.TimeSeriesData, error) {
 	sum := timeseries.SumSeries(series)
 	sum.Meta.Name = "sumSeries()"
 	return []*timeseries.TimeSeriesData{sum}, nil
 }
 
+func applyPercentileAgg(series []*timeseries.TimeSeriesData, params ...interface{}) ([]*timeseries.TimeSeriesData, error) {
+	pInterval, err := MustString(params[0])
+	percentile, err := MustFloat64(params[1])
+	if err != nil {
+		return nil, errParsingFunctionParam(err)
+	}
+
+	interval, err := gtime.ParseInterval(pInterval)
+	if err != nil {
+		return nil, errParsingFunctionParam(err)
+	}
+
+	aggFunc := timeseries.AggPercentile(percentile)
+	aggregatedSeries := timeseries.AggregateBy(series, interval, aggFunc)
+	aggregatedSeries.Meta.Name = fmt.Sprintf("percentileAgg(%s, %v)", pInterval, percentile)
+
+	return []*timeseries.TimeSeriesData{aggregatedSeries}, nil
+}
+
 func getAggFunc(agg string) timeseries.AggFunc {
 	switch agg {
 	case "avg":
diff --git a/pkg/datasource/models.go b/pkg/datasource/models.go
index 62abb3b..60bc786 100644
--- a/pkg/datasource/models.go
+++ b/pkg/datasource/models.go
@@ -66,9 +66,9 @@ type QueryOptions struct {
 
 // QueryOptions model
 type QueryFunction struct {
-	Def    QueryFunctionDef `json:"def"`
-	Params []string         `json:"params"`
-	Text   string           `json:"text"`
+	Def    QueryFunctionDef     `json:"def"`
+	Params []QueryFunctionParam `json:"params"`
+	Text   string               `json:"text"`
 }
 
 // QueryOptions model
@@ -76,7 +76,7 @@ type QueryFunctionDef struct {
 	Name          string                  `json:"name"`
 	Category      string                  `json:"category"`
 	Params        []QueryFunctionParamDef `json:"params"`
-	DefaultParams []interface{}           `json:"defaultParams"`
+	DefaultParams []QueryFunctionParam    `json:"defaultParams"`
 }
 
 type QueryFunctionParamDef struct {
@@ -84,6 +84,8 @@ type QueryFunctionParamDef struct {
 	Type string `json:"type"`
 }
 
+type QueryFunctionParam = interface{}
+
 // ReadQuery will read and validate Settings from the DataSourceConfg
 func ReadQuery(query backend.DataQuery) (QueryModel, error) {
 	model := QueryModel{}
diff --git a/pkg/datasource/zabbix.go b/pkg/datasource/zabbix.go
index dbbbe73..f6268c0 100644
--- a/pkg/datasource/zabbix.go
+++ b/pkg/datasource/zabbix.go
@@ -91,7 +91,7 @@ func (ds *ZabbixDatasourceInstance) getTrendValueType(query *QueryModel) string {
 
 	for _, fn := range query.Functions {
 		if fn.Def.Name == "trendValue" && len(fn.Params) > 0 {
-			trendValue = fn.Params[0]
+			trendValue = fn.Params[0].(string)
 		}
 	}
 
@@ -103,7 +103,7 @@ func (ds *ZabbixDatasourceInstance) getConsolidateBy(query *QueryModel) string {
 
 	for _, fn := range query.Functions {
 		if fn.Def.Name == "consolidateBy" && len(fn.Params) > 0 {
-			consolidateBy = fn.Params[0]
+			consolidateBy = fn.Params[0].(string)
 		}
 	}
 	return consolidateBy
diff --git a/pkg/timeseries/agg_functions.go b/pkg/timeseries/agg_functions.go
index bc0d2a4..2d9239a 100644
--- a/pkg/timeseries/agg_functions.go
+++ b/pkg/timeseries/agg_functions.go
@@ -5,6 +5,8 @@ import (
 	"sort"
 )
 
+type AggregationFunc = func(points []TimePoint) *float64
+
 func AggAvg(points []TimePoint) *float64 {
 	sum := AggSum(points)
 	avg := *sum / float64(len(points))
@@ -63,18 +65,24 @@ func AggLast(points []TimePoint) *float64 {
 }
 
 func AggMedian(points []TimePoint) *float64 {
-	values := make([]float64, 0)
-	for _, p := range points {
-		if p.Value != nil {
-			values = append(values, *p.Value)
-		}
-	}
-	if len(values) == 0 {
-		return nil
-	}
-
-	values = sort.Float64Slice(values)
-	medianIndex := int(math.Floor(float64(len(values)) / 2))
-	median := values[medianIndex]
-	return &median
+	return AggPercentile(50)(points)
+}
+
+func AggPercentile(n float64) AggregationFunc {
+	return func(points []TimePoint) *float64 {
+		values := make([]float64, 0)
+		for _, p := range points {
+			if p.Value != nil {
+				values = append(values, *p.Value)
+			}
+		}
+		if len(values) == 0 {
+			return nil
+		}
+
+		sort.Float64s(values)
+		percentileIndex := int(math.Floor(float64(len(values)) * n / 100))
+		percentile := values[percentileIndex]
+		return &percentile
+	}
 }
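
For reference, a standalone sketch (not part of the patch) of why Params moves from []string to []QueryFunctionParam: a call such as percentileAgg(1m, 95) reaches the backend as a mixed JSON array, so the numeric param decodes as float64 rather than string. The payload shape and the toFloat64 helper below are illustrative assumptions that mirror the coercion MustString/MustFloat64 perform in the patch.

package main

import (
	"encoding/json"
	"fmt"
	"strconv"
)

// toFloat64 mirrors the coercion MustFloat64 performs in the patch:
// accept a JSON number directly, or fall back to parsing a numeric string.
func toFloat64(p interface{}) (float64, error) {
	switch v := p.(type) {
	case float64:
		return v, nil
	case string:
		return strconv.ParseFloat(v, 64)
	}
	return 0, fmt.Errorf("cannot convert %v to float64", p)
}

func main() {
	// Hypothetical payload for percentileAgg(1m, 95): one string param, one numeric param.
	raw := []byte(`{"params": ["1m", 95]}`)

	var fn struct {
		Params []interface{} `json:"params"`
	}
	if err := json.Unmarshal(raw, &fn); err != nil {
		panic(err)
	}

	// encoding/json decodes JSON numbers into float64 when the target is interface{},
	// so Params can no longer be declared as []string.
	interval := fn.Params[0].(string)
	percentile, err := toFloat64(fn.Params[1])
	if err != nil {
		panic(err)
	}
	fmt.Println(interval, percentile) // 1m 95
}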
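
And a minimal sketch of the nearest-rank style index AggPercentile uses, shown on a plain []float64: for n percent over k sorted values it picks element floor(k*n/100). The clamp for n = 100 is a defensive addition in this sketch only, not something the patch does.

package main

import (
	"fmt"
	"math"
	"sort"
)

// percentile returns the value at index floor(len(values) * n / 100) of the
// sorted input, the same index formula used by AggPercentile.
func percentile(values []float64, n float64) float64 {
	sort.Float64s(values) // values must be sorted before indexing
	idx := int(math.Floor(float64(len(values)) * n / 100))
	if idx >= len(values) {
		idx = len(values) - 1
	}
	return values[idx]
}

func main() {
	vals := []float64{10, 4, 7, 1, 9, 3, 8, 2, 6, 5}
	fmt.Println(percentile(vals, 50)) // 6 (upper median of ten values, matching AggMedian = AggPercentile(50))
	fmt.Println(percentile(vals, 95)) // 10
}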