influxdb connector WIP
This commit is contained in:
@@ -1,7 +1,7 @@
|
|||||||
import _ from 'lodash';
|
import _ from 'lodash';
|
||||||
import { migrateDSConfig } from './migrations';
|
import { migrateDSConfig } from './migrations';
|
||||||
|
|
||||||
const SUPPORTED_SQL_DS = ['mysql', 'postgres'];
|
const SUPPORTED_SQL_DS = ['mysql', 'postgres', 'influxdb'];
|
||||||
const zabbixVersions = [
|
const zabbixVersions = [
|
||||||
{ name: '2.x', value: 2 },
|
{ name: '2.x', value: 2 },
|
||||||
{ name: '3.x', value: 3 },
|
{ name: '3.x', value: 3 },
|
||||||
|
|||||||
@@ -0,0 +1,152 @@
|
|||||||
|
import _ from 'lodash';
|
||||||
|
import DBConnector from '../dbConnector';
|
||||||
|
|
||||||
|
const DEFAULT_QUERY_LIMIT = 10000;
|
||||||
|
const HISTORY_TO_TABLE_MAP = {
|
||||||
|
'0': 'history',
|
||||||
|
'1': 'history_str',
|
||||||
|
'2': 'history_log',
|
||||||
|
'3': 'history_uint',
|
||||||
|
'4': 'history_text'
|
||||||
|
};
|
||||||
|
|
||||||
|
const TREND_TO_TABLE_MAP = {
|
||||||
|
'0': 'trends',
|
||||||
|
'3': 'trends_uint'
|
||||||
|
};
|
||||||
|
|
||||||
|
const consolidateByFunc = {
|
||||||
|
'avg': 'AVG',
|
||||||
|
'min': 'MIN',
|
||||||
|
'max': 'MAX',
|
||||||
|
'sum': 'SUM',
|
||||||
|
'count': 'COUNT'
|
||||||
|
};
|
||||||
|
|
||||||
|
const consolidateByTrendColumns = {
|
||||||
|
'avg': 'value_avg',
|
||||||
|
'min': 'value_min',
|
||||||
|
'max': 'value_max',
|
||||||
|
'sum': 'num*value_avg' // sum of sums inside the one-hour trend period
|
||||||
|
};
|
||||||
|
|
||||||
|
export class InfluxDBConnector extends DBConnector {
|
||||||
|
constructor(options, backendSrv, datasourceSrv) {
|
||||||
|
super(options, backendSrv, datasourceSrv);
|
||||||
|
this.limit = options.limit || DEFAULT_QUERY_LIMIT;
|
||||||
|
super.loadDBDataSource().then(ds => {
|
||||||
|
console.log(ds);
|
||||||
|
this.ds = ds;
|
||||||
|
return ds;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Try to invoke test query for one of Zabbix database tables.
|
||||||
|
*/
|
||||||
|
testDataSource() {
|
||||||
|
return this.ds.testDatasource();
|
||||||
|
}
|
||||||
|
|
||||||
|
getHistory(items, timeFrom, timeTill, options) {
|
||||||
|
let {intervalMs, consolidateBy} = options;
|
||||||
|
const intervalSec = Math.ceil(intervalMs / 1000);
|
||||||
|
|
||||||
|
consolidateBy = consolidateBy || 'avg';
|
||||||
|
const aggFunction = consolidateByFunc[consolidateBy];
|
||||||
|
|
||||||
|
// Group items by value type and perform request for each value type
|
||||||
|
const grouped_items = _.groupBy(items, 'value_type');
|
||||||
|
const promises = _.map(grouped_items, (items, value_type) => {
|
||||||
|
const itemids = _.map(items, 'itemid');
|
||||||
|
const table = HISTORY_TO_TABLE_MAP[value_type];
|
||||||
|
const query = this.buildHistoryQuery(itemids, table, timeFrom, timeTill, intervalSec, aggFunction);
|
||||||
|
console.log(query);
|
||||||
|
return this.invokeInfluxDBQuery(query);
|
||||||
|
});
|
||||||
|
|
||||||
|
return Promise.all(promises).then(results => {
|
||||||
|
return _.flatten(results);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
getTrends(items, timeFrom, timeTill, options) {
|
||||||
|
let { intervalMs, consolidateBy } = options;
|
||||||
|
const intervalSec = Math.ceil(intervalMs / 1000);
|
||||||
|
|
||||||
|
consolidateBy = consolidateBy || 'avg';
|
||||||
|
const aggFunction = consolidateByFunc[consolidateBy];
|
||||||
|
|
||||||
|
// Group items by value type and perform request for each value type
|
||||||
|
const grouped_items = _.groupBy(items, 'value_type');
|
||||||
|
const promises = _.map(grouped_items, (items, value_type) => {
|
||||||
|
const itemids = _.map(items, 'itemid');
|
||||||
|
const table = TREND_TO_TABLE_MAP[value_type];
|
||||||
|
let valueColumn = _.includes(['avg', 'min', 'max', 'sum'], consolidateBy) ? consolidateBy : 'avg';
|
||||||
|
valueColumn = consolidateByTrendColumns[valueColumn];
|
||||||
|
const query = this.buildTrendsQuery(itemids, table, timeFrom, timeTill, intervalSec, aggFunction, valueColumn);
|
||||||
|
console.log(query);
|
||||||
|
return this.invokeInfluxDBQuery(query);
|
||||||
|
});
|
||||||
|
|
||||||
|
return Promise.all(promises).then(results => {
|
||||||
|
return _.flatten(results);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
buildHistoryQuery(itemids, table, timeFrom, timeTill, intervalSec, aggFunction) {
|
||||||
|
const AGG = aggFunction === 'AVG' ? 'MEAN' : aggFunction;
|
||||||
|
const where_clause = itemids.map(itemid => `"itemid" = ${itemid}`).join(' AND ');
|
||||||
|
const query = `SELECT "itemid", "time", ${AGG}("value") FROM "${table}"
|
||||||
|
WHERE ${where_clause} AND "time" >= ${timeFrom} AND "time" <= ${timeTill}
|
||||||
|
GROUP BY time(${intervalSec}s)`;
|
||||||
|
return compactQuery(query);
|
||||||
|
}
|
||||||
|
|
||||||
|
buildTrendsQuery(itemids, table, timeFrom, timeTill, intervalSec, aggFunction, valueColumn) {
|
||||||
|
const AGG = aggFunction === 'AVG' ? 'MEAN' : aggFunction;
|
||||||
|
const where_clause = itemids.map(itemid => `"itemid" = ${itemid}`).join(' AND ');
|
||||||
|
const query = `SELECT "itemid", "time", ${AGG}("${valueColumn}") FROM "${table}"
|
||||||
|
WHERE ${where_clause} AND "time" >= ${timeFrom} AND "time" <= ${timeTill}
|
||||||
|
GROUP BY time(${intervalSec}s)`;
|
||||||
|
return compactQuery(query);
|
||||||
|
}
|
||||||
|
|
||||||
|
handleGrafanaTSResponse(history, items, addHostName = true) {
|
||||||
|
return convertGrafanaTSResponse(history, items, addHostName);
|
||||||
|
}
|
||||||
|
|
||||||
|
invokeInfluxDBQuery(query) {
|
||||||
|
return this.ds._seriesQuery(query);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
///////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
function convertGrafanaTSResponse(time_series, items, addHostName) {
|
||||||
|
//uniqBy is needed to deduplicate
|
||||||
|
var hosts = _.uniqBy(_.flatten(_.map(items, 'hosts')), 'hostid');
|
||||||
|
let grafanaSeries = _.map(_.compact(time_series), series => {
|
||||||
|
let itemid = series.name;
|
||||||
|
var item = _.find(items, {'itemid': itemid});
|
||||||
|
var alias = item.name;
|
||||||
|
//only when actual multi hosts selected
|
||||||
|
if (_.keys(hosts).length > 1 && addHostName) {
|
||||||
|
var host = _.find(hosts, {'hostid': item.hostid});
|
||||||
|
alias = host.name + ": " + alias;
|
||||||
|
}
|
||||||
|
// CachingProxy deduplicates requests and returns one time series for equal queries.
|
||||||
|
// Clone is needed to prevent changing of series object shared between all targets.
|
||||||
|
let datapoints = _.cloneDeep(series.points);
|
||||||
|
return {
|
||||||
|
target: alias,
|
||||||
|
datapoints: datapoints
|
||||||
|
};
|
||||||
|
});
|
||||||
|
|
||||||
|
return _.sortBy(grafanaSeries, 'target');
|
||||||
|
}
|
||||||
|
|
||||||
|
function compactQuery(query) {
|
||||||
|
return query.replace(/\s+/g, ' ');
|
||||||
|
}
|
||||||
@@ -1,8 +1,10 @@
|
|||||||
import _ from 'lodash';
|
import _ from 'lodash';
|
||||||
import * as utils from '../utils';
|
import * as utils from '../utils';
|
||||||
import responseHandler from '../responseHandler';
|
import responseHandler from '../responseHandler';
|
||||||
|
import DBConnector from './connectors/dbConnector';
|
||||||
import { ZabbixAPIConnector } from './connectors/zabbix_api/zabbixAPIConnector';
|
import { ZabbixAPIConnector } from './connectors/zabbix_api/zabbixAPIConnector';
|
||||||
import { SQLConnector } from './connectors/sql/sqlConnector';
|
import { SQLConnector } from './connectors/sql/sqlConnector';
|
||||||
|
import { InfluxDBConnector } from './connectors/influxdb/influxdbConnector';
|
||||||
import { CachingProxy } from './proxy/cachingProxy';
|
import { CachingProxy } from './proxy/cachingProxy';
|
||||||
import { ZabbixNotImplemented } from './connectors/dbConnector';
|
import { ZabbixNotImplemented } from './connectors/dbConnector';
|
||||||
|
|
||||||
@@ -48,19 +50,27 @@ export class Zabbix {
|
|||||||
|
|
||||||
this.zabbixAPI = new ZabbixAPIConnector(url, username, password, zabbixVersion, basicAuth, withCredentials, backendSrv);
|
this.zabbixAPI = new ZabbixAPIConnector(url, username, password, zabbixVersion, basicAuth, withCredentials, backendSrv);
|
||||||
|
|
||||||
|
this.proxyfyRequests();
|
||||||
|
this.cacheRequests();
|
||||||
|
this.bindRequests();
|
||||||
|
|
||||||
if (enableDirectDBConnection) {
|
if (enableDirectDBConnection) {
|
||||||
let dbConnectorOptions = {
|
let dbConnectorOptions = {
|
||||||
datasourceId: dbConnectionDatasourceId,
|
datasourceId: dbConnectionDatasourceId,
|
||||||
datasourceName: dbConnectionDatasourceName
|
datasourceName: dbConnectionDatasourceName
|
||||||
};
|
};
|
||||||
this.dbConnector = new SQLConnector(dbConnectorOptions, backendSrv, datasourceSrv);
|
this.dbConnector = new DBConnector(dbConnectorOptions, backendSrv, datasourceSrv);
|
||||||
this.getHistoryDB = this.cachingProxy.proxyfyWithCache(this.dbConnector.getHistory, 'getHistory', this.dbConnector);
|
this.dbConnector.loadDBDataSource().then(ds => {
|
||||||
this.getTrendsDB = this.cachingProxy.proxyfyWithCache(this.dbConnector.getTrends, 'getTrends', this.dbConnector);
|
if (ds.type === 'influxdb') {
|
||||||
|
this.dbConnector = new InfluxDBConnector(dbConnectorOptions, backendSrv, datasourceSrv);
|
||||||
|
} else {
|
||||||
|
this.dbConnector = new SQLConnector(dbConnectorOptions, backendSrv, datasourceSrv);
|
||||||
|
}
|
||||||
|
}).then(() => {
|
||||||
|
this.getHistoryDB = this.cachingProxy.proxyfyWithCache(this.dbConnector.getHistory, 'getHistory', this.dbConnector);
|
||||||
|
this.getTrendsDB = this.cachingProxy.proxyfyWithCache(this.dbConnector.getTrends, 'getTrends', this.dbConnector);
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
this.proxyfyRequests();
|
|
||||||
this.cacheRequests();
|
|
||||||
this.bindRequests();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
proxyfyRequests() {
|
proxyfyRequests() {
|
||||||
|
|||||||
Reference in New Issue
Block a user