Fix the way we merge frontend and backend queries (#2158)
## Summary With the new logic to merge backend and frontend query responses, some of the frontend query responses were being lost due to early termination. Merging frontend and backend responses was not properly waiting for all promises to resolve before returning this "merged" result ## Detailed explanation In `mergeQueries` although `Promise.all` was triggered, we returned immediately without waiting for the promises to actually resolve, now instead of passing down promises: - use `switchMap` so that the upstream backend response observable can be used as an inner Observable value to be concatenated with the rest of the responses - Why not use `map`? To prevent out of sync results, we need to wait for the promises to resolve **before** we pass it down to the `mergeQueries` function, `map` does not allow that. - use `from` and trigger `Promise.all` **before** calling `mergeQueries` so that all promise results can be resolved and converted to observables by the time they get passed into the function - modify `mergeQueries` to accept `DataResponse` for arguments, clone the existing data and return the merged result. <img width="1700" height="398" alt="Screenshot 2025-12-29 at 1 06 05 PM" src="https://github.com/user-attachments/assets/c07c59b1-43b8-47f9-adc6-67583b125856" /> ## Why To fix how frontend and backend queries are merged so that no response gets lost in the process ## How to test 1. Create a new dashboard or go to Explore 2. For query type select `Problems` or anything that is not `Metrics` 3. Execute the query, and you should be able to see a response.
This commit is contained in:
committed by
GitHub
parent
4eb55efa59
commit
3d0895c008
@@ -35,7 +35,7 @@ import {
|
||||
} from '@grafana/data';
|
||||
import { AnnotationQueryEditor } from './components/AnnotationQueryEditor';
|
||||
import { trackRequest } from './tracking';
|
||||
import { lastValueFrom, map, Observable } from 'rxjs';
|
||||
import { from, lastValueFrom, map, Observable, switchMap } from 'rxjs';
|
||||
|
||||
export class ZabbixDatasource extends DataSourceWithBackend<ZabbixMetricsQuery, ZabbixDSOptions> {
|
||||
name: string;
|
||||
@@ -142,10 +142,7 @@ export class ZabbixDatasource extends DataSourceWithBackend<ZabbixMetricsQuery,
|
||||
const backendResponse = super.query({ ...request, targets: interpolatedTargets.filter(this.isBackendTarget) });
|
||||
const dbConnectionResponsePromise = this.dbConnectionQuery({ ...request, targets: interpolatedTargets });
|
||||
const frontendResponsePromise = this.frontendQuery({ ...request, targets: interpolatedTargets });
|
||||
const annotationResposePromise = this.annotationRequest({ ...request, targets: interpolatedTargets });
|
||||
|
||||
const applyMergeQueries = (queryResponse: DataQueryResponse) =>
|
||||
this.mergeQueries(queryResponse, dbConnectionResponsePromise, frontendResponsePromise, annotationResposePromise);
|
||||
const annotationResponsePromise = this.annotationRequest({ ...request, targets: interpolatedTargets });
|
||||
const applyFEFuncs = (queryResponse: DataQueryResponse) =>
|
||||
this.applyFrontendFunctions(queryResponse, {
|
||||
...request,
|
||||
@@ -156,7 +153,13 @@ export class ZabbixDatasource extends DataSourceWithBackend<ZabbixMetricsQuery,
|
||||
map(applyFEFuncs),
|
||||
map(responseHandler.convertZabbixUnits),
|
||||
map(this.convertToWide),
|
||||
map(applyMergeQueries)
|
||||
switchMap((queryResponse) =>
|
||||
from(Promise.all([dbConnectionResponsePromise, frontendResponsePromise, annotationResponsePromise])).pipe(
|
||||
map(([dbConnectionRes, frontendRes, annotationRes]) =>
|
||||
this.mergeQueries(queryResponse, dbConnectionRes, frontendRes, annotationRes)
|
||||
)
|
||||
)
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
@@ -906,24 +909,26 @@ export class ZabbixDatasource extends DataSourceWithBackend<ZabbixMetricsQuery,
|
||||
|
||||
mergeQueries(
|
||||
queryResponse: DataQueryResponse,
|
||||
dbConnectionResponsePromise: Promise<DataQueryResponse>,
|
||||
frontendResponsePromise: Promise<DataQueryResponse>,
|
||||
annotationResposePromise: Promise<DataQueryResponse>
|
||||
dbConnectionResponse?: DataQueryResponse,
|
||||
frontendResponse?: DataQueryResponse,
|
||||
annotationResponse?: DataQueryResponse
|
||||
): DataQueryResponse {
|
||||
Promise.all([dbConnectionResponsePromise, frontendResponsePromise, annotationResposePromise]).then((resp) => {
|
||||
const [dbConnectionRes, frontendRes, annotationRes] = resp;
|
||||
if (dbConnectionRes.data) {
|
||||
queryResponse.data = queryResponse.data.concat(dbConnectionRes.data);
|
||||
}
|
||||
if (frontendRes.data) {
|
||||
queryResponse.data = queryResponse.data.concat(frontendRes.data);
|
||||
}
|
||||
const mergedResponse: DataQueryResponse = {
|
||||
...queryResponse,
|
||||
data: queryResponse.data ? [...queryResponse.data] : [],
|
||||
};
|
||||
|
||||
if (annotationRes.data) {
|
||||
queryResponse.data = queryResponse.data.concat(annotationRes.data);
|
||||
}
|
||||
});
|
||||
return queryResponse;
|
||||
if (dbConnectionResponse?.data) {
|
||||
mergedResponse.data = mergedResponse.data.concat(dbConnectionResponse.data);
|
||||
}
|
||||
if (frontendResponse?.data) {
|
||||
mergedResponse.data = mergedResponse.data.concat(frontendResponse.data);
|
||||
}
|
||||
if (annotationResponse?.data) {
|
||||
mergedResponse.data = mergedResponse.data.concat(annotationResponse.data);
|
||||
}
|
||||
|
||||
return mergedResponse;
|
||||
}
|
||||
|
||||
convertToWide(response: DataQueryResponse) {
|
||||
|
||||
149
src/datasource/specs/datasource.test.ts
Normal file
149
src/datasource/specs/datasource.test.ts
Normal file
@@ -0,0 +1,149 @@
|
||||
import { lastValueFrom, of } from 'rxjs';
|
||||
import { ZabbixDatasource } from '../datasource';
|
||||
import * as c from '../constants';
|
||||
import { DataSourceWithBackend } from '@grafana/runtime';
|
||||
|
||||
const buildRequest = () =>
|
||||
({
|
||||
targets: [{ refId: 'A', queryType: c.MODE_METRICS }],
|
||||
range: { from: 'now-1h', to: 'now' },
|
||||
scopedVars: {},
|
||||
}) as any;
|
||||
|
||||
const createDeferred = <T>() => {
|
||||
let resolve!: (value: T | PromiseLike<T>) => void;
|
||||
const promise = new Promise<T>((res) => {
|
||||
resolve = res;
|
||||
});
|
||||
return { promise, resolve };
|
||||
};
|
||||
|
||||
jest.mock('../tracking', () => ({
|
||||
trackRequest: jest.fn(),
|
||||
}));
|
||||
|
||||
jest.mock('../responseHandler', () => ({
|
||||
__esModule: true,
|
||||
default: {
|
||||
convertZabbixUnits: (resp: any) => resp,
|
||||
convertToWide: (data: any) => data,
|
||||
isConvertibleToWide: () => false,
|
||||
},
|
||||
}));
|
||||
|
||||
jest.mock('../zabbix/zabbix', () => ({
|
||||
Zabbix: jest.fn().mockImplementation(() => ({})),
|
||||
}));
|
||||
|
||||
jest.mock('grafana/app/core/config', () => ({
|
||||
buildInfo: { env: 'development' },
|
||||
}));
|
||||
|
||||
jest.mock('grafana/app/core/core', () => ({
|
||||
contextSrv: {},
|
||||
}));
|
||||
|
||||
jest.mock('grafana/app/core/utils/datemath', () => ({
|
||||
parse: () => Date.now(),
|
||||
}));
|
||||
|
||||
jest.mock('@grafana/runtime', () => {
|
||||
const { of } = require('rxjs');
|
||||
|
||||
class MockDataSourceWithBackend {
|
||||
instanceSettings: any;
|
||||
constructor(settings: any) {
|
||||
this.instanceSettings = settings;
|
||||
}
|
||||
|
||||
query() {
|
||||
return of({ data: [] });
|
||||
}
|
||||
}
|
||||
|
||||
return {
|
||||
DataSourceWithBackend: MockDataSourceWithBackend,
|
||||
getTemplateSrv: jest.fn(() => ({
|
||||
replace: (value: any) => value,
|
||||
variableExists: () => false,
|
||||
})),
|
||||
getDataSourceSrv: jest.fn(() => ({
|
||||
getInstanceSettings: () => undefined,
|
||||
})),
|
||||
getBackendSrv: jest.fn(),
|
||||
HealthCheckError: class {},
|
||||
TemplateSrv: class {},
|
||||
};
|
||||
});
|
||||
|
||||
describe('ZabbixDatasource', () => {
|
||||
const instanceSettings: any = { id: 1, name: 'test-ds', jsonData: {} };
|
||||
const ds = new ZabbixDatasource(instanceSettings);
|
||||
it('waits for all non-backend responses before emitting merged data', async () => {
|
||||
jest.spyOn(ZabbixDatasource.prototype, 'interpolateVariablesInQueries').mockReturnValue(buildRequest().targets);
|
||||
jest.spyOn(ds, 'applyFrontendFunctions').mockImplementation((response) => response);
|
||||
|
||||
jest.spyOn(DataSourceWithBackend.prototype, 'query').mockReturnValue(of({ data: [{ refId: 'A' }] as any[] }));
|
||||
|
||||
const dbDeferred = createDeferred<any>();
|
||||
const frontendDeferred = createDeferred<any>();
|
||||
const annotationDeferred = createDeferred<any>();
|
||||
|
||||
jest.spyOn(ds, 'dbConnectionQuery').mockReturnValue(dbDeferred.promise);
|
||||
jest.spyOn(ds, 'frontendQuery').mockReturnValue(frontendDeferred.promise);
|
||||
jest.spyOn(ds, 'annotationRequest').mockReturnValue(annotationDeferred.promise);
|
||||
|
||||
const request = buildRequest();
|
||||
let settled = false;
|
||||
const resultPromise = lastValueFrom(ds.query(request)).then((res) => {
|
||||
settled = true;
|
||||
return res;
|
||||
});
|
||||
|
||||
await Promise.resolve();
|
||||
expect(settled).toBe(false);
|
||||
|
||||
dbDeferred.resolve({ data: [{ refId: 'B' }] });
|
||||
frontendDeferred.resolve({ data: [{ refId: 'C' }] });
|
||||
annotationDeferred.resolve({ data: [{ refId: 'D' }] });
|
||||
|
||||
const result = await resultPromise;
|
||||
expect(result.data).toEqual([{ refId: 'A' }, { refId: 'B' }, { refId: 'C' }, { refId: 'D' }]);
|
||||
});
|
||||
|
||||
it('mergeQueries combines data without mutating the original response', () => {
|
||||
const baseResponse = { data: [{ refId: 'A' }] } as any;
|
||||
const merged = ds.mergeQueries(
|
||||
baseResponse,
|
||||
{ data: [{ refId: 'B' }] } as any,
|
||||
{ data: [{ refId: 'C' }] } as any,
|
||||
{ data: [{ refId: 'D' }] } as any
|
||||
);
|
||||
|
||||
expect(merged.data).toEqual([{ refId: 'A' }, { refId: 'B' }, { refId: 'C' }, { refId: 'D' }]);
|
||||
expect(baseResponse.data).toEqual([{ refId: 'A' }]);
|
||||
});
|
||||
|
||||
it('convertToWide delegates when data is convertible', () => {
|
||||
const ds = new ZabbixDatasource(instanceSettings);
|
||||
const response = { data: ['narrow'] } as any;
|
||||
const result = ds.convertToWide(response);
|
||||
expect(result.data).toEqual(['narrow']);
|
||||
});
|
||||
|
||||
it('detects backend vs DB connection targets based on flag', () => {
|
||||
const ds = new ZabbixDatasource(instanceSettings);
|
||||
const metricsTarget = { queryType: c.MODE_METRICS } as any;
|
||||
const itemIdTarget = { queryType: c.MODE_ITEMID } as any;
|
||||
const problemsTarget = { queryType: c.MODE_PROBLEMS } as any;
|
||||
|
||||
expect(ds.isBackendTarget(metricsTarget)).toBe(true);
|
||||
expect(ds.isBackendTarget(itemIdTarget)).toBe(true);
|
||||
expect(ds.isBackendTarget(problemsTarget)).toBe(false);
|
||||
expect(ds.isDBConnectionTarget(metricsTarget)).toBe(false);
|
||||
|
||||
ds.enableDirectDBConnection = true;
|
||||
expect(ds.isBackendTarget(metricsTarget)).toBe(false);
|
||||
expect(ds.isDBConnectionTarget(metricsTarget)).toBe(true);
|
||||
});
|
||||
});
|
||||
Reference in New Issue
Block a user