refactor: db connectors

This commit is contained in:
Alexander Zobnin
2018-06-08 19:32:26 +03:00
parent 95bbede3da
commit 16861ad8c3
27 changed files with 1699 additions and 467 deletions

View File

@@ -0,0 +1,38 @@
import _ from 'lodash';
const NOT_IMPLEMENTED = 'Method should be implemented in subclass of DBConnector';
export default class DBConnector {
constructor(options, backendSrv, datasourceSrv) {
this.backendSrv = backendSrv;
this.datasourceSrv = datasourceSrv;
this.datasourceId = options.datasourceId;
this.datasourceName = options.datasourceName;
this.datasourceType = null;
}
loadDBDataSource() {
let ds = _.find(this.datasourceSrv.getAll(), {'id': this.datasourceId});
if (ds) {
return this.datasourceSrv.loadDatasource(ds.name)
.then(ds => {
this.datasourceType = ds.meta.id;
return ds;
});
} else {
return Promise.reject(`SQL Data Source with ID ${this.datasourceId} not found`);
}
}
testDataSource() {
throw NOT_IMPLEMENTED;
}
getHistory() {
throw NOT_IMPLEMENTED;
}
getTrends() {
throw NOT_IMPLEMENTED;
}
}

View File

@@ -0,0 +1,43 @@
/**
* MySQL queries
*/
function historyQuery(itemids, table, timeFrom, timeTill, intervalSec, aggFunction) {
let time_expression = `clock DIV ${intervalSec} * ${intervalSec}`;
let query = `
SELECT CAST(itemid AS CHAR) AS metric, ${time_expression} AS time_sec, ${aggFunction}(value) AS value
FROM ${table}
WHERE itemid IN (${itemids})
AND clock > ${timeFrom} AND clock < ${timeTill}
GROUP BY ${time_expression}, metric
ORDER BY time_sec ASC
`;
return query;
}
function trendsQuery(itemids, table, timeFrom, timeTill, intervalSec, aggFunction, valueColumn) {
let time_expression = `clock DIV ${intervalSec} * ${intervalSec}`;
let query = `
SELECT CAST(itemid AS CHAR) AS metric, ${time_expression} AS time_sec, ${aggFunction}(${valueColumn}) AS value
FROM ${table}
WHERE itemid IN (${itemids})
AND clock > ${timeFrom} AND clock < ${timeTill}
GROUP BY ${time_expression}, metric
ORDER BY time_sec ASC
`;
return query;
}
const TEST_QUERY = `SELECT CAST(itemid AS CHAR) AS metric, clock AS time_sec, value_avg AS value FROM trends_uint LIMIT 1`;
function testQuery() {
return TEST_QUERY;
}
const mysql = {
historyQuery,
trendsQuery,
testQuery
};
export default mysql;

View File

@@ -0,0 +1,48 @@
/**
* Postgres queries
*/
const ITEMID_FORMAT = 'FM99999999999999999999';
function historyQuery(itemids, table, timeFrom, timeTill, intervalSec, aggFunction) {
let time_expression = `clock / ${intervalSec} * ${intervalSec}`;
let query = `
SELECT to_char(itemid, '${ITEMID_FORMAT}') AS metric, ${time_expression} AS time, ${aggFunction}(value) AS value
FROM ${table}
WHERE itemid IN (${itemids})
AND clock > ${timeFrom} AND clock < ${timeTill}
GROUP BY 1, 2
ORDER BY time ASC
`;
return query;
}
function trendsQuery(itemids, table, timeFrom, timeTill, intervalSec, aggFunction, valueColumn) {
let time_expression = `clock / ${intervalSec} * ${intervalSec}`;
let query = `
SELECT to_char(itemid, '${ITEMID_FORMAT}') AS metric, ${time_expression} AS time, ${aggFunction}(${valueColumn}) AS value
FROM ${table}
WHERE itemid IN (${itemids})
AND clock > ${timeFrom} AND clock < ${timeTill}
GROUP BY 1, 2
ORDER BY time ASC
`;
return query;
}
const TEST_QUERY = `
SELECT to_char(itemid, '${ITEMID_FORMAT}') AS metric, clock AS time, value_avg AS value
FROM trends_uint LIMIT 1
`;
function testQuery() {
return TEST_QUERY;
}
const postgres = {
historyQuery,
trendsQuery,
testQuery
};
export default postgres;

