From fcef21b0fbea411e574c4193405b05f7aa8b4710 Mon Sep 17 00:00:00 2001 From: Alexander Zobnin Date: Wed, 15 Sep 2021 16:06:53 +0300 Subject: [PATCH] Interpolate series to make stacking work properly in case of different intervals, #1211 --- pkg/datasource/zabbix.go | 2 +- pkg/timeseries/align.go | 78 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 79 insertions(+), 1 deletion(-) diff --git a/pkg/datasource/zabbix.go b/pkg/datasource/zabbix.go index 0f1227c..cff7983 100644 --- a/pkg/datasource/zabbix.go +++ b/pkg/datasource/zabbix.go @@ -126,7 +126,7 @@ func (ds *ZabbixDatasourceInstance) applyDataProcessing(ctx context.Context, que } if len(series) > 1 { - series = timeseries.AlignSeriesIntervals(series) + series = timeseries.PrepareForStack(series) } } diff --git a/pkg/timeseries/align.go b/pkg/timeseries/align.go index 3ce46d5..8265252 100644 --- a/pkg/timeseries/align.go +++ b/pkg/timeseries/align.go @@ -81,6 +81,84 @@ func (ts TimeSeries) DetectInterval() time.Duration { return time.Duration(deltas[midIndex]) * time.Millisecond } +// PrepareForStack performs series interpolation to make series consist of the points with same time stamps +func PrepareForStack(series []*TimeSeriesData) []*TimeSeriesData { + if len(series) == 0 { + return series + } + + // Build unique set of time stamps from all series + interpolatedTimeStampsMap := make(map[time.Time]time.Time) + for _, s := range series { + for _, p := range s.TS { + interpolatedTimeStampsMap[p.Time] = p.Time + } + } + + // Convert to slice and sort + interpolatedTimeStamps := make([]time.Time, 0) + for _, ts := range interpolatedTimeStampsMap { + interpolatedTimeStamps = append(interpolatedTimeStamps, ts) + } + sort.Slice(interpolatedTimeStamps, func(i, j int) bool { + return interpolatedTimeStamps[i].Before(interpolatedTimeStamps[j]) + }) + + for _, s := range series { + if s.Len() < 2 { + continue + } + + p := s.TS[0] + pNext := s.TS[1] + interpolatedSeries := make([]TimePoint, 0) + interpolatedTS := interpolatedTimeStamps[0] + interpolatedTSIdx := 0 + + // Insert nulls before the first point + for i := 0; i < len(interpolatedTimeStamps); i++ { + interpolatedTS = interpolatedTimeStamps[i] + if interpolatedTS.Before(p.Time) { + interpolatedSeries = append(interpolatedSeries, TimePoint{Time: interpolatedTS, Value: nil}) + } else { + interpolatedTSIdx = i + break + } + } + + for i := 0; i < s.Len()-1; i++ { + p = s.TS[i] + pNext = s.TS[i+1] + + interpolatedSeries = append(interpolatedSeries, p) + + // Insert interpolated points between existing + for interpolatedTimeStamps[interpolatedTSIdx].Before(pNext.Time) && interpolatedTSIdx < len(interpolatedTimeStamps) { + if interpolatedTimeStamps[interpolatedTSIdx].Equal(p.Time) { + interpolatedTSIdx++ + continue + } + + frameTs := interpolatedTimeStamps[interpolatedTSIdx] + if p.Value != nil && pNext.Value != nil { + pointValue := linearInterpolation(frameTs, p, pNext) + interpolatedSeries = append(interpolatedSeries, TimePoint{Time: frameTs, Value: &pointValue}) + } else { + // Next or current point is null means we're currently in a gap between 2 points, + // so put nulls instead of interpolating values. + interpolatedSeries = append(interpolatedSeries, TimePoint{Time: frameTs, Value: nil}) + } + interpolatedTSIdx++ + } + } + + interpolatedSeries = append(interpolatedSeries, pNext) + s.TS = interpolatedSeries + } + + return series +} + // AlignSeriesIntervals aligns series to the same time interval func AlignSeriesIntervals(series []*TimeSeriesData) []*TimeSeriesData { if len(series) == 0 {