initial pipeline for functions processing
This commit is contained in:
@@ -122,8 +122,6 @@ func (ds *ZabbixDatasource) QueryData(ctx context.Context, req *backend.QueryDat
|
|||||||
ds.logger.Debug("DS query", "query", q)
|
ds.logger.Debug("DS query", "query", q)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
res.Error = err
|
res.Error = err
|
||||||
} else if len(query.Functions) > 0 {
|
|
||||||
res.Error = ErrFunctionsNotSupported
|
|
||||||
} else if query.Mode != 0 {
|
} else if query.Mode != 0 {
|
||||||
res.Error = ErrNonMetricQueryNotSupported
|
res.Error = ErrNonMetricQueryNotSupported
|
||||||
} else {
|
} else {
|
||||||
|
|||||||
40
pkg/datasource/functions.go
Normal file
40
pkg/datasource/functions.go
Normal file
@@ -0,0 +1,40 @@
|
|||||||
|
package datasource
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/alexanderzobnin/grafana-zabbix/pkg/timeseries"
|
||||||
|
)
|
||||||
|
|
||||||
|
var errFunctionNotSupported = func(name string) error {
|
||||||
|
return fmt.Errorf("function not supported: %s", name)
|
||||||
|
}
|
||||||
|
|
||||||
|
type DataProcessingFunc = func(series timeseries.TimeSeries, params ...string) timeseries.TimeSeries
|
||||||
|
|
||||||
|
var funcMap map[string]DataProcessingFunc
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
funcMap = make(map[string]DataProcessingFunc)
|
||||||
|
funcMap["groupBy"] = applyGroupBy
|
||||||
|
}
|
||||||
|
|
||||||
|
func applyFunctions(series []*timeseries.TimeSeriesData, functions []QueryFunction) ([]*timeseries.TimeSeriesData, error) {
|
||||||
|
for _, f := range functions {
|
||||||
|
if applyFunc, ok := funcMap[f.Def.Name]; ok {
|
||||||
|
for _, s := range series {
|
||||||
|
s.TS = applyFunc(s.TS, f.Params...)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
err := errFunctionNotSupported(f.Def.Name)
|
||||||
|
return series, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return series, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func applyGroupBy(series timeseries.TimeSeries, params ...string) timeseries.TimeSeries {
|
||||||
|
s := series.GroupBy(time.Minute, "avg")
|
||||||
|
return s
|
||||||
|
}
|
||||||
@@ -75,6 +75,10 @@ func (ds *ZabbixDatasourceInstance) queryNumericDataForItems(ctx context.Context
|
|||||||
|
|
||||||
series := convertHistoryToTimeSeries(history, items)
|
series := convertHistoryToTimeSeries(history, items)
|
||||||
// TODO: handle time series functions
|
// TODO: handle time series functions
|
||||||
|
series, err = applyFunctions(series, query.Functions)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
frame := convertTimeSeriesToDataFrame(series)
|
frame := convertTimeSeriesToDataFrame(series)
|
||||||
|
|
||||||
|
|||||||
@@ -30,6 +30,10 @@ func (tsd *TimeSeriesData) Add(point TimePoint) *TimeSeriesData {
|
|||||||
return tsd
|
return tsd
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (ts TimeSeries) GroupBy(interval time.Duration, agg string) TimeSeries {
|
||||||
|
return ts
|
||||||
|
}
|
||||||
|
|
||||||
// Aligns point's time stamps according to provided interval.
|
// Aligns point's time stamps according to provided interval.
|
||||||
func (ts TimeSeries) Align(interval time.Duration) TimeSeries {
|
func (ts TimeSeries) Align(interval time.Duration) TimeSeries {
|
||||||
if interval <= 0 || ts.Len() < 2 {
|
if interval <= 0 || ts.Len() < 2 {
|
||||||
|
|||||||
Reference in New Issue
Block a user