View File

@@ -0,0 +1,173 @@
import _ from 'lodash';
import mysql from './mysql';
import postgres from './postgres';
import DBConnector from '../dbConnector';
const supportedDatabases = {
mysql: 'mysql',
postgres: 'postgres'
};
const DEFAULT_QUERY_LIMIT = 10000;
const HISTORY_TO_TABLE_MAP = {
'0': 'history',
'1': 'history_str',
'2': 'history_log',
'3': 'history_uint',
'4': 'history_text'
};
const TREND_TO_TABLE_MAP = {
'0': 'trends',
'3': 'trends_uint'
};
const consolidateByFunc = {
'avg': 'AVG',
'min': 'MIN',
'max': 'MAX',
'sum': 'SUM',
'count': 'COUNT'
};
const consolidateByTrendColumns = {
'avg': 'value_avg',
'min': 'value_min',
'max': 'value_max'
};
export class SQLConnector extends DBConnector {
constructor(options, backendSrv, datasourceSrv) {
super(options, backendSrv, datasourceSrv);
this.limit = options.limit || DEFAULT_QUERY_LIMIT;
this.sqlDialect = null;
super.loadDBDataSource()
.then(() => this.loadSQLDialect());
}
loadSQLDialect() {
if (this.datasourceType === supportedDatabases.postgres) {
this.sqlDialect = postgres;
} else {
this.sqlDialect = mysql;
}
}
/**
* Try to invoke test query for one of Zabbix database tables.
*/
testDataSource() {
let testQuery = this.sqlDialect.testQuery();
return this.invokeSQLQuery(testQuery);
}
getHistory(items, timeFrom, timeTill, options) {
let {intervalMs, consolidateBy} = options;
let intervalSec = Math.ceil(intervalMs / 1000);
consolidateBy = consolidateBy || 'avg';
let aggFunction = consolidateByFunc[consolidateBy];
// Group items by value type and perform request for each value type
let grouped_items = _.groupBy(items, 'value_type');
let promises = _.map(grouped_items, (items, value_type) => {
let itemids = _.map(items, 'itemid').join(', ');
let table = HISTORY_TO_TABLE_MAP[value_type];
let query = this.sqlDialect.historyQuery(itemids, table, timeFrom, timeTill, intervalSec, aggFunction);
query = compactSQLQuery(query);
return this.invokeSQLQuery(query);
});
return Promise.all(promises).then(results => {
return _.flatten(results);
});
}
getTrends(items, timeFrom, timeTill, options) {
let { intervalMs, consolidateBy } = options;
let intervalSec = Math.ceil(intervalMs / 1000);
consolidateBy = consolidateBy || 'avg';
let aggFunction = consolidateByFunc[consolidateBy];
// Group items by value type and perform request for each value type
let grouped_items = _.groupBy(items, 'value_type');
let promises = _.map(grouped_items, (items, value_type) => {
let itemids = _.map(items, 'itemid').join(', ');
let table = TREND_TO_TABLE_MAP[value_type];
let valueColumn = _.includes(['avg', 'min', 'max'], consolidateBy) ? consolidateBy : 'avg';
valueColumn = consolidateByTrendColumns[valueColumn];
let query = this.sqlDialect.trendsQuery(itemids, table, timeFrom, timeTill, intervalSec, aggFunction, valueColumn);
query = compactSQLQuery(query);
return this.invokeSQLQuery(query);
});
return Promise.all(promises).then(results => {
return _.flatten(results);
});
}
handleGrafanaTSResponse(history, items, addHostName = true) {
return convertGrafanaTSResponse(history, items, addHostName);
}
invokeSQLQuery(query) {
let queryDef = {
refId: 'A',
format: 'time_series',
datasourceId: this.datasourceId,
rawSql: query,
maxDataPoints: this.limit
};
return this.backendSrv.datasourceRequest({
url: '/api/tsdb/query',
method: 'POST',
data: {
queries: [queryDef],
}
})
.then(response => {
let results = response.data.results;
if (results['A']) {
return results['A'].series;
} else {
return null;
}
});
}
}
///////////////////////////////////////////////////////////////////////////////
function convertGrafanaTSResponse(time_series, items, addHostName) {
//uniqBy is needed to deduplicate
var hosts = _.uniqBy(_.flatten(_.map(items, 'hosts')), 'hostid');
let grafanaSeries = _.map(time_series, series => {
let itemid = series.name;
var item = _.find(items, {'itemid': itemid});
var alias = item.name;
//only when actual multi hosts selected
if (_.keys(hosts).length > 1 && addHostName) {
var host = _.find(hosts, {'hostid': item.hostid});
alias = host.name + ": " + alias;
}
// CachingProxy deduplicates requests and returns one time series for equal queries.
// Clone is needed to prevent changing of series object shared between all targets.
let datapoints = _.cloneDeep(series.points);
return {
target: alias,
datapoints: datapoints
};
});
return _.sortBy(grafanaSeries, 'target');
}
function compactSQLQuery(query) {
return query.replace(/\s+/g, ' ');
}

