Align time series data if possible

This commit is contained in:
Alexander Zobnin
2021-05-31 17:42:24 +03:00
parent 4f7699442e
commit 7d5b7cad3e
8 changed files with 99 additions and 109 deletions

View File

@@ -16,6 +16,7 @@ type ZabbixDatasourceSettingsDTO struct {
CacheTTL string `json:"cacheTTL"` CacheTTL string `json:"cacheTTL"`
Timeout string `json:"timeout"` Timeout string `json:"timeout"`
DisableDataAlignment bool `json:"disableDataAlignment"`
DisableReadOnlyUsersAck bool `json:"disableReadOnlyUsersAck"` DisableReadOnlyUsersAck bool `json:"disableReadOnlyUsersAck"`
} }
@@ -27,6 +28,7 @@ type ZabbixDatasourceSettings struct {
CacheTTL time.Duration CacheTTL time.Duration
Timeout time.Duration Timeout time.Duration
DisableDataAlignment bool `json:"disableDataAlignment"`
DisableReadOnlyUsersAck bool `json:"disableReadOnlyUsersAck"` DisableReadOnlyUsersAck bool `json:"disableReadOnlyUsersAck"`
} }
@@ -61,7 +63,8 @@ type QueryFilter struct {
// QueryOptions model // QueryOptions model
type QueryOptions struct { type QueryOptions struct {
ShowDisabledItems bool `json:"showDisabledItems"` ShowDisabledItems bool `json:"showDisabledItems"`
DisableDataAlignment bool `json:"disableDataAlignment"`
} }
// QueryOptions model // QueryOptions model

View File

@@ -2,9 +2,11 @@ package datasource
import ( import (
"fmt" "fmt"
"regexp"
"strconv" "strconv"
"time" "time"
"github.com/alexanderzobnin/grafana-zabbix/pkg/gtime"
"github.com/alexanderzobnin/grafana-zabbix/pkg/timeseries" "github.com/alexanderzobnin/grafana-zabbix/pkg/timeseries"
"github.com/alexanderzobnin/grafana-zabbix/pkg/zabbix" "github.com/alexanderzobnin/grafana-zabbix/pkg/zabbix"
"github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/grafana/grafana-plugin-sdk-go/backend"
@@ -33,6 +35,7 @@ func convertHistoryToTimeSeries(history zabbix.History, items []*zabbix.Item) []
if len(pointItem.Hosts) > 0 { if len(pointItem.Hosts) > 0 {
pointSeries.Meta.Name = fmt.Sprintf("%s: %s", pointItem.Hosts[0].Name, itemName) pointSeries.Meta.Name = fmt.Sprintf("%s: %s", pointItem.Hosts[0].Name, itemName)
} }
pointSeries.Meta.Interval = parseItemUpdateInterval(pointItem.Delay)
} }
value := point.Value value := point.Value
@@ -209,3 +212,18 @@ func getTrendPointValue(point zabbix.TrendPoint, valueType string) (float64, err
return 0, fmt.Errorf("failed to get trend value, unknown value type: %s", valueType) return 0, fmt.Errorf("failed to get trend value, unknown value type: %s", valueType)
} }
var fixedUpdateIntervalPattern = regexp.MustCompile(`^(\d+)([shd]?)$`)
func parseItemUpdateInterval(delay string) *time.Duration {
if valid := fixedUpdateIntervalPattern.MatchString(delay); !valid {
return nil
}
interval, err := gtime.ParseInterval(delay)
if err != nil {
return nil
}
return &interval
}

View File

