From e12b8cbefbc317fcb4c9a105f2327adf1265907e Mon Sep 17 00:00:00 2001 From: Alexander Zobnin Date: Wed, 4 Aug 2021 18:07:38 +0300 Subject: [PATCH] Handle db connection response on the backend --- pkg/datasource/models.go | 6 ++ pkg/datasource/resource_handler.go | 41 ++++++++++ pkg/datasource/zabbix.go | 38 ++++++++++ pkg/plugin.go | 1 + pkg/timeseries/models.go | 17 +++++ src/datasource-zabbix/datasource.ts | 76 +++++++++++++------ src/datasource-zabbix/responseHandler.ts | 60 ++++++++++++++- .../zabbix/connectors/dbConnector.js | 17 +++-- .../zabbix/connectors/sql/sqlConnector.js | 4 +- src/datasource-zabbix/zabbix/zabbix.ts | 6 +- 10 files changed, 233 insertions(+), 33 deletions(-) diff --git a/pkg/datasource/models.go b/pkg/datasource/models.go index 89c71fb..577d692 100644 --- a/pkg/datasource/models.go +++ b/pkg/datasource/models.go @@ -3,6 +3,7 @@ package datasource import ( "encoding/json" "fmt" + "github.com/alexanderzobnin/grafana-zabbix/pkg/timeseries" "strconv" "time" @@ -44,6 +45,11 @@ type ZabbixDatasourceSettings struct { DisableReadOnlyUsersAck bool `json:"disableReadOnlyUsersAck"` } +type DBConnectionPostProcessingRequest struct { + Query QueryModel `json:"query"` + Series []*timeseries.TimeSeriesData `json:"series"` +} + type ZabbixAPIResourceRequest struct { DatasourceId int64 `json:"datasourceId"` Method string `json:"method"` diff --git a/pkg/datasource/resource_handler.go b/pkg/datasource/resource_handler.go index ca7da96..e01389f 100644 --- a/pkg/datasource/resource_handler.go +++ b/pkg/datasource/resource_handler.go @@ -60,6 +60,47 @@ func (ds *ZabbixDatasource) ZabbixAPIHandler(rw http.ResponseWriter, req *http.R writeResponse(rw, result) } +func (ds *ZabbixDatasource) DBConnectionPostProcessingHandler(rw http.ResponseWriter, req *http.Request) { + if req.Method != http.MethodPost { + return + } + + body, err := ioutil.ReadAll(req.Body) + defer req.Body.Close() + if err != nil || len(body) == 0 { + writeError(rw, http.StatusBadRequest, err) + return + } + + var reqData DBConnectionPostProcessingRequest + err = json.Unmarshal(body, &reqData) + if err != nil { + ds.logger.Error("Cannot unmarshal request", "error", err.Error()) + writeError(rw, http.StatusInternalServerError, err) + return + } + + pluginCxt := httpadapter.PluginConfigFromContext(req.Context()) + dsInstance, err := ds.getDSInstance(pluginCxt) + if err != nil { + ds.logger.Error("Error loading datasource", "error", err) + writeError(rw, http.StatusInternalServerError, err) + return + } + + frames, err := dsInstance.applyDataProcessing(req.Context(), &reqData.Query, reqData.Series) + + resultJson, err := json.Marshal(frames) + if err != nil { + writeError(rw, http.StatusInternalServerError, err) + } + + rw.Header().Add("Content-Type", "application/json") + rw.WriteHeader(http.StatusOK) + rw.Write(resultJson) + +} + func writeResponse(rw http.ResponseWriter, result *ZabbixAPIResourceResponse) { resultJson, err := json.Marshal(*result) if err != nil { diff --git a/pkg/datasource/zabbix.go b/pkg/datasource/zabbix.go index e61a0ad..6108415 100644 --- a/pkg/datasource/zabbix.go +++ b/pkg/datasource/zabbix.go @@ -1,6 +1,7 @@ package datasource import ( + "github.com/alexanderzobnin/grafana-zabbix/pkg/timeseries" "strings" "time" @@ -134,6 +135,43 @@ func (ds *ZabbixDatasourceInstance) queryNumericDataForItems(ctx context.Context return frames, nil } +func (ds *ZabbixDatasourceInstance) applyDataProcessing(ctx context.Context, query *QueryModel, series []*timeseries.TimeSeriesData) ([]*data.Frame, error) { + consolidateBy := ds.getConsolidateBy(query) + + // Align time series data if possible + useTrend := ds.isUseTrend(query.TimeRange) + if !query.Options.DisableDataAlignment && !ds.Settings.DisableDataAlignment && !useTrend { + for _, s := range series { + if s.Meta.Interval != nil { + s.TS = s.TS.Align(*s.Meta.Interval) + } + } + } + + series, err := applyFunctions(series, query.Functions) + if err != nil { + return nil, err + } + + for _, s := range series { + if int64(s.Len()) > query.MaxDataPoints && query.Interval > 0 { + downsampleFunc := consolidateBy + if downsampleFunc == "" { + downsampleFunc = "avg" + } + downsampled, err := applyGroupBy(s.TS, query.Interval.String(), downsampleFunc) + if err == nil { + s.TS = downsampled + } else { + ds.logger.Debug("Error downsampling series", "error", err) + } + } + } + + frames := convertTimeSeriesToDataFrames(series) + return frames, nil +} + func (ds *ZabbixDatasourceInstance) getTrendValueType(query *QueryModel) string { trendValue := "avg" diff --git a/pkg/plugin.go b/pkg/plugin.go index 950c5af..89b278c 100644 --- a/pkg/plugin.go +++ b/pkg/plugin.go @@ -45,6 +45,7 @@ func Init(logger log.Logger, mux *http.ServeMux) *datasource.ZabbixDatasource { mux.HandleFunc("/", ds.RootHandler) mux.HandleFunc("/zabbix-api", ds.ZabbixAPIHandler) + mux.HandleFunc("/db-connection-post", ds.DBConnectionPostProcessingHandler) // mux.Handle("/scenarios", getScenariosHandler(logger)) return ds diff --git a/pkg/timeseries/models.go b/pkg/timeseries/models.go index 0a12a63..26b60c2 100644 --- a/pkg/timeseries/models.go +++ b/pkg/timeseries/models.go @@ -1,6 +1,7 @@ package timeseries import ( + "encoding/json" "time" "github.com/alexanderzobnin/grafana-zabbix/pkg/zabbix" @@ -11,6 +12,22 @@ type TimePoint struct { Value *float64 } +func (p *TimePoint) UnmarshalJSON(data []byte) error { + point := &struct { + Time int64 + Value *float64 + }{} + + if err := json.Unmarshal(data, &point); err != nil { + return err + } + + p.Value = point.Value + p.Time = time.Unix(point.Time, 0) + + return nil +} + type TimeSeries []TimePoint func NewTimeSeries() TimeSeries { diff --git a/src/datasource-zabbix/datasource.ts b/src/datasource-zabbix/datasource.ts index c6f8013..1666110 100644 --- a/src/datasource-zabbix/datasource.ts +++ b/src/datasource-zabbix/datasource.ts @@ -15,8 +15,18 @@ import problemsHandler from './problemsHandler'; import { Zabbix } from './zabbix/zabbix'; import { ZabbixAPIError } from './zabbix/connectors/zabbix_api/zabbixAPIConnector'; import { ZabbixMetricsQuery, ZabbixDSOptions, VariableQueryTypes, ShowProblemTypes, ProblemDTO } from './types'; -import { getBackendSrv, getTemplateSrv, toDataQueryError, toDataQueryResponse } from '@grafana/runtime'; -import { DataFrame, DataQueryRequest, DataQueryResponse, DataSourceApi, DataSourceInstanceSettings, FieldType, isDataFrame, LoadingState } from '@grafana/data'; +import {BackendSrvRequest, getBackendSrv, getTemplateSrv, toDataQueryError, toDataQueryResponse} from '@grafana/runtime'; +import { + DataFrame, + dataFrameFromJSON, + DataQueryRequest, + DataQueryResponse, + DataSourceApi, + DataSourceInstanceSettings, + FieldType, + isDataFrame, + LoadingState +} from '@grafana/data'; export class ZabbixDatasource extends DataSourceApi { name: string; @@ -284,36 +294,58 @@ export class ZabbixDatasource extends DataSourceApi responseHandler.seriesToDataFrame(s, target, valueMappings)); - return dataFrames; + const frames = []; + for (const frameJSON of result) { + const frame = dataFrameFromJSON(frameJSON); + frames.push(frame); + } + return frames; + // const valueMappings = await this.zabbix.getValueMappings(); + // + // const dataFrames = (result as any).map(s => responseHandler.seriesToDataFrame(s, target, valueMappings)); + // return dataFrames; } /** * Query history for numeric items */ - queryNumericDataForItems(items, target: ZabbixMetricsQuery, timeRange, useTrends, options) { - let getHistoryPromise; + async queryNumericDataForItems(items, target: ZabbixMetricsQuery, timeRange, useTrends, options) { + let history; options.valueType = this.getTrendValueType(target); options.consolidateBy = getConsolidateBy(target) || options.valueType; const disableDataAlignment = this.disableDataAlignment || target.options?.disableDataAlignment; if (useTrends) { - getHistoryPromise = this.zabbix.getTrends(items, timeRange, options) - .then(timeseries => { - return !disableDataAlignment ? this.fillTrendTimeSeriesWithNulls(timeseries) : timeseries; - }); + history = await this.zabbix.getTrends(items, timeRange, options); + // .then(timeseries => { + // return !disableDataAlignment ? this.fillTrendTimeSeriesWithNulls(timeseries) : timeseries; + // }); } else { - getHistoryPromise = this.zabbix.getHistoryTS(items, timeRange, options) - .then(timeseries => { - return !disableDataAlignment ? this.alignTimeSeriesData(timeseries) : timeseries; - }); + history = await this.zabbix.getHistoryTS(items, timeRange, options); + // .then(timeseries => { + // return !disableDataAlignment ? this.alignTimeSeriesData(timeseries) : timeseries; + // }); } - return getHistoryPromise - .then(timeseries => this.applyDataProcessingFunctions(timeseries, target)) - .then(timeseries => downsampleSeries(timeseries, options)); + const requestOptions: BackendSrvRequest = { + url: `/api/datasources/${this.datasourceId}/resources/db-connection-post`, + method: 'POST', + headers: { + 'Content-Type': 'application/json' + }, + hideFromInspector: false, + data: { + query: target, + series: history, + }, + }; + + const response: any = await getBackendSrv().fetch(requestOptions).toPromise(); + return response.data; + + // return getHistoryPromise + // .then(timeseries => this.applyDataProcessingFunctions(timeseries, target)) + // .then(timeseries => downsampleSeries(timeseries, options)); } getTrendValueType(target) { @@ -453,10 +485,10 @@ export class ZabbixDatasource extends DataSourceApi { return this.queryNumericDataForItems(items, target, timeRange, useTrends, options); - }) - .then(result => { - return result.map(s => responseHandler.seriesToDataFrame(s, target)); }); + // .then(result => { + // return (result as any).map(s => responseHandler.seriesToDataFrame(s, target)); + // }); } /** diff --git a/src/datasource-zabbix/responseHandler.ts b/src/datasource-zabbix/responseHandler.ts index 4abb3e8..55fa9b1 100644 --- a/src/datasource-zabbix/responseHandler.ts +++ b/src/datasource-zabbix/responseHandler.ts @@ -2,7 +2,19 @@ import _ from 'lodash'; import TableModel from 'grafana/app/core/table_model'; import * as c from './constants'; import * as utils from './utils'; -import { ArrayVector, DataFrame, Field, FieldType, MutableDataFrame, MutableField, TIME_SERIES_TIME_FIELD_NAME, TIME_SERIES_VALUE_FIELD_NAME } from '@grafana/data'; +import { + ArrayVector, + DataFrame, + dataFrameFromJSON, + DataFrameJSON, + Field, + FieldType, + getTimeField, + MutableDataFrame, + MutableField, + TIME_SERIES_TIME_FIELD_NAME, + TIME_SERIES_VALUE_FIELD_NAME, +} from '@grafana/data'; import { ZabbixMetricsQuery } from './types'; /** @@ -141,6 +153,51 @@ export function seriesToDataFrame(timeseries, target: ZabbixMetricsQuery, valueM return mutableFrame; } +export function dataResponseToTimeSeries(response: DataFrameJSON[], items) { + const series = []; + if (response.length === 0) { + return []; + } + + for (const frameJSON of response) { + const frame = dataFrameFromJSON(frameJSON); + const { timeField, timeIndex } = getTimeField(frame); + for (let i = 0; i < frame.fields.length; i++) { + const field = frame.fields[i]; + if (i === timeIndex || !field.values || !field.values.length) { + continue; + } + + const s = []; + for (let j = 0; j < field.values.length; j++) { + const v = field.values.get(j); + if (v !== null) { + s.push({ time: timeField.values.get(j) / 1000, value: v }); + } + } + + const itemid = field.name; + const item = _.find(items, {'itemid': itemid}); + let interval = utils.parseItemInterval(item.delay); + if (interval === 0) { + interval = null; + } + const timeSeriesData = { + ts: s, + meta: { + name: item.name, + item, + interval, + } + }; + + series.push(timeSeriesData); + } + } + + return series; +} + export function isConvertibleToWide(data: DataFrame[]): boolean { if (!data || data.length < 2) { return false; @@ -441,6 +498,7 @@ export default { handleTriggersResponse, sortTimeseries, seriesToDataFrame, + dataResponseToTimeSeries, isConvertibleToWide, convertToWide, alignFrames, diff --git a/src/datasource-zabbix/zabbix/connectors/dbConnector.js b/src/datasource-zabbix/zabbix/connectors/dbConnector.js index a2f75c1..02c7e58 100644 --- a/src/datasource-zabbix/zabbix/connectors/dbConnector.js +++ b/src/datasource-zabbix/zabbix/connectors/dbConnector.js @@ -1,5 +1,6 @@ import _ from 'lodash'; import { getDataSourceSrv } from '@grafana/runtime'; +import responseHandler from "../../responseHandler"; export const DEFAULT_QUERY_LIMIT = 10000; export const HISTORY_TO_TABLE_MAP = { @@ -92,10 +93,6 @@ export class DBConnector { getTrends() { throw new ZabbixNotImplemented('getTrends()'); } - - handleGrafanaTSResponse(history, items, addHostName = true) { - return convertGrafanaTSResponse(history, items, addHostName); - } } // Define Zabbix DB Connector exception type for non-implemented methods @@ -111,6 +108,12 @@ export class ZabbixNotImplemented { } } +export function handleDBDataSourceResponse(response, items) { + const series = responseHandler.dataResponseToTimeSeries(response, items); + // return convertGrafanaTSResponse(series, items, addHostName); + return series; +} + /** * Converts time series returned by the data source into format that Grafana expects * time_series is Array of series: @@ -121,7 +124,11 @@ export class ZabbixNotImplemented { * }] * ``` */ -function convertGrafanaTSResponse(time_series, items, addHostName) { +export function convertGrafanaTSResponse(time_series, items, addHostName) { + if (time_series.length === 0) { + return []; + } + //uniqBy is needed to deduplicate const hosts = _.uniqBy(_.flatten(_.map(items, 'hosts')), 'hostid'); let grafanaSeries = _.map(_.compact(time_series), series => { diff --git a/src/datasource-zabbix/zabbix/connectors/sql/sqlConnector.js b/src/datasource-zabbix/zabbix/connectors/sql/sqlConnector.js index da48708..b87b3e6 100644 --- a/src/datasource-zabbix/zabbix/connectors/sql/sqlConnector.js +++ b/src/datasource-zabbix/zabbix/connectors/sql/sqlConnector.js @@ -109,7 +109,7 @@ export class SQLConnector extends DBConnector { }; return getBackendSrv().datasourceRequest({ - url: '/api/tsdb/query', + url: '/api/ds/query', method: 'POST', data: { queries: [queryDef], @@ -118,7 +118,7 @@ export class SQLConnector extends DBConnector { .then(response => { let results = response.data.results; if (results['A']) { - return results['A'].series; + return results['A'].frames; } else { return null; } diff --git a/src/datasource-zabbix/zabbix/zabbix.ts b/src/datasource-zabbix/zabbix/zabbix.ts index 3928585..c43791e 100644 --- a/src/datasource-zabbix/zabbix/zabbix.ts +++ b/src/datasource-zabbix/zabbix/zabbix.ts @@ -5,7 +5,7 @@ import * as utils from '../utils'; import responseHandler from '../responseHandler'; import { CachingProxy } from './proxy/cachingProxy'; // import { ZabbixNotImplemented } from './connectors/dbConnector'; -import { DBConnector } from './connectors/dbConnector'; +import { DBConnector, handleDBDataSourceResponse } from './connectors/dbConnector'; import { ZabbixAPIConnector } from './connectors/zabbix_api/zabbixAPIConnector'; import { SQLConnector } from './connectors/sql/sqlConnector'; import { InfluxDBConnector } from './connectors/influxdb/influxdbConnector'; @@ -432,7 +432,7 @@ export class Zabbix implements ZabbixConnector { const [timeFrom, timeTo] = timeRange; if (this.enableDirectDBConnection) { return this.getHistoryDB(items, timeFrom, timeTo, options) - .then(history => this.dbConnector.handleGrafanaTSResponse(history, items)); + .then(history => handleDBDataSourceResponse(history, items)); } else { return this.zabbixAPI.getHistory(items, timeFrom, timeTo) .then(history => responseHandler.handleHistory(history, items)); @@ -443,7 +443,7 @@ export class Zabbix implements ZabbixConnector { const [timeFrom, timeTo] = timeRange; if (this.enableDirectDBConnection) { return this.getTrendsDB(items, timeFrom, timeTo, options) - .then(history => this.dbConnector.handleGrafanaTSResponse(history, items)); + .then(history => handleDBDataSourceResponse(history, items)); } else { const valueType = options.consolidateBy || options.valueType; return this.zabbixAPI.getTrend(items, timeFrom, timeTo)