db connector: refactor

This commit is contained in:
Alexander Zobnin
2018-10-30 19:58:48 +03:00
parent 6da2479e9d
commit 6eb52619b9
6 changed files with 64 additions and 113 deletions

View File

@@ -1,12 +1,40 @@
import _ from 'lodash';
export const DEFAULT_QUERY_LIMIT = 10000;
export const HISTORY_TO_TABLE_MAP = {
'0': 'history',
'1': 'history_str',
'2': 'history_log',
'3': 'history_uint',
'4': 'history_text'
};
export const TREND_TO_TABLE_MAP = {
'0': 'trends',
'3': 'trends_uint'
};
export const consolidateByFunc = {
'avg': 'AVG',
'min': 'MIN',
'max': 'MAX',
'sum': 'SUM',
'count': 'COUNT'
};
export const consolidateByTrendColumns = {
'avg': 'value_avg',
'min': 'value_min',
'max': 'value_max',
'sum': 'num*value_avg' // sum of sums inside the one-hour trend period
};
/**
* Base class for external history database connectors. Subclasses should implement `getHistory()`, `getTrends()` and
* `testDataSource()` methods, which describe how to fetch data from source other than Zabbix API.
*/
export default class DBConnector {
constructor(options, backendSrv, datasourceSrv) {
this.backendSrv = backendSrv;
export class DBConnector {
constructor(options, datasourceSrv) {
this.datasourceSrv = datasourceSrv;
this.datasourceId = options.datasourceId;
this.datasourceName = options.datasourceName;
@@ -106,3 +134,14 @@ function convertGrafanaTSResponse(time_series, items, addHostName) {
return _.sortBy(grafanaSeries, 'target');
}
const defaults = {
DBConnector,
DEFAULT_QUERY_LIMIT,
HISTORY_TO_TABLE_MAP,
TREND_TO_TABLE_MAP,
consolidateByFunc,
consolidateByTrendColumns
};
export default defaults;

View File

@@ -1,38 +1,9 @@
import _ from 'lodash';
import DBConnector from '../dbConnector';
const DEFAULT_QUERY_LIMIT = 10000;
const HISTORY_TO_TABLE_MAP = {
'0': 'history',
'1': 'history_str',
'2': 'history_log',
'3': 'history_uint',
'4': 'history_text'
};
const TREND_TO_TABLE_MAP = {
'0': 'trends',
'3': 'trends_uint'
};
const consolidateByFunc = {
'avg': 'AVG',
'min': 'MIN',
'max': 'MAX',
'sum': 'SUM',
'count': 'COUNT'
};
const consolidateByTrendColumns = {
'avg': 'value_avg',
'min': 'value_min',
'max': 'value_max',
'sum': 'num*value_avg' // sum of sums inside the one-hour trend period
};
import { DBConnector, DEFAULT_QUERY_LIMIT, HISTORY_TO_TABLE_MAP, consolidateByFunc } from '../dbConnector';
export class InfluxDBConnector extends DBConnector {
constructor(options, backendSrv, datasourceSrv) {
super(options, backendSrv, datasourceSrv);
constructor(options, datasourceSrv) {
super(options, datasourceSrv);
this.limit = options.limit || DEFAULT_QUERY_LIMIT;
super.loadDBDataSource().then(ds => {
this.influxDS = ds;
@@ -71,28 +42,7 @@ export class InfluxDBConnector extends DBConnector {
}
getTrends(items, timeFrom, timeTill, options) {
let { intervalMs, consolidateBy } = options;
const intervalSec = Math.ceil(intervalMs / 1000);
consolidateBy = consolidateBy || 'avg';
const aggFunction = consolidateByFunc[consolidateBy];
// Group items by value type and perform request for each value type
const grouped_items = _.groupBy(items, 'value_type');
const promises = _.map(grouped_items, (items, value_type) => {
const itemids = _.map(items, 'itemid');
const table = TREND_TO_TABLE_MAP[value_type];
let valueColumn = _.includes(['avg', 'min', 'max', 'sum'], consolidateBy) ? consolidateBy : 'avg';
valueColumn = consolidateByTrendColumns[valueColumn];
const query = this.buildTrendsQuery(itemids, table, timeFrom, timeTill, intervalSec, aggFunction, valueColumn);
return this.invokeInfluxDBQuery(query);
});
return Promise.all(promises)
.then(_.flatten)
.then(results => {
return handleInfluxHistoryResponse(results);
});
return this.getHistory(items, timeFrom, timeTill, options);
}
buildHistoryQuery(itemids, table, timeFrom, timeTill, intervalSec, aggFunction) {
@@ -104,15 +54,6 @@ export class InfluxDBConnector extends DBConnector {
return compactQuery(query);
}
buildTrendsQuery(itemids, table, timeFrom, timeTill, intervalSec, aggFunction, valueColumn) {
const AGG = aggFunction === 'AVG' ? 'MEAN' : aggFunction;
const where_clause = this.buildWhereClause(itemids);
const query = `SELECT ${AGG}("${valueColumn}") FROM "${table}"
WHERE ${where_clause} AND "time" >= ${timeFrom}s AND "time" <= ${timeTill}s
GROUP BY time(${intervalSec}s)`;
return compactQuery(query);
}
buildWhereClause(itemids) {
const itemidsWhere = itemids.map(itemid => `"itemid" = '${itemid}'`).join(' OR ');
return `(${itemidsWhere})`;

View File

@@ -1,45 +1,17 @@
import _ from 'lodash';
import mysql from './mysql';
import postgres from './postgres';
import DBConnector from '../dbConnector';
import dbConnector, { DBConnector, DEFAULT_QUERY_LIMIT, HISTORY_TO_TABLE_MAP, TREND_TO_TABLE_MAP } from '../dbConnector';
const supportedDatabases = {
mysql: 'mysql',
postgres: 'postgres'
};
const DEFAULT_QUERY_LIMIT = 10000;
const HISTORY_TO_TABLE_MAP = {
'0': 'history',
'1': 'history_str',
'2': 'history_log',
'3': 'history_uint',
'4': 'history_text'
};
const TREND_TO_TABLE_MAP = {
'0': 'trends',
'3': 'trends_uint'
};
const consolidateByFunc = {
'avg': 'AVG',
'min': 'MIN',
'max': 'MAX',
'sum': 'SUM',
'count': 'COUNT'
};
const consolidateByTrendColumns = {
'avg': 'value_avg',
'min': 'value_min',
'max': 'value_max',
'sum': 'num*value_avg' // sum of sums inside the one-hour trend period
};
export class SQLConnector extends DBConnector {
constructor(options, backendSrv, datasourceSrv) {
super(options, backendSrv, datasourceSrv);
constructor(options, datasourceSrv, backendSrv) {
super(options, datasourceSrv);
this.backendSrv = backendSrv;
this.limit = options.limit || DEFAULT_QUERY_LIMIT;
this.sqlDialect = null;
@@ -69,7 +41,7 @@ export class SQLConnector extends DBConnector {
let intervalSec = Math.ceil(intervalMs / 1000);
consolidateBy = consolidateBy || 'avg';
let aggFunction = consolidateByFunc[consolidateBy];
let aggFunction = dbConnector.consolidateByFunc[consolidateBy];
// Group items by value type and perform request for each value type
let grouped_items = _.groupBy(items, 'value_type');
@@ -92,7 +64,7 @@ export class SQLConnector extends DBConnector {
let intervalSec = Math.ceil(intervalMs / 1000);
consolidateBy = consolidateBy || 'avg';
let aggFunction = consolidateByFunc[consolidateBy];
let aggFunction = dbConnector.consolidateByFunc[consolidateBy];
// Group items by value type and perform request for each value type
let grouped_items = _.groupBy(items, 'value_type');
@@ -100,7 +72,7 @@ export class SQLConnector extends DBConnector {
let itemids = _.map(items, 'itemid').join(', ');
let table = TREND_TO_TABLE_MAP[value_type];
let valueColumn = _.includes(['avg', 'min', 'max', 'sum'], consolidateBy) ? consolidateBy : 'avg';
valueColumn = consolidateByTrendColumns[valueColumn];
valueColumn = dbConnector.consolidateByTrendColumns[valueColumn];
let query = this.sqlDialect.trendsQuery(itemids, table, timeFrom, timeTill, intervalSec, aggFunction, valueColumn);
query = compactSQLQuery(query);

View File

@@ -1,7 +1,7 @@
import _ from 'lodash';
import * as utils from '../utils';
import responseHandler from '../responseHandler';
import DBConnector from './connectors/dbConnector';
import { DBConnector } from './connectors/dbConnector';
import { ZabbixAPIConnector } from './connectors/zabbix_api/zabbixAPIConnector';
import { SQLConnector } from './connectors/sql/sqlConnector';
import { InfluxDBConnector } from './connectors/influxdb/influxdbConnector';
@@ -25,7 +25,7 @@ const REQUESTS_TO_BIND = [
];
export class Zabbix {
constructor(options, backendSrv, datasourceSrv) {
constructor(options, datasourceSrv, backendSrv) {
let {
url,
username,
@@ -59,12 +59,12 @@ export class Zabbix {
datasourceId: dbConnectionDatasourceId,
datasourceName: dbConnectionDatasourceName
};
this.dbConnector = new DBConnector(dbConnectorOptions, backendSrv, datasourceSrv);
this.dbConnector = new DBConnector(dbConnectorOptions, datasourceSrv);
this.dbConnector.loadDBDataSource().then(ds => {
if (ds.type === 'influxdb') {
this.dbConnector = new InfluxDBConnector(dbConnectorOptions, backendSrv, datasourceSrv);
this.dbConnector = new InfluxDBConnector(dbConnectorOptions, datasourceSrv);
} else {
this.dbConnector = new SQLConnector(dbConnectorOptions, backendSrv, datasourceSrv);
this.dbConnector = new SQLConnector(dbConnectorOptions, datasourceSrv, backendSrv);
}
}).then(() => {
this.getHistoryDB = this.cachingProxy.proxyfyWithCache(this.dbConnector.getHistory, 'getHistory', this.dbConnector);