diff --git a/pkg/datasource/response_handler.go b/pkg/datasource/response_handler.go index 576fe1f..31d080e 100644 --- a/pkg/datasource/response_handler.go +++ b/pkg/datasource/response_handler.go @@ -5,12 +5,85 @@ import ( "strconv" "time" + "github.com/alexanderzobnin/grafana-zabbix/pkg/timeseries" "github.com/alexanderzobnin/grafana-zabbix/pkg/zabbix" "github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/grafana/grafana-plugin-sdk-go/data" ) -func convertHistoryToDataFrame(history zabbix.History, items zabbix.Items) *data.Frame { +func convertHistoryToTimeSeries(history zabbix.History, items []*zabbix.Item) []*timeseries.TimeSeriesData { + seriesMap := make(map[string]*timeseries.TimeSeriesData, len(items)) + + itemsMap := make(map[string]*zabbix.Item, len(items)) + for _, item := range items { + itemsMap[item.ID] = item + } + + for _, point := range history { + pointItem := itemsMap[point.ItemID] + if seriesMap[point.ItemID] == nil { + seriesMap[point.ItemID] = timeseries.NewTimeSeriesData() + } + pointSeries := seriesMap[point.ItemID] + if pointSeries.Meta.Item == nil { + pointSeries.Meta.Item = pointItem + } + + value := point.Value + pointSeries.Add(timeseries.TimePoint{ + Time: time.Unix(point.Clock, point.NS), + Value: &value, + }) + } + + series := make([]*timeseries.TimeSeriesData, 0) + for _, tsd := range seriesMap { + series = append(series, tsd) + } + + return series +} + +func convertTimeSeriesToDataFrame(series []*timeseries.TimeSeriesData) *data.Frame { + timeFileld := data.NewFieldFromFieldType(data.FieldTypeTime, 0) + timeFileld.Name = "time" + frame := data.NewFrame("History", timeFileld) + + for _, s := range series { + field := data.NewFieldFromFieldType(data.FieldTypeNullableFloat64, 0) + item := s.Meta.Item + if len(item.Hosts) > 0 { + field.Name = fmt.Sprintf("%s: %s", item.Hosts[0].Name, item.ExpandItemName()) + } else { + field.Name = item.ExpandItemName() + } + + frame.Fields = append(frame.Fields, field) + } + + for i, s := range series { + currentFieldIndex := i + 1 + for _, point := range s.TS { + timeFileld.Append(point.Time) + for fieldIndex, field := range frame.Fields { + if fieldIndex == currentFieldIndex { + field.Append(point.Value) + } else if fieldIndex > 0 { + field.Append(nil) + } + } + } + } + + wideFrame, err := data.LongToWide(frame, &data.FillMissing{Mode: data.FillModeNull}) + if err != nil { + backend.Logger.Debug("Error converting data frame to the wide format", "error", err) + return frame + } + return wideFrame +} + +func convertHistoryToDataFrame(history zabbix.History, items []*zabbix.Item) *data.Frame { timeFileld := data.NewFieldFromFieldType(data.FieldTypeTime, 0) timeFileld.Name = "time" frame := data.NewFrame("History", timeFileld) diff --git a/pkg/datasource/zabbix.go b/pkg/datasource/zabbix.go index 74ba844..eedab62 100644 --- a/pkg/datasource/zabbix.go +++ b/pkg/datasource/zabbix.go @@ -60,7 +60,7 @@ func (ds *ZabbixDatasourceInstance) queryNumericItems(ctx context.Context, query return frames, nil } -func (ds *ZabbixDatasourceInstance) queryNumericDataForItems(ctx context.Context, query *QueryModel, items zabbix.Items) (*data.Frame, error) { +func (ds *ZabbixDatasourceInstance) queryNumericDataForItems(ctx context.Context, query *QueryModel, items []*zabbix.Item) (*data.Frame, error) { valueType := ds.getTrendValueType(query) consolidateBy := ds.getConsolidateBy(query) @@ -73,7 +73,12 @@ func (ds *ZabbixDatasourceInstance) queryNumericDataForItems(ctx context.Context return nil, err } - frame := convertHistoryToDataFrame(history, items) + series := convertHistoryToTimeSeries(history, items) + // TODO: handle time series functions + + frame := convertTimeSeriesToDataFrame(series) + + // frame := convertHistoryToDataFrame(history, items) return frame, nil } @@ -100,7 +105,7 @@ func (ds *ZabbixDatasourceInstance) getConsolidateBy(query *QueryModel) string { return consolidateBy } -func (ds *ZabbixDatasourceInstance) getHistotyOrTrend(ctx context.Context, query *QueryModel, items zabbix.Items, trendValueType string) (zabbix.History, error) { +func (ds *ZabbixDatasourceInstance) getHistotyOrTrend(ctx context.Context, query *QueryModel, items []*zabbix.Item, trendValueType string) (zabbix.History, error) { timeRange := query.TimeRange useTrend := ds.isUseTrend(timeRange) diff --git a/pkg/timeseries/models.go b/pkg/timeseries/models.go index c519ba2..65b50ad 100644 --- a/pkg/timeseries/models.go +++ b/pkg/timeseries/models.go @@ -1,6 +1,10 @@ package timeseries -import "time" +import ( + "time" + + "github.com/alexanderzobnin/grafana-zabbix/pkg/zabbix" +) type TimePoint struct { Time time.Time @@ -16,3 +20,12 @@ func NewTimeSeries() TimeSeries { func (ts *TimeSeries) Len() int { return len(*ts) } + +type TimeSeriesData struct { + TS TimeSeries + Meta TimeSeriesMeta +} + +type TimeSeriesMeta struct { + Item *zabbix.Item +} diff --git a/pkg/timeseries/timeseries.go b/pkg/timeseries/timeseries.go index 43ba3e7..f753597 100644 --- a/pkg/timeseries/timeseries.go +++ b/pkg/timeseries/timeseries.go @@ -10,6 +10,26 @@ import ( "github.com/grafana/grafana-plugin-sdk-go/data" ) +func NewTimeSeriesData() *TimeSeriesData { + return &TimeSeriesData{ + TS: NewTimeSeries(), + Meta: TimeSeriesMeta{}, + } +} + +func (tsd *TimeSeriesData) Len() int { + return len(tsd.TS) +} + +func (tsd *TimeSeriesData) Add(point TimePoint) *TimeSeriesData { + if tsd.TS == nil { + tsd.TS = NewTimeSeries() + } + + tsd.TS = append(tsd.TS, point) + return tsd +} + // Aligns point's time stamps according to provided interval. func (ts TimeSeries) Align(interval time.Duration) TimeSeries { if interval <= 0 || ts.Len() < 2 { diff --git a/pkg/zabbix/methods.go b/pkg/zabbix/methods.go index 2dea862..040382f 100644 --- a/pkg/zabbix/methods.go +++ b/pkg/zabbix/methods.go @@ -6,7 +6,7 @@ import ( "github.com/grafana/grafana-plugin-sdk-go/backend" ) -func (ds *Zabbix) GetHistory(ctx context.Context, items []Item, timeRange backend.TimeRange) (History, error) { +func (ds *Zabbix) GetHistory(ctx context.Context, items []*Item, timeRange backend.TimeRange) (History, error) { history := History{} // Zabbix stores history in different tables and `history` param required for query. So in one query it's only // possible to get history for items with one type. In order to get history for items with multple types (numeric unsigned and numeric float), @@ -49,7 +49,7 @@ func (ds *Zabbix) getHistory(ctx context.Context, itemids []string, historyType return history, err } -func (ds *Zabbix) GetTrend(ctx context.Context, items []Item, timeRange backend.TimeRange) (Trend, error) { +func (ds *Zabbix) GetTrend(ctx context.Context, items []*Item, timeRange backend.TimeRange) (Trend, error) { itemids := make([]string, 0) for _, item := range items { itemids = append(itemids, item.ID) @@ -78,7 +78,7 @@ func (ds *Zabbix) getTrend(ctx context.Context, itemids []string, timeRange back return trend, err } -func (ds *Zabbix) GetItems(ctx context.Context, groupFilter string, hostFilter string, appFilter string, itemFilter string, itemType string) ([]Item, error) { +func (ds *Zabbix) GetItems(ctx context.Context, groupFilter string, hostFilter string, appFilter string, itemFilter string, itemType string) ([]*Item, error) { hosts, err := ds.GetHosts(ctx, groupFilter, hostFilter) if err != nil { return nil, err @@ -100,7 +100,7 @@ func (ds *Zabbix) GetItems(ctx context.Context, groupFilter string, hostFilter s appids = append(appids, app.ID) } - var allItems []Item + var allItems []*Item if len(hostids) > 0 { allItems, err = ds.GetAllItems(ctx, hostids, nil, itemType) } else if len(appids) > 0 { @@ -110,13 +110,13 @@ func (ds *Zabbix) GetItems(ctx context.Context, groupFilter string, hostFilter s return filterItemsByQuery(allItems, itemFilter) } -func filterItemsByQuery(items []Item, filter string) ([]Item, error) { +func filterItemsByQuery(items []*Item, filter string) ([]*Item, error) { re, err := parseFilter(filter) if err != nil { return nil, err } - var filteredItems []Item + var filteredItems []*Item for _, i := range items { name := i.Name if re != nil { @@ -241,7 +241,7 @@ func filterGroupsByQuery(items []Group, filter string) ([]Group, error) { return filteredItems, nil } -func (ds *Zabbix) GetAllItems(ctx context.Context, hostids []string, appids []string, itemtype string) ([]Item, error) { +func (ds *Zabbix) GetAllItems(ctx context.Context, hostids []string, appids []string, itemtype string) ([]*Item, error) { params := ZabbixAPIParams{ "output": []string{"itemid", "name", "key_", "value_type", "hostid", "status", "state"}, "sortfield": "name", @@ -264,7 +264,7 @@ func (ds *Zabbix) GetAllItems(ctx context.Context, hostids []string, appids []st return nil, err } - var items []Item + var items []*Item err = convertTo(result, &items) if err != nil { return nil, err diff --git a/pkg/zabbix/utils.go b/pkg/zabbix/utils.go index 3ef2cff..db6c975 100644 --- a/pkg/zabbix/utils.go +++ b/pkg/zabbix/utils.go @@ -25,7 +25,7 @@ func (item *Item) ExpandItemName() string { return name } -func expandItems(items []Item) []Item { +func expandItems(items []*Item) []*Item { for i := 0; i < len(items); i++ { items[i].Name = items[i].ExpandItemName() }