View File

@@ -1,266 +0,0 @@
import _ from 'lodash';
const DEFAULT_QUERY_LIMIT = 10000;
const HISTORY_TO_TABLE_MAP = {
'0': 'history',
'1': 'history_str',
'2': 'history_log',
'3': 'history_uint',
'4': 'history_text'
};
const TREND_TO_TABLE_MAP = {
'0': 'trends',
'3': 'trends_uint'
};
const consolidateByFunc = {
'avg': 'AVG',
'min': 'MIN',
'max': 'MAX',
'sum': 'SUM',
'count': 'COUNT'
};
const consolidateByTrendColumns = {
'avg': 'value_avg',
'min': 'value_min',
'max': 'value_max'
};
export class ZabbixDBConnector {
constructor(sqlDataSourceId, options, backendSrv, datasourceSrv) {
this.backendSrv = backendSrv;
this.datasourceSrv = datasourceSrv;
let limit = options.limit;
this.sqlDataSourceId = sqlDataSourceId;
this.limit = limit || DEFAULT_QUERY_LIMIT;
this.loadSQLDataSource(sqlDataSourceId);
}
/**
* Try to load DS with given id to check it's exist.
* @param {*} datasourceId ID of SQL data source
*/
loadSQLDataSource(datasourceId) {
let ds = _.find(this.datasourceSrv.getAll(), {'id': datasourceId});
if (ds) {
return this.datasourceSrv.loadDatasource(ds.name)
.then(ds => {
this.sqlDataSourceType = ds.meta.id;
return ds;
});
} else {
return Promise.reject(`SQL Data Source with ID ${datasourceId} not found`);
}
}
/**
* Try to invoke test query for one of Zabbix database tables.
*/
testSQLDataSource() {
let testQuery = TEST_MYSQL_QUERY;
if (this.sqlDataSourceType === 'postgres') {
testQuery = TEST_POSTGRES_QUERY;
}
return this.invokeSQLQuery(testQuery);
}
getHistory(items, timeFrom, timeTill, options) {
let {intervalMs, consolidateBy} = options;
let intervalSec = Math.ceil(intervalMs / 1000);
consolidateBy = consolidateBy || 'avg';
let aggFunction = consolidateByFunc[consolidateBy];
// Group items by value type and perform request for each value type
let grouped_items = _.groupBy(items, 'value_type');
let promises = _.map(grouped_items, (items, value_type) => {
let itemids = _.map(items, 'itemid').join(', ');
let table = HISTORY_TO_TABLE_MAP[value_type];
let dialect = this.sqlDataSourceType;
let query = buildSQLHistoryQuery(itemids, table, timeFrom, timeTill, intervalSec, aggFunction, dialect);
query = compactSQLQuery(query);
return this.invokeSQLQuery(query);
});
return Promise.all(promises).then(results => {
return _.flatten(results);
});
}
getTrends(items, timeFrom, timeTill, options) {
let { intervalMs, consolidateBy } = options;
let intervalSec = Math.ceil(intervalMs / 1000);
consolidateBy = consolidateBy || 'avg';
let aggFunction = consolidateByFunc[consolidateBy];
// Group items by value type and perform request for each value type
let grouped_items = _.groupBy(items, 'value_type');
let promises = _.map(grouped_items, (items, value_type) => {
let itemids = _.map(items, 'itemid').join(', ');
let table = TREND_TO_TABLE_MAP[value_type];
let valueColumn = _.includes(['avg', 'min', 'max'], consolidateBy) ? consolidateBy : 'avg';
valueColumn = consolidateByTrendColumns[valueColumn];
let dialect = this.sqlDataSourceType;
let query = buildSQLTrendsQuery(itemids, table, timeFrom, timeTill, intervalSec, aggFunction, valueColumn, dialect);
query = compactSQLQuery(query);
return this.invokeSQLQuery(query);
});
return Promise.all(promises).then(results => {
return _.flatten(results);
});
}
handleGrafanaTSResponse(history, items, addHostName = true) {
return convertGrafanaTSResponse(history, items, addHostName);
}
invokeSQLQuery(query) {
let queryDef = {
refId: 'A',
format: 'time_series',
datasourceId: this.sqlDataSourceId,
rawSql: query,
maxDataPoints: this.limit
};
return this.backendSrv.datasourceRequest({
url: '/api/tsdb/query',
method: 'POST',
data: {
queries: [queryDef],
}
})
.then(response => {
let results = response.data.results;
if (results['A']) {
return results['A'].series;
} else {
return null;
}
});
}
}
///////////////////////////////////////////////////////////////////////////////
function convertGrafanaTSResponse(time_series, items, addHostName) {
var hosts = _.uniqBy(_.flatten(_.map(items, 'hosts')), 'hostid'); //uniqBy is needed to deduplicate
let grafanaSeries = _.map(time_series, series => {
let itemid = series.name;
var item = _.find(items, {'itemid': itemid});
var alias = item.name;
if (_.keys(hosts).length > 1 && addHostName) { //only when actual multi hosts selected
var host = _.find(hosts, {'hostid': item.hostid});
alias = host.name + ": " + alias;
}
// CachingProxy deduplicates requests and returns one time series for equal queries.
// Clone is needed to prevent changing of series object shared between all targets.
let datapoints = _.cloneDeep(series.points);
return {
target: alias,
datapoints: datapoints
};
});
return _.sortBy(grafanaSeries, 'target');
}
function compactSQLQuery(query) {
return query.replace(/\s+/g, ' ');
}
function buildSQLHistoryQuery(itemids, table, timeFrom, timeTill, intervalSec, aggFunction, dialect = 'mysql') {
if (dialect === 'postgres') {
return buildPostgresHistoryQuery(itemids, table, timeFrom, timeTill, intervalSec, aggFunction);
} else {
return buildMysqlHistoryQuery(itemids, table, timeFrom, timeTill, intervalSec, aggFunction);
}
}
function buildSQLTrendsQuery(itemids, table, timeFrom, timeTill, intervalSec, aggFunction, valueColumn, dialect = 'mysql') {
if (dialect === 'postgres') {
return buildPostgresTrendsQuery(itemids, table, timeFrom, timeTill, intervalSec, aggFunction, valueColumn);
} else {
return buildMysqlTrendsQuery(itemids, table, timeFrom, timeTill, intervalSec, aggFunction, valueColumn);
}
}
///////////
// MySQL //
///////////
function buildMysqlHistoryQuery(itemids, table, timeFrom, timeTill, intervalSec, aggFunction) {
let time_expression = `clock DIV ${intervalSec} * ${intervalSec}`;
let query = `
SELECT CAST(itemid AS CHAR) AS metric, ${time_expression} AS time_sec, ${aggFunction}(value) AS value
FROM ${table}
WHERE itemid IN (${itemids})
AND clock > ${timeFrom} AND clock < ${timeTill}
GROUP BY ${time_expression}, metric
ORDER BY time_sec ASC
`;
return query;
}
function buildMysqlTrendsQuery(itemids, table, timeFrom, timeTill, intervalSec, aggFunction, valueColumn) {
let time_expression = `clock DIV ${intervalSec} * ${intervalSec}`;
let query = `
SELECT CAST(itemid AS CHAR) AS metric, ${time_expression} AS time_sec, ${aggFunction}(${valueColumn}) AS value
FROM ${table}
WHERE itemid IN (${itemids})
AND clock > ${timeFrom} AND clock < ${timeTill}
GROUP BY ${time_expression}, metric
ORDER BY time_sec ASC
`;
return query;
}
const TEST_MYSQL_QUERY = `SELECT CAST(itemid AS CHAR) AS metric, clock AS time_sec, value_avg AS value FROM trends_uint LIMIT 1`;
////////////////
// PostgreSQL //
////////////////
const itemid_format = 'FM99999999999999999999';
function buildPostgresHistoryQuery(itemids, table, timeFrom, timeTill, intervalSec, aggFunction) {
let time_expression = `clock / ${intervalSec} * ${intervalSec}`;
let query = `
SELECT to_char(itemid, '${itemid_format}') AS metric, ${time_expression} AS time, ${aggFunction}(value) AS value
FROM ${table}
WHERE itemid IN (${itemids})
AND clock > ${timeFrom} AND clock < ${timeTill}
GROUP BY 1, 2
ORDER BY time ASC
`;
return query;
}
function buildPostgresTrendsQuery(itemids, table, timeFrom, timeTill, intervalSec, aggFunction, valueColumn) {
let time_expression = `clock / ${intervalSec} * ${intervalSec}`;
let query = `
SELECT to_char(itemid, '${itemid_format}') AS metric, ${time_expression} AS time, ${aggFunction}(${valueColumn}) AS value
FROM ${table}
WHERE itemid IN (${itemids})
AND clock > ${timeFrom} AND clock < ${timeTill}
GROUP BY 1, 2
ORDER BY time ASC
`;
return query;
}
const TEST_POSTGRES_QUERY = `
SELECT to_char(itemid, '${itemid_format}') AS metric, clock AS time, value_avg AS value
FROM trends_uint LIMIT 1
`;

