Interpolate series to make stacking work properly in case of different intervals, #1211
This commit is contained in:
@@ -126,7 +126,7 @@ func (ds *ZabbixDatasourceInstance) applyDataProcessing(ctx context.Context, que
|
|||||||
}
|
}
|
||||||
|
|
||||||
if len(series) > 1 {
|
if len(series) > 1 {
|
||||||
series = timeseries.AlignSeriesIntervals(series)
|
series = timeseries.PrepareForStack(series)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -81,6 +81,84 @@ func (ts TimeSeries) DetectInterval() time.Duration {
|
|||||||
return time.Duration(deltas[midIndex]) * time.Millisecond
|
return time.Duration(deltas[midIndex]) * time.Millisecond
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// PrepareForStack performs series interpolation to make series consist of the points with same time stamps
|
||||||
|
func PrepareForStack(series []*TimeSeriesData) []*TimeSeriesData {
|
||||||
|
if len(series) == 0 {
|
||||||
|
return series
|
||||||
|
}
|
||||||
|
|
||||||
|
// Build unique set of time stamps from all series
|
||||||
|
interpolatedTimeStampsMap := make(map[time.Time]time.Time)
|
||||||
|
for _, s := range series {
|
||||||
|
for _, p := range s.TS {
|
||||||
|
interpolatedTimeStampsMap[p.Time] = p.Time
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Convert to slice and sort
|
||||||
|
interpolatedTimeStamps := make([]time.Time, 0)
|
||||||
|
for _, ts := range interpolatedTimeStampsMap {
|
||||||
|
interpolatedTimeStamps = append(interpolatedTimeStamps, ts)
|
||||||
|
}
|
||||||
|
sort.Slice(interpolatedTimeStamps, func(i, j int) bool {
|
||||||
|
return interpolatedTimeStamps[i].Before(interpolatedTimeStamps[j])
|
||||||
|
})
|
||||||
|
|
||||||
|
for _, s := range series {
|
||||||
|
if s.Len() < 2 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
p := s.TS[0]
|
||||||
|
pNext := s.TS[1]
|
||||||
|
interpolatedSeries := make([]TimePoint, 0)
|
||||||
|
interpolatedTS := interpolatedTimeStamps[0]
|
||||||
|
interpolatedTSIdx := 0
|
||||||
|
|
||||||
|
// Insert nulls before the first point
|
||||||
|
for i := 0; i < len(interpolatedTimeStamps); i++ {
|
||||||
|
interpolatedTS = interpolatedTimeStamps[i]
|
||||||
|
if interpolatedTS.Before(p.Time) {
|
||||||
|
interpolatedSeries = append(interpolatedSeries, TimePoint{Time: interpolatedTS, Value: nil})
|
||||||
|
} else {
|
||||||
|
interpolatedTSIdx = i
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := 0; i < s.Len()-1; i++ {
|
||||||
|
p = s.TS[i]
|
||||||
|
pNext = s.TS[i+1]
|
||||||
|
|
||||||
|
interpolatedSeries = append(interpolatedSeries, p)
|
||||||
|
|
||||||
|
// Insert interpolated points between existing
|
||||||
|
for interpolatedTimeStamps[interpolatedTSIdx].Before(pNext.Time) && interpolatedTSIdx < len(interpolatedTimeStamps) {
|
||||||
|
if interpolatedTimeStamps[interpolatedTSIdx].Equal(p.Time) {
|
||||||
|
interpolatedTSIdx++
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
frameTs := interpolatedTimeStamps[interpolatedTSIdx]
|
||||||
|
if p.Value != nil && pNext.Value != nil {
|
||||||
|
pointValue := linearInterpolation(frameTs, p, pNext)
|
||||||
|
interpolatedSeries = append(interpolatedSeries, TimePoint{Time: frameTs, Value: &pointValue})
|
||||||
|
} else {
|
||||||
|
// Next or current point is null means we're currently in a gap between 2 points,
|
||||||
|
// so put nulls instead of interpolating values.
|
||||||
|
interpolatedSeries = append(interpolatedSeries, TimePoint{Time: frameTs, Value: nil})
|
||||||
|
}
|
||||||
|
interpolatedTSIdx++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
interpolatedSeries = append(interpolatedSeries, pNext)
|
||||||
|
s.TS = interpolatedSeries
|
||||||
|
}
|
||||||
|
|
||||||
|
return series
|
||||||
|
}
|
||||||
|
|
||||||
// AlignSeriesIntervals aligns series to the same time interval
|
// AlignSeriesIntervals aligns series to the same time interval
|
||||||
func AlignSeriesIntervals(series []*TimeSeriesData) []*TimeSeriesData {
|
func AlignSeriesIntervals(series []*TimeSeriesData) []*TimeSeriesData {
|
||||||
if len(series) == 0 {
|
if len(series) == 0 {
|
||||||
|
|||||||
Reference in New Issue
Block a user