Implement timeShift
This commit is contained in:
@@ -3,9 +3,11 @@ package datasource
|
|||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
|
||||||
"github.com/alexanderzobnin/grafana-zabbix/pkg/gtime"
|
"github.com/alexanderzobnin/grafana-zabbix/pkg/gtime"
|
||||||
"github.com/alexanderzobnin/grafana-zabbix/pkg/timeseries"
|
"github.com/alexanderzobnin/grafana-zabbix/pkg/timeseries"
|
||||||
|
"github.com/alexanderzobnin/grafana-zabbix/pkg/zabbix"
|
||||||
)
|
)
|
||||||
|
|
||||||
const RANGE_VARIABLE_VALUE = "range_series"
|
const RANGE_VARIABLE_VALUE = "range_series"
|
||||||
@@ -41,12 +43,16 @@ type DataProcessingFunc = func(series timeseries.TimeSeries, params ...interface
|
|||||||
|
|
||||||
type AggDataProcessingFunc = func(series []*timeseries.TimeSeriesData, params ...interface{}) ([]*timeseries.TimeSeriesData, error)
|
type AggDataProcessingFunc = func(series []*timeseries.TimeSeriesData, params ...interface{}) ([]*timeseries.TimeSeriesData, error)
|
||||||
|
|
||||||
|
type PreProcessingFunc = func(query *QueryModel, items []*zabbix.Item, params ...interface{}) error
|
||||||
|
|
||||||
var seriesFuncMap map[string]DataProcessingFunc
|
var seriesFuncMap map[string]DataProcessingFunc
|
||||||
|
|
||||||
var aggFuncMap map[string]AggDataProcessingFunc
|
var aggFuncMap map[string]AggDataProcessingFunc
|
||||||
|
|
||||||
var filterFuncMap map[string]AggDataProcessingFunc
|
var filterFuncMap map[string]AggDataProcessingFunc
|
||||||
|
|
||||||
|
var timeFuncMap map[string]PreProcessingFunc
|
||||||
|
|
||||||
var frontendFuncMap map[string]bool
|
var frontendFuncMap map[string]bool
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
@@ -55,6 +61,7 @@ func init() {
|
|||||||
"scale": applyScale,
|
"scale": applyScale,
|
||||||
"offset": applyOffset,
|
"offset": applyOffset,
|
||||||
"percentile": applyPercentile,
|
"percentile": applyPercentile,
|
||||||
|
"timeShift": applyTimeShiftPost,
|
||||||
}
|
}
|
||||||
|
|
||||||
aggFuncMap = map[string]AggDataProcessingFunc{
|
aggFuncMap = map[string]AggDataProcessingFunc{
|
||||||
@@ -69,6 +76,10 @@ func init() {
|
|||||||
"sortSeries": applySortSeries,
|
"sortSeries": applySortSeries,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
timeFuncMap = map[string]PreProcessingFunc{
|
||||||
|
"timeShift": applyTimeShiftPre,
|
||||||
|
}
|
||||||
|
|
||||||
// Functions processing on the frontend
|
// Functions processing on the frontend
|
||||||
frontendFuncMap = map[string]bool{
|
frontendFuncMap = map[string]bool{
|
||||||
"setAlias": true,
|
"setAlias": true,
|
||||||
@@ -109,6 +120,20 @@ func applyFunctions(series []*timeseries.TimeSeriesData, functions []QueryFuncti
|
|||||||
return series, nil
|
return series, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// applyFunctionsPre applies functions requires pre-processing, like timeShift() (it needs to change original time range)
|
||||||
|
func applyFunctionsPre(query *QueryModel, items []*zabbix.Item) error {
|
||||||
|
for _, f := range query.Functions {
|
||||||
|
if applyFunc, ok := timeFuncMap[f.Def.Name]; ok {
|
||||||
|
err := applyFunc(query, items, f.Params...)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func applyGroupBy(series timeseries.TimeSeries, params ...interface{}) (timeseries.TimeSeries, error) {
|
func applyGroupBy(series timeseries.TimeSeries, params ...interface{}) (timeseries.TimeSeries, error) {
|
||||||
pInterval, err := MustString(params[0])
|
pInterval, err := MustString(params[0])
|
||||||
pAgg, err := MustString(params[1])
|
pAgg, err := MustString(params[1])
|
||||||
@@ -256,6 +281,58 @@ func applySortSeries(series []*timeseries.TimeSeriesData, params ...interface{})
|
|||||||
return sorted, nil
|
return sorted, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func applyTimeShiftPre(query *QueryModel, items []*zabbix.Item, params ...interface{}) error {
|
||||||
|
pInterval, err := MustString(params[0])
|
||||||
|
if err != nil {
|
||||||
|
return errParsingFunctionParam(err)
|
||||||
|
}
|
||||||
|
shiftForward := false
|
||||||
|
pInterval = strings.TrimPrefix(pInterval, "-")
|
||||||
|
if strings.Index(pInterval, "+") == 0 {
|
||||||
|
pInterval = strings.TrimPrefix(pInterval, "+")
|
||||||
|
shiftForward = true
|
||||||
|
}
|
||||||
|
|
||||||
|
interval, err := gtime.ParseInterval(pInterval)
|
||||||
|
if err != nil {
|
||||||
|
return errParsingFunctionParam(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if shiftForward {
|
||||||
|
query.TimeRange.From = query.TimeRange.From.Add(interval)
|
||||||
|
query.TimeRange.To = query.TimeRange.To.Add(interval)
|
||||||
|
} else {
|
||||||
|
query.TimeRange.From = query.TimeRange.From.Add(-interval)
|
||||||
|
query.TimeRange.To = query.TimeRange.To.Add(-interval)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func applyTimeShiftPost(series timeseries.TimeSeries, params ...interface{}) (timeseries.TimeSeries, error) {
|
||||||
|
pInterval, err := MustString(params[0])
|
||||||
|
if err != nil {
|
||||||
|
return nil, errParsingFunctionParam(err)
|
||||||
|
}
|
||||||
|
shiftForward := false
|
||||||
|
pInterval = strings.TrimPrefix(pInterval, "-")
|
||||||
|
if strings.Index(pInterval, "+") == 0 {
|
||||||
|
pInterval = strings.TrimPrefix(pInterval, "+")
|
||||||
|
shiftForward = true
|
||||||
|
}
|
||||||
|
|
||||||
|
interval, err := gtime.ParseInterval(pInterval)
|
||||||
|
if err != nil {
|
||||||
|
return nil, errParsingFunctionParam(err)
|
||||||
|
}
|
||||||
|
if shiftForward == true {
|
||||||
|
interval = -interval
|
||||||
|
}
|
||||||
|
|
||||||
|
transformFunc := timeseries.TransformShiftTime(interval)
|
||||||
|
return series.Transform(transformFunc), nil
|
||||||
|
}
|
||||||
|
|
||||||
func getAggFunc(agg string) timeseries.AggFunc {
|
func getAggFunc(agg string) timeseries.AggFunc {
|
||||||
switch agg {
|
switch agg {
|
||||||
case "avg":
|
case "avg":
|
||||||
|
|||||||
@@ -68,6 +68,11 @@ func (ds *ZabbixDatasourceInstance) queryNumericDataForItems(ctx context.Context
|
|||||||
consolidateBy = valueType
|
consolidateBy = valueType
|
||||||
}
|
}
|
||||||
|
|
||||||
|
err := applyFunctionsPre(query, items)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
history, err := ds.getHistotyOrTrend(ctx, query, items, valueType)
|
history, err := ds.getHistotyOrTrend(ctx, query, items, valueType)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
|||||||
@@ -1,5 +1,7 @@
|
|||||||
package timeseries
|
package timeseries
|
||||||
|
|
||||||
|
import "time"
|
||||||
|
|
||||||
func TransformScale(factor float64) TransformFunc {
|
func TransformScale(factor float64) TransformFunc {
|
||||||
return func(point TimePoint) TimePoint {
|
return func(point TimePoint) TimePoint {
|
||||||
return transformScale(point, factor)
|
return transformScale(point, factor)
|
||||||
@@ -12,6 +14,12 @@ func TransformOffset(offset float64) TransformFunc {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TransformShiftTime(interval time.Duration) TransformFunc {
|
||||||
|
return func(point TimePoint) TimePoint {
|
||||||
|
return transformShiftTime(point, interval)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func transformScale(point TimePoint, factor float64) TimePoint {
|
func transformScale(point TimePoint, factor float64) TimePoint {
|
||||||
if point.Value != nil {
|
if point.Value != nil {
|
||||||
newValue := *point.Value * factor
|
newValue := *point.Value * factor
|
||||||
@@ -27,3 +35,9 @@ func transformOffset(point TimePoint, offset float64) TimePoint {
|
|||||||
}
|
}
|
||||||
return point
|
return point
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func transformShiftTime(point TimePoint, interval time.Duration) TimePoint {
|
||||||
|
shiftedTime := point.Time.Add(interval)
|
||||||
|
point.Time = shiftedTime
|
||||||
|
return point
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user