Refactor queries

This commit is contained in:
Alexander Zobnin
2021-08-05 14:21:38 +03:00
parent 3831c6e28e
commit 6c1722d2ef
3 changed files with 170 additions and 182 deletions

View File

@@ -4,7 +4,7 @@ import _ from 'lodash';
import * as utils from './utils';
import ts, { groupBy_perf as groupBy } from './timeseries';
import { getTemplateSrv } from '@grafana/runtime';
import { DataFrame, Field, FieldType, TIME_SERIES_VALUE_FIELD_NAME } from '@grafana/data';
import { DataFrame, FieldType, TIME_SERIES_VALUE_FIELD_NAME } from '@grafana/data';
const SUM = ts.SUM;
const COUNT = ts.COUNT;

View File

@@ -1,6 +1,5 @@
import _ from 'lodash';
import { Observable, of } from 'rxjs';
import { map, catchError } from 'rxjs/operators';
import { Observable } from 'rxjs';
import config from 'grafana/app/core/config';
import { contextSrv } from 'grafana/app/core/core';
import * as dateMath from 'grafana/app/core/utils/datemath';
@@ -8,14 +7,13 @@ import * as utils from './utils';
import * as migrations from './migrations';
import * as metricFunctions from './metricFunctions';
import * as c from './constants';
import { align, fillTrendsWithNulls } from './timeseries';
import dataProcessor from './dataProcessor';
import responseHandler from './responseHandler';
import problemsHandler from './problemsHandler';
import { Zabbix } from './zabbix/zabbix';
import { ZabbixAPIError } from './zabbix/connectors/zabbix_api/zabbixAPIConnector';
import { ZabbixMetricsQuery, ZabbixDSOptions, VariableQueryTypes, ShowProblemTypes, ProblemDTO } from './types';
import {BackendSrvRequest, getBackendSrv, getTemplateSrv, toDataQueryError, toDataQueryResponse} from '@grafana/runtime';
import { ProblemDTO, ShowProblemTypes, VariableQueryTypes, ZabbixDSOptions, ZabbixMetricsQuery } from './types';
import { BackendSrvRequest, getBackendSrv, getTemplateSrv, toDataQueryResponse } from '@grafana/runtime';
import {
DataFrame,
dataFrameFromJSON,
@@ -60,17 +58,17 @@ export class ZabbixDatasource extends DataSourceApi<ZabbixMetricsQuery, ZabbixDS
this.replaceTemplateVars = _.partial(replaceTemplateVars, this.templateSrv);
// General data source settings
this.datasourceId = instanceSettings.id;
this.name = instanceSettings.name;
this.basicAuth = instanceSettings.basicAuth;
this.withCredentials = instanceSettings.withCredentials;
this.datasourceId = instanceSettings.id;
this.name = instanceSettings.name;
this.basicAuth = instanceSettings.basicAuth;
this.withCredentials = instanceSettings.withCredentials;
const jsonData = migrations.migrateDSConfig(instanceSettings.jsonData);
// Use trends instead history since specified time
this.trends = jsonData.trends;
this.trendsFrom = jsonData.trendsFrom || '7d';
this.trendsRange = jsonData.trendsRange || '4d';
this.trends = jsonData.trends;
this.trendsFrom = jsonData.trendsFrom || '7d';
this.trendsRange = jsonData.trendsRange || '4d';
// Set cache update interval
const ttl = jsonData.cacheTTL || '1h';
@@ -117,97 +115,27 @@ export class ZabbixDatasource extends DataSourceApi<ZabbixMetricsQuery, ZabbixDS
return migrations.migrate(target);
});
const backendResponsePromise = this.backendQuery({...request, targets: requestTargets});
const backendResponsePromise = this.backendQuery({ ...request, targets: requestTargets });
const dbConnectionResponsePromise = this.dbConnectionQuery({ ...request, targets: requestTargets });
const frontendResponsePromise = this.frontendQuery({ ...request, targets: requestTargets });
// Create request for each target
const frontendTargets = requestTargets.filter(t => !this.isBackendTarget(t));
const promises = _.map(frontendTargets, target => {
// Don't request for hidden targets
if (target.hide) {
return [];
return Promise.all([backendResponsePromise, dbConnectionResponsePromise, frontendResponsePromise])
.then(rsp => {
// Merge backend and frontend queries results
const [backendRes, dbConnectionRes, frontendRes] = rsp;
if (dbConnectionRes.data) {
backendRes.data = backendRes.data.concat(dbConnectionRes.data);
}
if (frontendRes.data) {
backendRes.data = backendRes.data.concat(frontendRes.data);
}
let timeFrom = Math.ceil(dateMath.parse(request.range.from) / 1000);
let timeTo = Math.ceil(dateMath.parse(request.range.to) / 1000);
// Add range variables
request.scopedVars = Object.assign({}, request.scopedVars, utils.getRangeScopedVars(request.range));
this.replaceTargetVariables(target, request);
// Apply Time-related functions (timeShift(), etc)
const timeFunctions = bindFunctionDefs(target.functions, 'Time');
if (timeFunctions.length) {
const [time_from, time_to] = utils.sequence(timeFunctions)([timeFrom, timeTo]);
timeFrom = time_from;
timeTo = time_to;
}
const timeRange = [timeFrom, timeTo];
const useTrends = this.isUseTrends(timeRange);
// Metrics or Text query
if (!target.queryType || target.queryType === c.MODE_METRICS || target.queryType === c.MODE_TEXT) {
// Don't request undefined targets
if (!target.group || !target.host || !target.item) {
return [];
}
if (!target.queryType || target.queryType === c.MODE_METRICS) {
return this.queryNumericData(target, timeRange, useTrends, request);
} else if (target.queryType === c.MODE_TEXT) {
return this.queryTextData(target, timeRange);
} else {
return [];
}
} else if (target.queryType === c.MODE_ITEMID) {
// Item ID query
if (!target.itemids) {
return [];
}
return this.queryItemIdData(target, timeRange, useTrends, request);
} else if (target.queryType === c.MODE_ITSERVICE) {
// IT services query
return this.queryITServiceData(target, timeRange, request);
} else if (target.queryType === c.MODE_TRIGGERS) {
// Triggers query
return this.queryTriggersData(target, timeRange);
} else if (target.queryType === c.MODE_PROBLEMS) {
// Problems query
return this.queryProblems(target, timeRange, request);
} else {
return [];
}
return {
data: backendRes.data,
state: LoadingState.Done,
key: request.requestId,
};
});
// Data for panel (all targets)
const frontendResponsePromise: Promise<DataQueryResponse> = Promise.all(_.flatten(promises))
.then(_.flatten)
.then(data => {
if (data && data.length > 0 && isDataFrame(data[0]) && !utils.isProblemsDataFrame(data[0])) {
data = responseHandler.alignFrames(data);
if (responseHandler.isConvertibleToWide(data)) {
console.log('Converting response to the wide format');
data = responseHandler.convertToWide(data);
}
}
return { data };
});
return Promise.all([backendResponsePromise, frontendResponsePromise])
.then(rsp => {
// Merge backend and frontend queries results
const [backendRes, frontendRes] = rsp;
if (frontendRes.data) {
backendRes.data = backendRes.data.concat(frontendRes.data);
}
return {
data: backendRes.data,
state: LoadingState.Done,
key: request.requestId,
};
});
}
async backendQuery(request: DataQueryRequest<any>): Promise<DataQueryResponse> {
@@ -218,23 +146,16 @@ export class ZabbixDatasource extends DataSourceApi<ZabbixMetricsQuery, ZabbixDS
request.scopedVars = Object.assign({}, request.scopedVars, utils.getRangeScopedVars(request.range));
const queries = _.compact(targets.map((query) => {
const datasourceId = this.id;
// Don't request for hidden targets
if (query.hide) {
return null;
}
// Prevent changes of original object
let target = _.cloneDeep(query);
// Migrate old targets
target = migrations.migrate(target);
this.replaceTargetVariables(target, request);
this.replaceTargetVariables(query, request);
return {
...target,
datasourceId,
...query,
datasourceId: this.datasourceId,
intervalMs,
maxDataPoints,
};
@@ -276,10 +197,108 @@ export class ZabbixDatasource extends DataSourceApi<ZabbixMetricsQuery, ZabbixDS
return resp;
}
async frontendQuery(request: DataQueryRequest<any>): Promise<DataQueryResponse> {
const frontendTargets = request.targets.filter(t => !(this.isBackendTarget(t) || this.isDBConnectionTarget(t)));
const promises = _.map(frontendTargets, target => {
// Don't request for hidden targets
if (target.hide) {
return [];
}
// Add range variables
request.scopedVars = Object.assign({}, request.scopedVars, utils.getRangeScopedVars(request.range));
this.replaceTargetVariables(target, request);
const timeRange = this.buildTimeRange(request, target);
if (target.queryType === c.MODE_TEXT) {
// Text query
// Don't request undefined targets
if (!target.group || !target.host || !target.item) {
return [];
}
return this.queryTextData(target, timeRange);
} else if (target.queryType === c.MODE_ITSERVICE) {
// IT services query
return this.queryITServiceData(target, timeRange, request);
} else if (target.queryType === c.MODE_TRIGGERS) {
// Triggers query
return this.queryTriggersData(target, timeRange);
} else if (target.queryType === c.MODE_PROBLEMS) {
// Problems query
return this.queryProblems(target, timeRange, request);
} else {
return [];
}
});
// Data for panel (all targets)
return Promise.all(_.flatten(promises))
.then(_.flatten)
.then(data => {
if (data && data.length > 0 && isDataFrame(data[0]) && !utils.isProblemsDataFrame(data[0])) {
data = responseHandler.alignFrames(data);
if (responseHandler.isConvertibleToWide(data)) {
console.log('Converting response to the wide format');
data = responseHandler.convertToWide(data);
}
}
return { data };
});
}
async dbConnectionQuery(request: DataQueryRequest<any>): Promise<DataQueryResponse> {
const targets = request.targets.filter(this.isDBConnectionTarget);
const queries = _.compact(targets.map((target) => {
// Don't request for hidden targets
if (target.hide) {
return [];
}
// Add range variables
request.scopedVars = Object.assign({}, request.scopedVars, utils.getRangeScopedVars(request.range));
this.replaceTargetVariables(target, request);
const timeRange = this.buildTimeRange(request, target);
const useTrends = this.isUseTrends(timeRange);
if (!target.queryType || target.queryType === c.MODE_METRICS) {
return this.queryNumericData(target, timeRange, useTrends, request);
} else if (target.queryType === c.MODE_ITEMID) {
// Item ID query
if (!target.itemids) {
return [];
}
return this.queryItemIdData(target, timeRange, useTrends, request);
} else {
return [];
}
}));
const promises: Promise<DataQueryResponse> = Promise.all(queries)
.then(_.flatten)
.then(data => ({ data }));
return promises;
}
buildTimeRange(request, target) {
let timeFrom = Math.ceil(dateMath.parse(request.range.from) / 1000);
let timeTo = Math.ceil(dateMath.parse(request.range.to) / 1000);
// Apply Time-related functions (timeShift(), etc)
const timeFunctions = bindFunctionDefs(target.functions, 'Time');
if (timeFunctions.length) {
const [time_from, time_to] = utils.sequence(timeFunctions)([timeFrom, timeTo]);
timeFrom = time_from;
timeTo = time_to;
}
return [timeFrom, timeTo];
}
/**
* Query target data for Metrics
*/
async queryNumericData(target, timeRange, useTrends, options): Promise<DataFrame[]> {
async queryNumericData(target, timeRange, useTrends, request): Promise<any> {
const getItemOptions = {
itemtype: 'num'
};
@@ -287,7 +306,7 @@ export class ZabbixDatasource extends DataSourceApi<ZabbixMetricsQuery, ZabbixDS
const items = await this.zabbix.getItemsFromTarget(target, getItemOptions);
const queryStart = new Date().getTime();
const result = await this.queryNumericDataForItems(items, target, timeRange, useTrends, options);
const result = await this.queryNumericDataForItems(items, target, timeRange, useTrends, request);
const queryEnd = new Date().getTime();
if (this.enableDebugLog) {
@@ -297,13 +316,19 @@ export class ZabbixDatasource extends DataSourceApi<ZabbixMetricsQuery, ZabbixDS
const frames = [];
for (const frameJSON of result) {
const frame = dataFrameFromJSON(frameJSON);
frame.refId = target.refId;
frames.push(frame);
}
return frames;
// const valueMappings = await this.zabbix.getValueMappings();
//
// const dataFrames = (result as any).map(s => responseHandler.seriesToDataFrame(s, target, valueMappings));
// return dataFrames;
const resp = { data: frames };
this.sortByRefId(resp);
this.applyFrontendFunctions(resp, request);
if (responseHandler.isConvertibleToWide(resp.data)) {
console.log('Converting response to the wide format');
resp.data = responseHandler.convertToWide(resp.data);
}
return resp.data;
}
/**
@@ -313,20 +338,14 @@ export class ZabbixDatasource extends DataSourceApi<ZabbixMetricsQuery, ZabbixDS
let history;
options.valueType = this.getTrendValueType(target);
options.consolidateBy = getConsolidateBy(target) || options.valueType;
const disableDataAlignment = this.disableDataAlignment || target.options?.disableDataAlignment;
if (useTrends) {
history = await this.zabbix.getTrends(items, timeRange, options);
// .then(timeseries => {
// return !disableDataAlignment ? this.fillTrendTimeSeriesWithNulls(timeseries) : timeseries;
// });
} else {
history = await this.zabbix.getHistoryTS(items, timeRange, options);
// .then(timeseries => {
// return !disableDataAlignment ? this.alignTimeSeriesData(timeseries) : timeseries;
// });
}
// Request backend for data processing
const requestOptions: BackendSrvRequest = {
url: `/api/datasources/${this.datasourceId}/resources/db-connection-post`,
method: 'POST',
@@ -342,10 +361,6 @@ export class ZabbixDatasource extends DataSourceApi<ZabbixMetricsQuery, ZabbixDS
const response: any = await getBackendSrv().fetch<any>(requestOptions).toPromise();
return response.data;
// return getHistoryPromise
// .then(timeseries => this.applyDataProcessingFunctions(timeseries, target))
// .then(timeseries => downsampleSeries(timeseries, options));
}
getTrendValueType(target) {
@@ -357,21 +372,6 @@ export class ZabbixDatasource extends DataSourceApi<ZabbixMetricsQuery, ZabbixDS
return trendValueFunc ? trendValueFunc.params[0] : "avg";
}
alignTimeSeriesData(timeseries: any[]) {
for (const ts of timeseries) {
const interval = utils.parseItemInterval(ts.scopedVars['__zbx_item_interval']?.value);
ts.datapoints = align(ts.datapoints, interval);
}
return timeseries;
}
fillTrendTimeSeriesWithNulls(timeseries: any[]) {
for (const ts of timeseries) {
ts.datapoints = fillTrendsWithNulls(ts.datapoints);
}
return timeseries;
}
sortByRefId(response: DataQueryResponse) {
response.data.sort((a, b) => {
if (a.refId < b.refId) {
@@ -396,10 +396,10 @@ export class ZabbixDatasource extends DataSourceApi<ZabbixMetricsQuery, ZabbixDS
}
applyDataProcessingFunctions(timeseries_data, target) {
const transformFunctions = bindFunctionDefs(target.functions, 'Transform');
const transformFunctions = bindFunctionDefs(target.functions, 'Transform');
const aggregationFunctions = bindFunctionDefs(target.functions, 'Aggregate');
const filterFunctions = bindFunctionDefs(target.functions, 'Filter');
const aliasFunctions = bindFunctionDefs(target.functions, 'Alias');
const filterFunctions = bindFunctionDefs(target.functions, 'Filter');
const aliasFunctions = bindFunctionDefs(target.functions, 'Alias');
// Apply transformation functions
timeseries_data = _.cloneDeep(_.map(timeseries_data, timeseries => {
@@ -486,9 +486,6 @@ export class ZabbixDatasource extends DataSourceApi<ZabbixMetricsQuery, ZabbixDS
.then(items => {
return this.queryNumericDataForItems(items, target, timeRange, useTrends, options);
});
// .then(result => {
// return (result as any).map(s => responseHandler.seriesToDataFrame(s, target));
// });
}
/**
@@ -515,14 +512,12 @@ export class ZabbixDatasource extends DataSourceApi<ZabbixMetricsQuery, ZabbixDS
return this.zabbix.getITServices(itServiceFilter)
.then(itservices => {
if (options.isOldVersion) {
itservices = _.filter(itservices, {'serviceid': target.itservice?.serviceid});
itservices = _.filter(itservices, { 'serviceid': target.itservice?.serviceid });
}
return this.zabbix.getSLA(itservices, timeRange, target, options);})
return this.zabbix.getSLA(itservices, timeRange, target, options);
})
.then(itservicesdp => this.applyDataProcessingFunctions(itservicesdp, target))
.then(result => {
const dataFrames = result.map(s => responseHandler.seriesToDataFrame(s, target));
return dataFrames;
});
.then(result => result.map(s => responseHandler.seriesToDataFrame(s, target)));
}
queryTriggersData(target, timeRange) {
@@ -872,7 +867,12 @@ export class ZabbixDatasource extends DataSourceApi<ZabbixMetricsQuery, ZabbixDS
return target.queryType === c.MODE_METRICS ||
target.queryType === c.MODE_ITEMID;
}
};
isDBConnectionTarget = (target: any): boolean => {
return this.enableDirectDBConnection &&
(target.queryType === c.MODE_METRICS || target.queryType === c.MODE_ITEMID);
};
}
function bindFunctionDefs(functionDefs, category) {
@@ -898,18 +898,6 @@ function getConsolidateBy(target) {
return consolidateBy;
}
function downsampleSeries(timeseries_data, options) {
const defaultAgg = dataProcessor.aggregationFunctions['avg'];
const consolidateByFunc = dataProcessor.aggregationFunctions[options.consolidateBy] || defaultAgg;
return _.map(timeseries_data, timeseries => {
if (timeseries.datapoints.length > options.maxDataPoints) {
timeseries.datapoints = dataProcessor
.groupBy(options.interval, consolidateByFunc, timeseries.datapoints);
}
return timeseries;
});
}
function formatMetric(metricObj) {
return {
text: metricObj.name,

View File

@@ -37,12 +37,12 @@ function convertHistory(history, items, addHostName, convertPointCallback) {
* ]
*/
// Group history by itemid
// Group history by itemid
const grouped_history = _.groupBy(history, 'itemid');
const hosts = _.uniqBy(_.flatten(_.map(items, 'hosts')), 'hostid'); //uniqBy is needed to deduplicate
return _.map(grouped_history, (hist, itemid) => {
const item = _.find(items, {'itemid': itemid}) as any;
const item = _.find(items, { 'itemid': itemid }) as any;
let alias = item.name;
// Add scopedVars for using in alias functions
@@ -54,7 +54,7 @@ function convertHistory(history, items, addHostName, convertPointCallback) {
};
if (_.keys(hosts).length > 0) {
const host = _.find(hosts, {'hostid': item.hostid});
const host = _.find(hosts, { 'hostid': item.hostid });
scopedVars['__zbx_host'] = { value: host.host };
scopedVars['__zbx_host_name'] = { value: host.name };
@@ -140,7 +140,7 @@ export function seriesToDataFrame(timeseries, target: ZabbixMetricsQuery, valueM
}
}
const fields: Field[] = [ timeFiled, valueFiled ];
const fields: Field[] = [timeFiled, valueFiled];
const frame: DataFrame = {
name: seriesName,
@@ -177,7 +177,7 @@ export function dataResponseToTimeSeries(response: DataFrameJSON[], items) {
}
const itemid = field.name;
const item = _.find(items, {'itemid': itemid});
const item = _.find(items, { 'itemid': itemid });
let interval = utils.parseItemInterval(item.delay);
if (interval === 0) {
interval = null;
@@ -249,7 +249,7 @@ export function alignFrames(data: MutableDataFrame[]): MutableDataFrame[] {
const missingTimestamps = [];
const missingValues = [];
const frameInterval: number = timeField.config.custom?.itemInterval;
for (let j = minTimestamp; j < firstTs; j+=frameInterval) {
for (let j = minTimestamp; j < firstTs; j += frameInterval) {
missingTimestamps.push(j);
missingValues.push(null);
}
@@ -270,7 +270,7 @@ export function convertToWide(data: MutableDataFrame[]): DataFrame[] {
return [];
}
const fields: MutableField[] = [ timeField ];
const fields: MutableField[] = [timeField];
for (let i = 0; i < data.length; i++) {
const valueField = data[i].fields.find(f => f.name === TIME_SERIES_VALUE_FIELD_NAME);
@@ -320,10 +320,10 @@ function handleText(history, items, target, addHostName = true) {
function handleHistoryAsTable(history, items, target) {
const table: any = new TableModel();
table.addColumn({text: 'Host'});
table.addColumn({text: 'Item'});
table.addColumn({text: 'Key'});
table.addColumn({text: 'Last value'});
table.addColumn({ text: 'Host' });
table.addColumn({ text: 'Item' });
table.addColumn({ text: 'Key' });
table.addColumn({ text: 'Last value' });
const grouped_history = _.groupBy(history, 'itemid');
_.each(items, (item) => {
@@ -422,9 +422,9 @@ function handleTriggersResponse(triggers, groups, timeRange) {
const stats = getTriggerStats(triggers);
const groupNames = _.map(groups, 'name');
const table: any = new TableModel();
table.addColumn({text: 'Host group'});
table.addColumn({ text: 'Host group' });
_.each(_.orderBy(c.TRIGGER_SEVERITY, ['val'], ['desc']), (severity) => {
table.addColumn({text: severity.text});
table.addColumn({ text: severity.text });
});
_.each(stats, (severity_stats, group) => {
if (_.includes(groupNames, group)) {
@@ -442,7 +442,7 @@ function getTriggerStats(triggers) {
// let severity = _.map(c.TRIGGER_SEVERITY, 'text');
const stats = {};
_.each(groups, (group) => {
stats[group] = {0: 0, 1: 0, 2: 0, 3: 0, 4: 0, 5: 0}; // severity:count
stats[group] = { 0: 0, 1: 0, 2: 0, 3: 0, 4: 0, 5: 0 }; // severity:count
});
_.each(triggers, (trigger) => {
_.each(trigger.groups, (group) => {