Use Data frames response format (#1099)

* Use data frames for numeric data

* Use data frames for text data

* Use data frames for IT services

* fix multiple series

* Convert to the wide format if possible

* Fix table format for text data

* Add refId to the data frames

* Align time series from Zabbix API

* Fill gaps with nulls

* Fix moving average functions

* Option for disabling data alignment

* remove unused logging

* Add labels to data frames

* Detect units

* Set min and max for if percent unit used

* Use value mapping from Zabbix

* Rename unitConverter -> convertZabbixUnit

* More units

* Add missing points in front of each series

* Fix handling table data

* fix db connector data frames handling

* fix it services data frames handling

* Detect all known grafana units

* Chore: remove unused logging

* Fix problems format

* Debug logging: show original units

* Add global option for disabling data alignment

* Add tooltip for the disableDataAlignment feature

* Add note about query options

* Functions for aligning timeseries on the backend
This commit is contained in:
Alexander Zobnin
2020-12-22 15:33:14 +03:00
committed by GitHub
parent ad378a81e1
commit 83618178f0
18 changed files with 700 additions and 91 deletions

View File

@@ -0,0 +1,49 @@
package datasource
import (
"fmt"
"time"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/data"
)
func convertHistory(history History, items Items) *data.Frame {
timeFileld := data.NewFieldFromFieldType(data.FieldTypeTime, 0)
timeFileld.Name = "time"
frame := data.NewFrame("History", timeFileld)
for _, item := range items {
field := data.NewFieldFromFieldType(data.FieldTypeNullableFloat64, 0)
if len(item.Hosts) > 0 {
field.Name = fmt.Sprintf("%s: %s", item.Hosts[0].Name, item.ExpandItem())
} else {
field.Name = item.ExpandItem()
}
frame.Fields = append(frame.Fields, field)
}
for _, point := range history {
for columnIndex, field := range frame.Fields {
if columnIndex == 0 {
ts := time.Unix(point.Clock, point.NS)
field.Append(ts)
} else {
item := items[columnIndex-1]
if point.ItemID == item.ID {
value := point.Value
field.Append(&value)
} else {
field.Append(nil)
}
}
}
}
wideFrame, err := data.LongToWide(frame, &data.FillMissing{Mode: data.FillModeNull})
if err != nil {
backend.Logger.Debug("Error converting data frame to the wide format", "error", err)
return frame
}
return wideFrame
}

View File

@@ -374,7 +374,8 @@ func (ds *ZabbixDatasourceInstance) queryNumericDataForItems(ctx context.Context
return nil, err
}
return convertHistory(history, items), nil
frame := convertHistory(history, items)
return frame, nil
}
func (ds *ZabbixDatasourceInstance) getTrendValueType(query *QueryModel) string {
@@ -472,46 +473,6 @@ func (ds *ZabbixDatasourceInstance) isUseTrend(timeRange backend.TimeRange) bool
return false
}
func convertHistory(history History, items Items) *data.Frame {
timeFileld := data.NewFieldFromFieldType(data.FieldTypeTime, 0)
timeFileld.Name = "time"
frame := data.NewFrame("History", timeFileld)
for _, item := range items {
field := data.NewFieldFromFieldType(data.FieldTypeNullableFloat64, 0)
if len(item.Hosts) > 0 {
field.Name = fmt.Sprintf("%s: %s", item.Hosts[0].Name, item.ExpandItem())
} else {
field.Name = item.ExpandItem()
}
frame.Fields = append(frame.Fields, field)
}
for _, point := range history {
for columnIndex, field := range frame.Fields {
if columnIndex == 0 {
ts := time.Unix(point.Clock, point.NS)
field.Append(ts)
} else {
item := items[columnIndex-1]
if point.ItemID == item.ID {
value := point.Value
field.Append(&value)
} else {
field.Append(nil)
}
}
}
}
// TODO: convert to wide format
wideFrame, err := data.LongToWide(frame, &data.FillMissing{Mode: data.FillModeNull})
if err == nil {
return wideFrame
}
return frame
}
func parseFilter(filter string) (*regexp.Regexp, error) {
regex := regexp.MustCompile(`^/(.+)/(.*)$`)
flagRE := regexp.MustCompile("[imsU]+")

18
pkg/timeseries/models.go Normal file
View File

@@ -0,0 +1,18 @@
package timeseries
import "time"
type TimePoint struct {
Time time.Time
Value *float64
}
type TimeSeries []TimePoint
func NewTimeSeries() TimeSeries {
return make(TimeSeries, 0)
}
func (ts *TimeSeries) Len() int {
return len(*ts)
}

View File

@@ -0,0 +1,153 @@
package timeseries
import (
"errors"
"math"
"sort"
"time"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/data"
)
// Aligns point's time stamps according to provided interval.
func (ts TimeSeries) Align(interval time.Duration) TimeSeries {
if interval <= 0 || ts.Len() < 2 {
return ts
}
alignedTs := NewTimeSeries()
var frameTs = ts[0].GetTimeFrame(interval)
var pointFrameTs time.Time
var point TimePoint
for i := 1; i < ts.Len(); i++ {
point = ts[i]
pointFrameTs = point.GetTimeFrame(interval)
if pointFrameTs.After(frameTs) {
for frameTs.Before(pointFrameTs) {
alignedTs = append(alignedTs, TimePoint{Time: frameTs, Value: nil})
frameTs = frameTs.Add(interval)
}
}
alignedTs = append(alignedTs, TimePoint{Time: pointFrameTs, Value: point.Value})
frameTs = frameTs.Add(interval)
}
return alignedTs
}
// Detects interval between data points in milliseconds based on median delta between points.
func (ts TimeSeries) DetectInterval() time.Duration {
if ts.Len() < 2 {
return 0
}
deltas := make([]int, 0)
for i := 1; i < ts.Len(); i++ {
delta := ts[i].Time.Sub(ts[i-1].Time)
deltas = append(deltas, int(delta.Milliseconds()))
}
sort.Ints(deltas)
midIndex := int(math.Floor(float64(len(deltas)) * 0.5))
return time.Duration(deltas[midIndex]) * time.Millisecond
}
// Gets point timestamp rounded according to provided interval.
func (p *TimePoint) GetTimeFrame(interval time.Duration) time.Time {
return p.Time.Truncate(interval)
}
func alignDataPoints(frame *data.Frame, interval time.Duration) *data.Frame {
if interval <= 0 || frame.Rows() < 2 {
return frame
}
timeFieldIdx := getTimeFieldIndex(frame)
if timeFieldIdx < 0 {
return frame
}
var frameTs = getPointTimeFrame(getTimestampAt(frame, 0), interval)
var pointFrameTs *time.Time
var pointsInserted = 0
for i := 1; i < frame.Rows(); i++ {
pointFrameTs = getPointTimeFrame(getTimestampAt(frame, i), interval)
if pointFrameTs == nil || frameTs == nil {
continue
}
if pointFrameTs.After(*frameTs) {
for frameTs.Before(*pointFrameTs) {
insertAt := i + pointsInserted
err := insertNullPointAt(frame, *frameTs, insertAt)
if err != nil {
backend.Logger.Debug("Error inserting null point", "error", err)
}
*frameTs = frameTs.Add(interval)
pointsInserted++
}
}
setTimeAt(frame, *pointFrameTs, i+pointsInserted)
*frameTs = frameTs.Add(interval)
}
return frame
}
func getPointTimeFrame(ts *time.Time, interval time.Duration) *time.Time {
if ts == nil {
return nil
}
timeFrame := ts.Truncate(interval)
return &timeFrame
}
func getTimeFieldIndex(frame *data.Frame) int {
for i := 0; i < len(frame.Fields); i++ {
if frame.Fields[i].Type() == data.FieldTypeTime {
return i
}
}
return -1
}
func getTimestampAt(frame *data.Frame, index int) *time.Time {
timeFieldIdx := getTimeFieldIndex(frame)
if timeFieldIdx < 0 {
return nil
}
tsValue := frame.Fields[timeFieldIdx].At(index)
ts, ok := tsValue.(time.Time)
if !ok {
return nil
}
return &ts
}
func insertNullPointAt(frame *data.Frame, frameTs time.Time, index int) error {
for _, field := range frame.Fields {
if field.Type() == data.FieldTypeTime {
field.Insert(index, frameTs)
} else if field.Type().Nullable() {
field.Insert(index, nil)
} else {
return errors.New("field is not nullable")
}
}
return nil
}
func setTimeAt(frame *data.Frame, frameTs time.Time, index int) {
for _, field := range frame.Fields {
if field.Type() == data.FieldTypeTime {
field.Insert(index, frameTs)
}
}
}