View File

@@ -1,46 +0,0 @@
/**
* Base class for all Zabbix connectors
*/
export default class ZabbixConnector {
constructor() {}
testDataSource() {}
getHistory() {}
getTrends() {}
getGroups() {}
getHosts() {}
getApps() {}
getItems() {}
getItemsByIDs() {}
getMacros() {}
getGlobalMacros() {}
getLastValue() {}
getTriggers() {}
getEvents() {}
getAlerts() {}
getHostAlerts() {}
getAcknowledges() {}
getITService() {}
getSLA() {}
getVersion() {}
}

View File

@@ -2,7 +2,7 @@ import _ from 'lodash';
import * as utils from '../utils';
import responseHandler from '../responseHandler';
import { ZabbixAPIConnector } from './connectors/zabbix_api/zabbixAPIConnector';
import { ZabbixDBConnector } from './connectors/sql/zabbixDBConnector';
import { SQLConnector } from './connectors/sql/sqlConnector';
import { CachingProxy } from './proxy/cachingProxy';
const REQUESTS_TO_PROXYFY = [
@@ -20,8 +20,6 @@ const REQUESTS_TO_BIND = [
];
export class Zabbix {
/** @ngInject */
constructor(options, backendSrv, datasourceSrv) {
let {
url,
@@ -31,7 +29,7 @@ export class Zabbix {
withCredentials,
cacheTTL,
enableDirectDBConnection,
sqlDatasourceId
datasourceId
} = options;
this.enableDirectDBConnection = enableDirectDBConnection;
@@ -46,7 +44,8 @@ export class Zabbix {
this.zabbixAPI = new ZabbixAPIConnector(url, username, password, basicAuth, withCredentials, backendSrv);
if (enableDirectDBConnection) {
this.dbConnector = new ZabbixDBConnector(sqlDatasourceId, {}, backendSrv, datasourceSrv);
let dbConnectorOptions = { datasourceId };
this.dbConnector = new SQLConnector(dbConnectorOptions, backendSrv, datasourceSrv);
this.getHistoryDB = this.cachingProxy.proxyfyWithCache(this.dbConnector.getHistory, 'getHistory', this.dbConnector);
this.getTrendsDB = this.cachingProxy.proxyfyWithCache(this.dbConnector.getTrends, 'getTrends', this.dbConnector);
}