From 3f7cabcd53e6a72f74713e28387f445cf1746846 Mon Sep 17 00:00:00 2001 From: Alexander Zobnin Date: Mon, 24 May 2021 17:46:49 +0300 Subject: [PATCH] initial pipeline for functions processing --- pkg/datasource/datasource.go | 2 -- pkg/datasource/functions.go | 40 ++++++++++++++++++++++++++++++++++++ pkg/datasource/zabbix.go | 4 ++++ pkg/timeseries/timeseries.go | 4 ++++ 4 files changed, 48 insertions(+), 2 deletions(-) create mode 100644 pkg/datasource/functions.go diff --git a/pkg/datasource/datasource.go b/pkg/datasource/datasource.go index 3715ed6..f35ba60 100644 --- a/pkg/datasource/datasource.go +++ b/pkg/datasource/datasource.go @@ -122,8 +122,6 @@ func (ds *ZabbixDatasource) QueryData(ctx context.Context, req *backend.QueryDat ds.logger.Debug("DS query", "query", q) if err != nil { res.Error = err - } else if len(query.Functions) > 0 { - res.Error = ErrFunctionsNotSupported } else if query.Mode != 0 { res.Error = ErrNonMetricQueryNotSupported } else { diff --git a/pkg/datasource/functions.go b/pkg/datasource/functions.go new file mode 100644 index 0000000..62074de --- /dev/null +++ b/pkg/datasource/functions.go @@ -0,0 +1,40 @@ +package datasource + +import ( + "fmt" + "time" + + "github.com/alexanderzobnin/grafana-zabbix/pkg/timeseries" +) + +var errFunctionNotSupported = func(name string) error { + return fmt.Errorf("function not supported: %s", name) +} + +type DataProcessingFunc = func(series timeseries.TimeSeries, params ...string) timeseries.TimeSeries + +var funcMap map[string]DataProcessingFunc + +func init() { + funcMap = make(map[string]DataProcessingFunc) + funcMap["groupBy"] = applyGroupBy +} + +func applyFunctions(series []*timeseries.TimeSeriesData, functions []QueryFunction) ([]*timeseries.TimeSeriesData, error) { + for _, f := range functions { + if applyFunc, ok := funcMap[f.Def.Name]; ok { + for _, s := range series { + s.TS = applyFunc(s.TS, f.Params...) + } + } else { + err := errFunctionNotSupported(f.Def.Name) + return series, err + } + } + return series, nil +} + +func applyGroupBy(series timeseries.TimeSeries, params ...string) timeseries.TimeSeries { + s := series.GroupBy(time.Minute, "avg") + return s +} diff --git a/pkg/datasource/zabbix.go b/pkg/datasource/zabbix.go index eedab62..dbbbe73 100644 --- a/pkg/datasource/zabbix.go +++ b/pkg/datasource/zabbix.go @@ -75,6 +75,10 @@ func (ds *ZabbixDatasourceInstance) queryNumericDataForItems(ctx context.Context series := convertHistoryToTimeSeries(history, items) // TODO: handle time series functions + series, err = applyFunctions(series, query.Functions) + if err != nil { + return nil, err + } frame := convertTimeSeriesToDataFrame(series) diff --git a/pkg/timeseries/timeseries.go b/pkg/timeseries/timeseries.go index f753597..a0e58d5 100644 --- a/pkg/timeseries/timeseries.go +++ b/pkg/timeseries/timeseries.go @@ -30,6 +30,10 @@ func (tsd *TimeSeriesData) Add(point TimePoint) *TimeSeriesData { return tsd } +func (ts TimeSeries) GroupBy(interval time.Duration, agg string) TimeSeries { + return ts +} + // Aligns point's time stamps according to provided interval. func (ts TimeSeries) Align(interval time.Duration) TimeSeries { if interval <= 0 || ts.Len() < 2 {