influx: support retention policy for long-term stored data

This commit is contained in:
Alexander Zobnin
2018-10-31 20:56:36 +03:00
parent d5a224d4fc
commit 089700d227
6 changed files with 83 additions and 34 deletions

View File

@@ -5,6 +5,7 @@ import { DBConnector, HISTORY_TO_TABLE_MAP, consolidateByFunc } from '../dbConne
export class InfluxDBConnector extends DBConnector {
constructor(options, datasourceSrv) {
super(options, datasourceSrv);
this.retentionPolicy = options.retentionPolicy;
super.loadDBDataSource().then(ds => {
this.influxDS = ds;
return ds;
@@ -19,9 +20,10 @@ export class InfluxDBConnector extends DBConnector {
}
getHistory(items, timeFrom, timeTill, options) {
let {intervalMs, consolidateBy} = options;
let { intervalMs, consolidateBy, retentionPolicy } = options;
const intervalSec = Math.ceil(intervalMs / 1000);
const range = { timeFrom, timeTill };
consolidateBy = consolidateBy || 'avg';
const aggFunction = consolidateByFunc[consolidateBy] || consolidateBy;
@@ -30,7 +32,7 @@ export class InfluxDBConnector extends DBConnector {
const promises = _.map(grouped_items, (items, value_type) => {
const itemids = _.map(items, 'itemid');
const table = HISTORY_TO_TABLE_MAP[value_type];
const query = this.buildHistoryQuery(itemids, table, timeFrom, timeTill, intervalSec, aggFunction);
const query = this.buildHistoryQuery(itemids, table, range, intervalSec, aggFunction, retentionPolicy);
return this.invokeInfluxDBQuery(query);
});
@@ -42,13 +44,16 @@ export class InfluxDBConnector extends DBConnector {
}
getTrends(items, timeFrom, timeTill, options) {
options.retentionPolicy = this.retentionPolicy;
return this.getHistory(items, timeFrom, timeTill, options);
}
buildHistoryQuery(itemids, table, timeFrom, timeTill, intervalSec, aggFunction) {
buildHistoryQuery(itemids, table, range, intervalSec, aggFunction, retentionPolicy) {
const { timeFrom, timeTill } = range;
const measurement = retentionPolicy ? `"${retentionPolicy}"."${table}"` : `"${table}"`;
const AGG = aggFunction === 'AVG' ? 'MEAN' : aggFunction;
const where_clause = this.buildWhereClause(itemids);
const query = `SELECT ${AGG}("value") FROM "${table}"
const query = `SELECT ${AGG}("value") FROM ${measurement}
WHERE ${where_clause} AND "time" >= ${timeFrom}s AND "time" <= ${timeTill}s
GROUP BY time(${intervalSec}s), "itemid" fill(none)`;
return compactQuery(query);

View File

@@ -37,6 +37,7 @@ export class Zabbix {
enableDirectDBConnection,
dbConnectionDatasourceId,
dbConnectionDatasourceName,
dbConnectionRetentionPolicy,
} = options;
this.enableDirectDBConnection = enableDirectDBConnection;
@@ -55,7 +56,8 @@ export class Zabbix {
this.bindRequests();
if (enableDirectDBConnection) {
this.initDBConnector(dbConnectionDatasourceId, dbConnectionDatasourceName, datasourceSrv)
const connectorOptions = { dbConnectionRetentionPolicy };
this.initDBConnector(dbConnectionDatasourceId, dbConnectionDatasourceName, datasourceSrv, connectorOptions)
.then(() => {
this.getHistoryDB = this.cachingProxy.proxyfyWithCache(this.dbConnector.getHistory, 'getHistory', this.dbConnector);
this.getTrendsDB = this.cachingProxy.proxyfyWithCache(this.dbConnector.getTrends, 'getTrends', this.dbConnector);
@@ -63,14 +65,15 @@ export class Zabbix {
}
}
initDBConnector(datasourceId, datasourceName, datasourceSrv) {
initDBConnector(datasourceId, datasourceName, datasourceSrv, options) {
return DBConnector.loadDatasource(datasourceId, datasourceName, datasourceSrv)
.then(ds => {
const options = { datasourceId, datasourceName };
let connectorOptions = { datasourceId, datasourceName };
if (ds.type === 'influxdb') {
this.dbConnector = new InfluxDBConnector(options, datasourceSrv);
connectorOptions.retentionPolicy = options.dbConnectionRetentionPolicy;
this.dbConnector = new InfluxDBConnector(connectorOptions, datasourceSrv);
} else {
this.dbConnector = new SQLConnector(options, datasourceSrv);
this.dbConnector = new SQLConnector(connectorOptions, datasourceSrv);
}
return this.dbConnector;
});