Handle db connection response on the backend

This commit is contained in:
Alexander Zobnin
2021-08-04 18:07:38 +03:00
parent 848ea8a9a0
commit e12b8cbefb
10 changed files with 233 additions and 33 deletions

View File

@@ -3,6 +3,7 @@ package datasource
import (
"encoding/json"
"fmt"
"github.com/alexanderzobnin/grafana-zabbix/pkg/timeseries"
"strconv"
"time"
@@ -44,6 +45,11 @@ type ZabbixDatasourceSettings struct {
DisableReadOnlyUsersAck bool `json:"disableReadOnlyUsersAck"`
}
type DBConnectionPostProcessingRequest struct {
Query QueryModel `json:"query"`
Series []*timeseries.TimeSeriesData `json:"series"`
}
type ZabbixAPIResourceRequest struct {
DatasourceId int64 `json:"datasourceId"`
Method string `json:"method"`

View File

@@ -60,6 +60,47 @@ func (ds *ZabbixDatasource) ZabbixAPIHandler(rw http.ResponseWriter, req *http.R
writeResponse(rw, result)
}
func (ds *ZabbixDatasource) DBConnectionPostProcessingHandler(rw http.ResponseWriter, req *http.Request) {
if req.Method != http.MethodPost {
return
}
body, err := ioutil.ReadAll(req.Body)
defer req.Body.Close()
if err != nil || len(body) == 0 {
writeError(rw, http.StatusBadRequest, err)
return
}
var reqData DBConnectionPostProcessingRequest
err = json.Unmarshal(body, &reqData)
if err != nil {
ds.logger.Error("Cannot unmarshal request", "error", err.Error())
writeError(rw, http.StatusInternalServerError, err)
return
}
pluginCxt := httpadapter.PluginConfigFromContext(req.Context())
dsInstance, err := ds.getDSInstance(pluginCxt)
if err != nil {
ds.logger.Error("Error loading datasource", "error", err)
writeError(rw, http.StatusInternalServerError, err)
return
}
frames, err := dsInstance.applyDataProcessing(req.Context(), &reqData.Query, reqData.Series)
resultJson, err := json.Marshal(frames)
if err != nil {
writeError(rw, http.StatusInternalServerError, err)
}
rw.Header().Add("Content-Type", "application/json")
rw.WriteHeader(http.StatusOK)
rw.Write(resultJson)
}
func writeResponse(rw http.ResponseWriter, result *ZabbixAPIResourceResponse) {
resultJson, err := json.Marshal(*result)
if err != nil {

View File

@@ -1,6 +1,7 @@
package datasource
import (
"github.com/alexanderzobnin/grafana-zabbix/pkg/timeseries"
"strings"
"time"
@@ -134,6 +135,43 @@ func (ds *ZabbixDatasourceInstance) queryNumericDataForItems(ctx context.Context
return frames, nil
}
func (ds *ZabbixDatasourceInstance) applyDataProcessing(ctx context.Context, query *QueryModel, series []*timeseries.TimeSeriesData) ([]*data.Frame, error) {
consolidateBy := ds.getConsolidateBy(query)
// Align time series data if possible
useTrend := ds.isUseTrend(query.TimeRange)
if !query.Options.DisableDataAlignment && !ds.Settings.DisableDataAlignment && !useTrend {
for _, s := range series {
if s.Meta.Interval != nil {
s.TS = s.TS.Align(*s.Meta.Interval)
}
}
}
series, err := applyFunctions(series, query.Functions)
if err != nil {
return nil, err
}
for _, s := range series {
if int64(s.Len()) > query.MaxDataPoints && query.Interval > 0 {
downsampleFunc := consolidateBy
if downsampleFunc == "" {
downsampleFunc = "avg"
}
downsampled, err := applyGroupBy(s.TS, query.Interval.String(), downsampleFunc)
if err == nil {
s.TS = downsampled
} else {
ds.logger.Debug("Error downsampling series", "error", err)
}
}
}
frames := convertTimeSeriesToDataFrames(series)
return frames, nil
}
func (ds *ZabbixDatasourceInstance) getTrendValueType(query *QueryModel) string {
trendValue := "avg"

View File

@@ -45,6 +45,7 @@ func Init(logger log.Logger, mux *http.ServeMux) *datasource.ZabbixDatasource {
mux.HandleFunc("/", ds.RootHandler)
mux.HandleFunc("/zabbix-api", ds.ZabbixAPIHandler)
mux.HandleFunc("/db-connection-post", ds.DBConnectionPostProcessingHandler)
// mux.Handle("/scenarios", getScenariosHandler(logger))
return ds

View File

@@ -1,6 +1,7 @@
package timeseries
import (
"encoding/json"
"time"
"github.com/alexanderzobnin/grafana-zabbix/pkg/zabbix"
@@ -11,6 +12,22 @@ type TimePoint struct {
Value *float64
}
func (p *TimePoint) UnmarshalJSON(data []byte) error {
point := &struct {
Time int64
Value *float64
}{}
if err := json.Unmarshal(data, &point); err != nil {
return err
}
p.Value = point.Value
p.Time = time.Unix(point.Time, 0)
return nil
}
type TimeSeries []TimePoint
func NewTimeSeries() TimeSeries {

View File

@@ -15,8 +15,18 @@ import problemsHandler from './problemsHandler';
import { Zabbix } from './zabbix/zabbix';
import { ZabbixAPIError } from './zabbix/connectors/zabbix_api/zabbixAPIConnector';
import { ZabbixMetricsQuery, ZabbixDSOptions, VariableQueryTypes, ShowProblemTypes, ProblemDTO } from './types';
import { getBackendSrv, getTemplateSrv, toDataQueryError, toDataQueryResponse } from '@grafana/runtime';
import { DataFrame, DataQueryRequest, DataQueryResponse, DataSourceApi, DataSourceInstanceSettings, FieldType, isDataFrame, LoadingState } from '@grafana/data';
import {BackendSrvRequest, getBackendSrv, getTemplateSrv, toDataQueryError, toDataQueryResponse} from '@grafana/runtime';
import {
DataFrame,
dataFrameFromJSON,
DataQueryRequest,
DataQueryResponse,
DataSourceApi,
DataSourceInstanceSettings,
FieldType,
isDataFrame,
LoadingState
} from '@grafana/data';
export class ZabbixDatasource extends DataSourceApi<ZabbixMetricsQuery, ZabbixDSOptions> {
name: string;
@@ -284,36 +294,58 @@ export class ZabbixDatasource extends DataSourceApi<ZabbixMetricsQuery, ZabbixDS
console.log(`Datasource::Performance Query Time (${this.name}): ${queryEnd - queryStart}`);
}
const valueMappings = await this.zabbix.getValueMappings();
const dataFrames = result.map(s => responseHandler.seriesToDataFrame(s, target, valueMappings));
return dataFrames;
const frames = [];
for (const frameJSON of result) {
const frame = dataFrameFromJSON(frameJSON);
frames.push(frame);
}
return frames;
// const valueMappings = await this.zabbix.getValueMappings();
//
// const dataFrames = (result as any).map(s => responseHandler.seriesToDataFrame(s, target, valueMappings));
// return dataFrames;
}
/**
* Query history for numeric items
*/
queryNumericDataForItems(items, target: ZabbixMetricsQuery, timeRange, useTrends, options) {
let getHistoryPromise;
async queryNumericDataForItems(items, target: ZabbixMetricsQuery, timeRange, useTrends, options) {
let history;
options.valueType = this.getTrendValueType(target);
options.consolidateBy = getConsolidateBy(target) || options.valueType;
const disableDataAlignment = this.disableDataAlignment || target.options?.disableDataAlignment;
if (useTrends) {
getHistoryPromise = this.zabbix.getTrends(items, timeRange, options)
.then(timeseries => {
return !disableDataAlignment ? this.fillTrendTimeSeriesWithNulls(timeseries) : timeseries;
});
history = await this.zabbix.getTrends(items, timeRange, options);
// .then(timeseries => {
// return !disableDataAlignment ? this.fillTrendTimeSeriesWithNulls(timeseries) : timeseries;
// });
} else {
getHistoryPromise = this.zabbix.getHistoryTS(items, timeRange, options)
.then(timeseries => {
return !disableDataAlignment ? this.alignTimeSeriesData(timeseries) : timeseries;
});
history = await this.zabbix.getHistoryTS(items, timeRange, options);
// .then(timeseries => {
// return !disableDataAlignment ? this.alignTimeSeriesData(timeseries) : timeseries;
// });
}
return getHistoryPromise
.then(timeseries => this.applyDataProcessingFunctions(timeseries, target))
.then(timeseries => downsampleSeries(timeseries, options));
const requestOptions: BackendSrvRequest = {
url: `/api/datasources/${this.datasourceId}/resources/db-connection-post`,
method: 'POST',
headers: {
'Content-Type': 'application/json'
},
hideFromInspector: false,
data: {
query: target,
series: history,
},
};
const response: any = await getBackendSrv().fetch<any>(requestOptions).toPromise();
return response.data;
// return getHistoryPromise
// .then(timeseries => this.applyDataProcessingFunctions(timeseries, target))
// .then(timeseries => downsampleSeries(timeseries, options));
}
getTrendValueType(target) {
@@ -453,10 +485,10 @@ export class ZabbixDatasource extends DataSourceApi<ZabbixMetricsQuery, ZabbixDS
return this.zabbix.getItemsByIDs(itemids)
.then(items => {
return this.queryNumericDataForItems(items, target, timeRange, useTrends, options);
})
.then(result => {
return result.map(s => responseHandler.seriesToDataFrame(s, target));
});
// .then(result => {
// return (result as any).map(s => responseHandler.seriesToDataFrame(s, target));
// });
}
/**

View File

@@ -2,7 +2,19 @@ import _ from 'lodash';
import TableModel from 'grafana/app/core/table_model';
import * as c from './constants';
import * as utils from './utils';
import { ArrayVector, DataFrame, Field, FieldType, MutableDataFrame, MutableField, TIME_SERIES_TIME_FIELD_NAME, TIME_SERIES_VALUE_FIELD_NAME } from '@grafana/data';
import {
ArrayVector,
DataFrame,
dataFrameFromJSON,
DataFrameJSON,
Field,
FieldType,
getTimeField,
MutableDataFrame,
MutableField,
TIME_SERIES_TIME_FIELD_NAME,
TIME_SERIES_VALUE_FIELD_NAME,
} from '@grafana/data';
import { ZabbixMetricsQuery } from './types';
/**
@@ -141,6 +153,51 @@ export function seriesToDataFrame(timeseries, target: ZabbixMetricsQuery, valueM
return mutableFrame;
}
export function dataResponseToTimeSeries(response: DataFrameJSON[], items) {
const series = [];
if (response.length === 0) {
return [];
}
for (const frameJSON of response) {
const frame = dataFrameFromJSON(frameJSON);
const { timeField, timeIndex } = getTimeField(frame);
for (let i = 0; i < frame.fields.length; i++) {
const field = frame.fields[i];
if (i === timeIndex || !field.values || !field.values.length) {
continue;
}
const s = [];
for (let j = 0; j < field.values.length; j++) {
const v = field.values.get(j);
if (v !== null) {
s.push({ time: timeField.values.get(j) / 1000, value: v });
}
}
const itemid = field.name;
const item = _.find(items, {'itemid': itemid});
let interval = utils.parseItemInterval(item.delay);
if (interval === 0) {
interval = null;
}
const timeSeriesData = {
ts: s,
meta: {
name: item.name,
item,
interval,
}
};
series.push(timeSeriesData);
}
}
return series;
}
export function isConvertibleToWide(data: DataFrame[]): boolean {
if (!data || data.length < 2) {
return false;
@@ -441,6 +498,7 @@ export default {
handleTriggersResponse,
sortTimeseries,
seriesToDataFrame,
dataResponseToTimeSeries,
isConvertibleToWide,
convertToWide,
alignFrames,

View File

@@ -1,5 +1,6 @@
import _ from 'lodash';
import { getDataSourceSrv } from '@grafana/runtime';
import responseHandler from "../../responseHandler";
export const DEFAULT_QUERY_LIMIT = 10000;
export const HISTORY_TO_TABLE_MAP = {
@@ -92,10 +93,6 @@ export class DBConnector {
getTrends() {
throw new ZabbixNotImplemented('getTrends()');
}
handleGrafanaTSResponse(history, items, addHostName = true) {
return convertGrafanaTSResponse(history, items, addHostName);
}
}
// Define Zabbix DB Connector exception type for non-implemented methods
@@ -111,6 +108,12 @@ export class ZabbixNotImplemented {
}
}
export function handleDBDataSourceResponse(response, items) {
const series = responseHandler.dataResponseToTimeSeries(response, items);
// return convertGrafanaTSResponse(series, items, addHostName);
return series;
}
/**
* Converts time series returned by the data source into format that Grafana expects
* time_series is Array of series:
@@ -121,7 +124,11 @@ export class ZabbixNotImplemented {
* }]
* ```
*/
function convertGrafanaTSResponse(time_series, items, addHostName) {
export function convertGrafanaTSResponse(time_series, items, addHostName) {
if (time_series.length === 0) {
return [];
}
//uniqBy is needed to deduplicate
const hosts = _.uniqBy(_.flatten(_.map(items, 'hosts')), 'hostid');
let grafanaSeries = _.map(_.compact(time_series), series => {

View File

@@ -109,7 +109,7 @@ export class SQLConnector extends DBConnector {
};
return getBackendSrv().datasourceRequest({
url: '/api/tsdb/query',
url: '/api/ds/query',
method: 'POST',
data: {
queries: [queryDef],
@@ -118,7 +118,7 @@ export class SQLConnector extends DBConnector {
.then(response => {
let results = response.data.results;
if (results['A']) {
return results['A'].series;
return results['A'].frames;
} else {
return null;
}

View File

@@ -5,7 +5,7 @@ import * as utils from '../utils';
import responseHandler from '../responseHandler';
import { CachingProxy } from './proxy/cachingProxy';
// import { ZabbixNotImplemented } from './connectors/dbConnector';
import { DBConnector } from './connectors/dbConnector';
import { DBConnector, handleDBDataSourceResponse } from './connectors/dbConnector';
import { ZabbixAPIConnector } from './connectors/zabbix_api/zabbixAPIConnector';
import { SQLConnector } from './connectors/sql/sqlConnector';
import { InfluxDBConnector } from './connectors/influxdb/influxdbConnector';
@@ -432,7 +432,7 @@ export class Zabbix implements ZabbixConnector {
const [timeFrom, timeTo] = timeRange;
if (this.enableDirectDBConnection) {
return this.getHistoryDB(items, timeFrom, timeTo, options)
.then(history => this.dbConnector.handleGrafanaTSResponse(history, items));
.then(history => handleDBDataSourceResponse(history, items));
} else {
return this.zabbixAPI.getHistory(items, timeFrom, timeTo)
.then(history => responseHandler.handleHistory(history, items));
@@ -443,7 +443,7 @@ export class Zabbix implements ZabbixConnector {
const [timeFrom, timeTo] = timeRange;
if (this.enableDirectDBConnection) {
return this.getTrendsDB(items, timeFrom, timeTo, options)
.then(history => this.dbConnector.handleGrafanaTSResponse(history, items));
.then(history => handleDBDataSourceResponse(history, items));
} else {
const valueType = options.consolidateBy || options.valueType;
return this.zabbixAPI.getTrend(items, timeFrom, timeTo)