influxdb: fix trends query, closes #675
This commit is contained in:
@@ -34,7 +34,7 @@ describe('InfluxDBConnector', () => {
|
|||||||
|
|
||||||
it('should use MEAN instead of AVG', () => {
|
it('should use MEAN instead of AVG', () => {
|
||||||
const { itemids, range, intervalSec, table } = ctx.defaultQueryParams;
|
const { itemids, range, intervalSec, table } = ctx.defaultQueryParams;
|
||||||
const aggFunction = 'AVG';
|
const aggFunction = 'avg';
|
||||||
const query = ctx.influxDBConnector.buildHistoryQuery(itemids, table, range, intervalSec, aggFunction);
|
const query = ctx.influxDBConnector.buildHistoryQuery(itemids, table, range, intervalSec, aggFunction);
|
||||||
const expected = compactQuery(`SELECT MEAN("value")
|
const expected = compactQuery(`SELECT MEAN("value")
|
||||||
FROM "history" WHERE ("itemid" = '123' OR "itemid" = '234') AND "time" >= 15000s AND "time" <= 15100s
|
FROM "history" WHERE ("itemid" = '123' OR "itemid" = '234') AND "time" >= 15000s AND "time" <= 15100s
|
||||||
@@ -100,7 +100,21 @@ describe('InfluxDBConnector', () => {
|
|||||||
const items = [
|
const items = [
|
||||||
{ itemid: '123', value_type: 3 }
|
{ itemid: '123', value_type: 3 }
|
||||||
];
|
];
|
||||||
const expectedQuery = compactQuery(`SELECT MEAN("value")
|
const expectedQuery = compactQuery(`SELECT MEAN("value_avg")
|
||||||
|
FROM "longterm"."history_uint" WHERE ("itemid" = '123') AND "time" >= 15000s AND "time" <= 15100s
|
||||||
|
GROUP BY time(5s), "itemid" fill(none)
|
||||||
|
`);
|
||||||
|
ctx.influxDBConnector.getTrends(items, timeFrom, timeTill, options);
|
||||||
|
expect(ctx.influxDBConnector.invokeInfluxDBQuery).toHaveBeenCalledWith(expectedQuery);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should use proper value column if retention policy set (trends used)', () => {
|
||||||
|
const { timeFrom, timeTill } = ctx.defaultQueryParams.range;
|
||||||
|
const options = { intervalMs: 5000, consolidateBy: 'max' };
|
||||||
|
const items = [
|
||||||
|
{ itemid: '123', value_type: 3 }
|
||||||
|
];
|
||||||
|
const expectedQuery = compactQuery(`SELECT MAX("value_max")
|
||||||
FROM "longterm"."history_uint" WHERE ("itemid" = '123') AND "time" >= 15000s AND "time" <= 15100s
|
FROM "longterm"."history_uint" WHERE ("itemid" = '123') AND "time" >= 15000s AND "time" <= 15100s
|
||||||
GROUP BY time(5s), "itemid" fill(none)
|
GROUP BY time(5s), "itemid" fill(none)
|
||||||
`);
|
`);
|
||||||
|
|||||||
@@ -1,6 +1,14 @@
|
|||||||
import _ from 'lodash';
|
import _ from 'lodash';
|
||||||
import { compactQuery } from '../../../utils';
|
import { compactQuery } from '../../../utils';
|
||||||
import { DBConnector, HISTORY_TO_TABLE_MAP, consolidateByFunc } from '../dbConnector';
|
import { DBConnector, HISTORY_TO_TABLE_MAP, consolidateByTrendColumns } from '../dbConnector';
|
||||||
|
|
||||||
|
const consolidateByFunc = {
|
||||||
|
'avg': 'MEAN',
|
||||||
|
'min': 'MIN',
|
||||||
|
'max': 'MAX',
|
||||||
|
'sum': 'SUM',
|
||||||
|
'count': 'COUNT'
|
||||||
|
};
|
||||||
|
|
||||||
export class InfluxDBConnector extends DBConnector {
|
export class InfluxDBConnector extends DBConnector {
|
||||||
constructor(options, datasourceSrv) {
|
constructor(options, datasourceSrv) {
|
||||||
@@ -25,14 +33,13 @@ export class InfluxDBConnector extends DBConnector {
|
|||||||
|
|
||||||
const range = { timeFrom, timeTill };
|
const range = { timeFrom, timeTill };
|
||||||
consolidateBy = consolidateBy || 'avg';
|
consolidateBy = consolidateBy || 'avg';
|
||||||
const aggFunction = consolidateByFunc[consolidateBy] || consolidateBy;
|
|
||||||
|
|
||||||
// Group items by value type and perform request for each value type
|
// Group items by value type and perform request for each value type
|
||||||
const grouped_items = _.groupBy(items, 'value_type');
|
const grouped_items = _.groupBy(items, 'value_type');
|
||||||
const promises = _.map(grouped_items, (items, value_type) => {
|
const promises = _.map(grouped_items, (items, value_type) => {
|
||||||
const itemids = _.map(items, 'itemid');
|
const itemids = _.map(items, 'itemid');
|
||||||
const table = HISTORY_TO_TABLE_MAP[value_type];
|
const table = HISTORY_TO_TABLE_MAP[value_type];
|
||||||
const query = this.buildHistoryQuery(itemids, table, range, intervalSec, aggFunction, retentionPolicy);
|
const query = this.buildHistoryQuery(itemids, table, range, intervalSec, consolidateBy, retentionPolicy);
|
||||||
return this.invokeInfluxDBQuery(query);
|
return this.invokeInfluxDBQuery(query);
|
||||||
});
|
});
|
||||||
|
|
||||||
@@ -51,9 +58,13 @@ export class InfluxDBConnector extends DBConnector {
|
|||||||
buildHistoryQuery(itemids, table, range, intervalSec, aggFunction, retentionPolicy) {
|
buildHistoryQuery(itemids, table, range, intervalSec, aggFunction, retentionPolicy) {
|
||||||
const { timeFrom, timeTill } = range;
|
const { timeFrom, timeTill } = range;
|
||||||
const measurement = retentionPolicy ? `"${retentionPolicy}"."${table}"` : `"${table}"`;
|
const measurement = retentionPolicy ? `"${retentionPolicy}"."${table}"` : `"${table}"`;
|
||||||
const AGG = aggFunction === 'AVG' ? 'MEAN' : aggFunction;
|
let value = 'value';
|
||||||
|
if (retentionPolicy) {
|
||||||
|
value = consolidateByTrendColumns[aggFunction] || 'value_avg';
|
||||||
|
}
|
||||||
|
const aggregation = consolidateByFunc[aggFunction] || aggFunction;
|
||||||
const where_clause = this.buildWhereClause(itemids);
|
const where_clause = this.buildWhereClause(itemids);
|
||||||
const query = `SELECT ${AGG}("value") FROM ${measurement}
|
const query = `SELECT ${aggregation}("${value}") FROM ${measurement}
|
||||||
WHERE ${where_clause} AND "time" >= ${timeFrom}s AND "time" <= ${timeTill}s
|
WHERE ${where_clause} AND "time" >= ${timeFrom}s AND "time" <= ${timeTill}s
|
||||||
GROUP BY time(${intervalSec}s), "itemid" fill(none)`;
|
GROUP BY time(${intervalSec}s), "itemid" fill(none)`;
|
||||||
return compactQuery(query);
|
return compactQuery(query);
|
||||||
|
|||||||
Reference in New Issue
Block a user