diff --git a/src/datasource-zabbix/config.controller.js b/src/datasource-zabbix/config.controller.js index 02de9d2..fcd77e9 100644 --- a/src/datasource-zabbix/config.controller.js +++ b/src/datasource-zabbix/config.controller.js @@ -1,7 +1,7 @@ import _ from 'lodash'; import { migrateDSConfig } from './migrations'; -const SUPPORTED_SQL_DS = ['mysql', 'postgres']; +const SUPPORTED_SQL_DS = ['mysql', 'postgres', 'influxdb']; const zabbixVersions = [ { name: '2.x', value: 2 }, { name: '3.x', value: 3 }, diff --git a/src/datasource-zabbix/zabbix/connectors/influxdb/influxdbConnector.js b/src/datasource-zabbix/zabbix/connectors/influxdb/influxdbConnector.js new file mode 100644 index 0000000..9ae38a5 --- /dev/null +++ b/src/datasource-zabbix/zabbix/connectors/influxdb/influxdbConnector.js @@ -0,0 +1,152 @@ +import _ from 'lodash'; +import DBConnector from '../dbConnector'; + +const DEFAULT_QUERY_LIMIT = 10000; +const HISTORY_TO_TABLE_MAP = { + '0': 'history', + '1': 'history_str', + '2': 'history_log', + '3': 'history_uint', + '4': 'history_text' +}; + +const TREND_TO_TABLE_MAP = { + '0': 'trends', + '3': 'trends_uint' +}; + +const consolidateByFunc = { + 'avg': 'AVG', + 'min': 'MIN', + 'max': 'MAX', + 'sum': 'SUM', + 'count': 'COUNT' +}; + +const consolidateByTrendColumns = { + 'avg': 'value_avg', + 'min': 'value_min', + 'max': 'value_max', + 'sum': 'num*value_avg' // sum of sums inside the one-hour trend period +}; + +export class InfluxDBConnector extends DBConnector { + constructor(options, backendSrv, datasourceSrv) { + super(options, backendSrv, datasourceSrv); + this.limit = options.limit || DEFAULT_QUERY_LIMIT; + super.loadDBDataSource().then(ds => { + console.log(ds); + this.ds = ds; + return ds; + }); + } + + /** + * Try to invoke test query for one of Zabbix database tables. + */ + testDataSource() { + return this.ds.testDatasource(); + } + + getHistory(items, timeFrom, timeTill, options) { + let {intervalMs, consolidateBy} = options; + const intervalSec = Math.ceil(intervalMs / 1000); + + consolidateBy = consolidateBy || 'avg'; + const aggFunction = consolidateByFunc[consolidateBy]; + + // Group items by value type and perform request for each value type + const grouped_items = _.groupBy(items, 'value_type'); + const promises = _.map(grouped_items, (items, value_type) => { + const itemids = _.map(items, 'itemid'); + const table = HISTORY_TO_TABLE_MAP[value_type]; + const query = this.buildHistoryQuery(itemids, table, timeFrom, timeTill, intervalSec, aggFunction); + console.log(query); + return this.invokeInfluxDBQuery(query); + }); + + return Promise.all(promises).then(results => { + return _.flatten(results); + }); + } + + getTrends(items, timeFrom, timeTill, options) { + let { intervalMs, consolidateBy } = options; + const intervalSec = Math.ceil(intervalMs / 1000); + + consolidateBy = consolidateBy || 'avg'; + const aggFunction = consolidateByFunc[consolidateBy]; + + // Group items by value type and perform request for each value type + const grouped_items = _.groupBy(items, 'value_type'); + const promises = _.map(grouped_items, (items, value_type) => { + const itemids = _.map(items, 'itemid'); + const table = TREND_TO_TABLE_MAP[value_type]; + let valueColumn = _.includes(['avg', 'min', 'max', 'sum'], consolidateBy) ? consolidateBy : 'avg'; + valueColumn = consolidateByTrendColumns[valueColumn]; + const query = this.buildTrendsQuery(itemids, table, timeFrom, timeTill, intervalSec, aggFunction, valueColumn); + console.log(query); + return this.invokeInfluxDBQuery(query); + }); + + return Promise.all(promises).then(results => { + return _.flatten(results); + }); + } + + buildHistoryQuery(itemids, table, timeFrom, timeTill, intervalSec, aggFunction) { + const AGG = aggFunction === 'AVG' ? 'MEAN' : aggFunction; + const where_clause = itemids.map(itemid => `"itemid" = ${itemid}`).join(' AND '); + const query = `SELECT "itemid", "time", ${AGG}("value") FROM "${table}" + WHERE ${where_clause} AND "time" >= ${timeFrom} AND "time" <= ${timeTill} + GROUP BY time(${intervalSec}s)`; + return compactQuery(query); + } + + buildTrendsQuery(itemids, table, timeFrom, timeTill, intervalSec, aggFunction, valueColumn) { + const AGG = aggFunction === 'AVG' ? 'MEAN' : aggFunction; + const where_clause = itemids.map(itemid => `"itemid" = ${itemid}`).join(' AND '); + const query = `SELECT "itemid", "time", ${AGG}("${valueColumn}") FROM "${table}" + WHERE ${where_clause} AND "time" >= ${timeFrom} AND "time" <= ${timeTill} + GROUP BY time(${intervalSec}s)`; + return compactQuery(query); + } + + handleGrafanaTSResponse(history, items, addHostName = true) { + return convertGrafanaTSResponse(history, items, addHostName); + } + + invokeInfluxDBQuery(query) { + return this.ds._seriesQuery(query); + } +} + +/////////////////////////////////////////////////////////////////////////////// + +function convertGrafanaTSResponse(time_series, items, addHostName) { + //uniqBy is needed to deduplicate + var hosts = _.uniqBy(_.flatten(_.map(items, 'hosts')), 'hostid'); + let grafanaSeries = _.map(_.compact(time_series), series => { + let itemid = series.name; + var item = _.find(items, {'itemid': itemid}); + var alias = item.name; + //only when actual multi hosts selected + if (_.keys(hosts).length > 1 && addHostName) { + var host = _.find(hosts, {'hostid': item.hostid}); + alias = host.name + ": " + alias; + } + // CachingProxy deduplicates requests and returns one time series for equal queries. + // Clone is needed to prevent changing of series object shared between all targets. + let datapoints = _.cloneDeep(series.points); + return { + target: alias, + datapoints: datapoints + }; + }); + + return _.sortBy(grafanaSeries, 'target'); +} + +function compactQuery(query) { + return query.replace(/\s+/g, ' '); +} diff --git a/src/datasource-zabbix/zabbix/zabbix.js b/src/datasource-zabbix/zabbix/zabbix.js index b6cb66b..e720580 100644 --- a/src/datasource-zabbix/zabbix/zabbix.js +++ b/src/datasource-zabbix/zabbix/zabbix.js @@ -1,8 +1,10 @@ import _ from 'lodash'; import * as utils from '../utils'; import responseHandler from '../responseHandler'; +import DBConnector from './connectors/dbConnector'; import { ZabbixAPIConnector } from './connectors/zabbix_api/zabbixAPIConnector'; import { SQLConnector } from './connectors/sql/sqlConnector'; +import { InfluxDBConnector } from './connectors/influxdb/influxdbConnector'; import { CachingProxy } from './proxy/cachingProxy'; import { ZabbixNotImplemented } from './connectors/dbConnector'; @@ -48,19 +50,27 @@ export class Zabbix { this.zabbixAPI = new ZabbixAPIConnector(url, username, password, zabbixVersion, basicAuth, withCredentials, backendSrv); + this.proxyfyRequests(); + this.cacheRequests(); + this.bindRequests(); + if (enableDirectDBConnection) { let dbConnectorOptions = { datasourceId: dbConnectionDatasourceId, datasourceName: dbConnectionDatasourceName }; - this.dbConnector = new SQLConnector(dbConnectorOptions, backendSrv, datasourceSrv); - this.getHistoryDB = this.cachingProxy.proxyfyWithCache(this.dbConnector.getHistory, 'getHistory', this.dbConnector); - this.getTrendsDB = this.cachingProxy.proxyfyWithCache(this.dbConnector.getTrends, 'getTrends', this.dbConnector); + this.dbConnector = new DBConnector(dbConnectorOptions, backendSrv, datasourceSrv); + this.dbConnector.loadDBDataSource().then(ds => { + if (ds.type === 'influxdb') { + this.dbConnector = new InfluxDBConnector(dbConnectorOptions, backendSrv, datasourceSrv); + } else { + this.dbConnector = new SQLConnector(dbConnectorOptions, backendSrv, datasourceSrv); + } + }).then(() => { + this.getHistoryDB = this.cachingProxy.proxyfyWithCache(this.dbConnector.getHistory, 'getHistory', this.dbConnector); + this.getTrendsDB = this.cachingProxy.proxyfyWithCache(this.dbConnector.getTrends, 'getTrends', this.dbConnector); + }); } - - this.proxyfyRequests(); - this.cacheRequests(); - this.bindRequests(); } proxyfyRequests() {