@@ -79,6 +79,16 @@ func (ds *ZabbixDatasourceInstance) queryNumericDataForItems(ctx context.Context
} }
series := convertHistoryToTimeSeries(history, items) series := convertHistoryToTimeSeries(history, items)
// Align time series data if possible
if query.Options.DisableDataAlignment == false && ds.Settings.DisableDataAlignment == false {
for _, s := range series {
if s.Meta.Interval != nil {
s.TS = s.TS.Align(*s.Meta.Interval)
}
}
}
series, err = applyFunctions(series, query.Functions) series, err = applyFunctions(series, query.Functions)
if err != nil { if err != nil {
return nil, err return nil, err

52
pkg/timeseries/align.go Normal file
View File

@@ -0,0 +1,52 @@
package timeseries
import (
"math"
"sort"
"time"
)
// Aligns point's time stamps according to provided interval.
func (ts TimeSeries) Align(interval time.Duration) TimeSeries {
if interval <= 0 || ts.Len() < 2 {
return ts
}
alignedTs := NewTimeSeries()
var frameTs = ts[0].GetTimeFrame(interval)
var pointFrameTs time.Time
var point TimePoint
for i := 0; i < ts.Len(); i++ {
point = ts[i]
pointFrameTs = point.GetTimeFrame(interval)
if pointFrameTs.After(frameTs) {
for frameTs.Before(pointFrameTs) {
alignedTs = append(alignedTs, TimePoint{Time: frameTs, Value: nil})
frameTs = frameTs.Add(interval)
}
}
alignedTs = append(alignedTs, TimePoint{Time: pointFrameTs, Value: point.Value})
frameTs = frameTs.Add(interval)
}
return alignedTs
}
// Detects interval between data points in milliseconds based on median delta between points.
func (ts TimeSeries) DetectInterval() time.Duration {
if ts.Len() < 2 {
return 0
}
deltas := make([]int, 0)
for i := 1; i < ts.Len(); i++ {
delta := ts[i].Time.Sub(ts[i-1].Time)
deltas = append(deltas, int(delta.Milliseconds()))
}
sort.Ints(deltas)
midIndex := int(math.Floor(float64(len(deltas)) * 0.5))
return time.Duration(deltas[midIndex]) * time.Millisecond
}

View File

@@ -29,6 +29,9 @@ type TimeSeriesData struct {
type TimeSeriesMeta struct { type TimeSeriesMeta struct {
Name string Name string
Item *zabbix.Item Item *zabbix.Item
// Item update interval. nil means not supported intervals (flexible, schedule, etc)
Interval *time.Duration
} }
type AggFunc = func(points []TimePoint) *float64 type AggFunc = func(points []TimePoint) *float64

View File

@@ -1,12 +1,9 @@
package timeseries package timeseries
import ( import (
"errors"
"math"
"sort" "sort"
"time" "time"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/data" "github.com/grafana/grafana-plugin-sdk-go/data"
) )
@@ -286,94 +283,11 @@ func findNearestLeft(series TimeSeries, pointIndex int) *TimePoint {
return nil return nil
} }
// Aligns point's time stamps according to provided interval.
func (ts TimeSeries) Align(interval time.Duration) TimeSeries {
if interval <= 0 || ts.Len() < 2 {
return ts
}
alignedTs := NewTimeSeries()
var frameTs = ts[0].GetTimeFrame(interval)
var pointFrameTs time.Time
var point TimePoint
for i := 1; i < ts.Len(); i++ {
point = ts[i]
pointFrameTs = point.GetTimeFrame(interval)
if pointFrameTs.After(frameTs) {
for frameTs.Before(pointFrameTs) {
alignedTs = append(alignedTs, TimePoint{Time: frameTs, Value: nil})
frameTs = frameTs.Add(interval)
}
}
alignedTs = append(alignedTs, TimePoint{Time: pointFrameTs, Value: point.Value})
frameTs = frameTs.Add(interval)
}
return alignedTs
}
// Detects interval between data points in milliseconds based on median delta between points.
func (ts TimeSeries) DetectInterval() time.Duration {
if ts.Len() < 2 {
return 0
}
deltas := make([]int, 0)
for i := 1; i < ts.Len(); i++ {
delta := ts[i].Time.Sub(ts[i-1].Time)
deltas = append(deltas, int(delta.Milliseconds()))
}
sort.Ints(deltas)
midIndex := int(math.Floor(float64(len(deltas)) * 0.5))
return time.Duration(deltas[midIndex]) * time.Millisecond
}
// Gets point timestamp rounded according to provided interval. // Gets point timestamp rounded according to provided interval.
func (p *TimePoint) GetTimeFrame(interval time.Duration) time.Time { func (p *TimePoint) GetTimeFrame(interval time.Duration) time.Time {
return p.Time.Truncate(interval) return p.Time.Truncate(interval)
} }
func alignDataPoints(frame *data.Frame, interval time.Duration) *data.Frame {
if interval <= 0 || frame.Rows() < 2 {
return frame
}
timeFieldIdx := getTimeFieldIndex(frame)
if timeFieldIdx < 0 {
return frame
}
var frameTs = getPointTimeFrame(getTimestampAt(frame, 0), interval)
var pointFrameTs *time.Time
var pointsInserted = 0
for i := 1; i < frame.Rows(); i++ {
pointFrameTs = getPointTimeFrame(getTimestampAt(frame, i), interval)
if pointFrameTs == nil || frameTs == nil {
continue
}
if pointFrameTs.After(*frameTs) {
for frameTs.Before(*pointFrameTs) {
insertAt := i + pointsInserted
err := insertNullPointAt(frame, *frameTs, insertAt)
if err != nil {
backend.Logger.Debug("Error inserting null point", "error", err)
}
*frameTs = frameTs.Add(interval)
pointsInserted++
}
}
setTimeAt(frame, *pointFrameTs, i+pointsInserted)
*frameTs = frameTs.Add(interval)
}
return frame
}
func getPointTimeFrame(ts *time.Time, interval time.Duration) *time.Time { func getPointTimeFrame(ts *time.Time, interval time.Duration) *time.Time {
if ts == nil { if ts == nil {
return nil return nil
@@ -407,19 +321,6 @@ func getTimestampAt(frame *data.Frame, index int) *time.Time {
return &ts return &ts
} }
func insertNullPointAt(frame *data.Frame, frameTs time.Time, index int) error {
for _, field := range frame.Fields {
if field.Type() == data.FieldTypeTime {
field.Insert(index, frameTs)
} else if field.Type().Nullable() {
field.Insert(index, nil)
} else {
return errors.New("field is not nullable")
}
}
return nil
}
func setTimeAt(frame *data.Frame, frameTs time.Time, index int) { func setTimeAt(frame *data.Frame, frameTs time.Time, index int) {
for _, field := range frame.Fields { for _, field := range frame.Fields {
if field.Type() == data.FieldTypeTime { if field.Type() == data.FieldTypeTime {

View File

@@ -243,7 +243,7 @@ func filterGroupsByQuery(items []Group, filter string) ([]Group, error) {
func (ds *Zabbix) GetAllItems(ctx context.Context, hostids []string, appids []string, itemtype string) ([]*Item, error) { func (ds *Zabbix) GetAllItems(ctx context.Context, hostids []string, appids []string, itemtype string) ([]*Item, error) {
params := ZabbixAPIParams{ params := ZabbixAPIParams{
"output": []string{"itemid", "name", "key_", "value_type", "hostid", "status", "state"}, "output": []string{"itemid", "name", "key_", "value_type", "hostid", "status", "state", "units", "valuemapid", "delay"},
"sortfield": "name", "sortfield": "name",
"webitems": true, "webitems": true,
"filter": map[string]interface{}{}, "filter": map[string]interface{}{},

View File

@@ -40,14 +40,17 @@ func (r *ZabbixAPIRequest) String() string {
type Items []Item type Items []Item
type Item struct { type Item struct {
ID string `json:"itemid,omitempty"` ID string `json:"itemid,omitempty"`
Key string `json:"key_,omitempty"` Key string `json:"key_,omitempty"`
Name string `json:"name,omitempty"` Name string `json:"name,omitempty"`
ValueType int `json:"value_type,omitempty,string"` ValueType int `json:"value_type,omitempty,string"`
HostID string `json:"hostid,omitempty"` HostID string `json:"hostid,omitempty"`
Hosts []ItemHost `json:"hosts,omitempty"` Hosts []ItemHost `json:"hosts,omitempty"`
Status string `json:"status,omitempty"` Status string `json:"status,omitempty"`
State string `json:"state,omitempty"` State string `json:"state,omitempty"`
Delay string `json:"delay,omitempty"`
Units string `json:"units,omitempty"`
ValueMapID string `json:"valuemapid,omitempty"`
} }
type ItemHost struct { type ItemHost struct {