From b07a2265748e404c835a5c6b1d0879365cd04751 Mon Sep 17 00:00:00 2001 From: Alexander Zobnin Date: Wed, 26 May 2021 17:35:01 +0300 Subject: [PATCH] Implement sumSeries --- pkg/datasource/functions.go | 37 ++++++---- pkg/timeseries/timeseries.go | 132 +++++++++++++++++++++++++++++++++-- 2 files changed, 147 insertions(+), 22 deletions(-) diff --git a/pkg/datasource/functions.go b/pkg/datasource/functions.go index 4d345cf..8771ff6 100644 --- a/pkg/datasource/functions.go +++ b/pkg/datasource/functions.go @@ -35,6 +35,7 @@ func init() { aggFuncMap = map[string]AggDataProcessingFunc{ "aggregateBy": applyAggregateBy, + "sumSeries": applySumSeries, } // Functions processing on the frontend @@ -84,21 +85,6 @@ func applyGroupBy(series timeseries.TimeSeries, params ...string) (timeseries.Ti return s, nil } -func applyAggregateBy(series []*timeseries.TimeSeriesData, params ...string) ([]*timeseries.TimeSeriesData, error) { - pInterval := params[0] - pAgg := params[1] - interval, err := gtime.ParseInterval(pInterval) - if err != nil { - return nil, errParsingFunctionParam(err) - } - - aggFunc := getAggFunc(pAgg) - aggregatedSeries := timeseries.AggregateBy(series, interval, aggFunc) - aggregatedSeries.Meta.Name = fmt.Sprintf("aggregateBy(%s, %s)", pInterval, pAgg) - - return []*timeseries.TimeSeriesData{aggregatedSeries}, nil -} - func applyScale(series timeseries.TimeSeries, params ...string) (timeseries.TimeSeries, error) { pFactor := params[0] factor, err := strconv.ParseFloat(pFactor, 64) @@ -121,6 +107,27 @@ func applyOffset(series timeseries.TimeSeries, params ...string) (timeseries.Tim return series.Transform(transformFunc), nil } +func applyAggregateBy(series []*timeseries.TimeSeriesData, params ...string) ([]*timeseries.TimeSeriesData, error) { + pInterval := params[0] + pAgg := params[1] + interval, err := gtime.ParseInterval(pInterval) + if err != nil { + return nil, errParsingFunctionParam(err) + } + + aggFunc := getAggFunc(pAgg) + aggregatedSeries := timeseries.AggregateBy(series, interval, aggFunc) + aggregatedSeries.Meta.Name = fmt.Sprintf("aggregateBy(%s, %s)", pInterval, pAgg) + + return []*timeseries.TimeSeriesData{aggregatedSeries}, nil +} + +func applySumSeries(series []*timeseries.TimeSeriesData, params ...string) ([]*timeseries.TimeSeriesData, error) { + sum := timeseries.SumSeries(series) + sum.Meta.Name = "sumSeries()" + return []*timeseries.TimeSeriesData{sum}, nil +} + func getAggFunc(agg string) timeseries.AggFunc { switch agg { case "avg": diff --git a/pkg/timeseries/timeseries.go b/pkg/timeseries/timeseries.go index 576463c..63b130c 100644 --- a/pkg/timeseries/timeseries.go +++ b/pkg/timeseries/timeseries.go @@ -75,6 +75,13 @@ func (ts TimeSeries) GroupBy(interval time.Duration, aggFunc AggFunc) TimeSeries return groupedSeries } +func (ts TimeSeries) Transform(transformFunc TransformFunc) TimeSeries { + for i, p := range ts { + ts[i] = transformFunc(p) + } + return ts +} + func AggregateBy(series []*TimeSeriesData, interval time.Duration, aggFunc AggFunc) *TimeSeriesData { aggregatedSeries := NewTimeSeries() @@ -93,16 +100,127 @@ func AggregateBy(series []*TimeSeriesData, interval time.Duration, aggFunc AggFu } func (ts TimeSeries) Sort() { - sort.Slice(ts, func(i, j int) bool { - return ts[i].Time.Before(ts[j].Time) - }) + sorted := sort.SliceIsSorted(ts, ts.less()) + if !sorted { + sort.Slice(ts, ts.less()) + } } -func (ts TimeSeries) Transform(transformFunc TransformFunc) TimeSeries { - for i, p := range ts { - ts[i] = transformFunc(p) +// Implements less() function for sorting slice +func (ts TimeSeries) less() func(i, j int) bool { + return func(i, j int) bool { + return ts[i].Time.Before(ts[j].Time) } - return ts +} + +func SumSeries(series []*TimeSeriesData) *TimeSeriesData { + // Build unique set of time stamps from all series + interpolatedTimeStampsMap := make(map[time.Time]time.Time) + for _, s := range series { + for _, p := range s.TS { + interpolatedTimeStampsMap[p.Time] = p.Time + } + } + + // Convert to slice and sort + interpolatedTimeStamps := make([]time.Time, 0) + for _, ts := range interpolatedTimeStampsMap { + interpolatedTimeStamps = append(interpolatedTimeStamps, ts) + } + sort.Slice(interpolatedTimeStamps, func(i, j int) bool { + return interpolatedTimeStamps[i].Before(interpolatedTimeStamps[j]) + }) + + interpolatedSeries := make([]TimeSeries, 0) + + for _, s := range series { + if s.Len() == 0 { + continue + } + + pointsToInterpolate := make([]TimePoint, 0) + + currentPointIndex := 0 + for _, its := range interpolatedTimeStamps { + currentPoint := s.TS[currentPointIndex] + if its.Equal(currentPoint.Time) { + if currentPointIndex < s.Len()-1 { + currentPointIndex++ + } + } else { + pointsToInterpolate = append(pointsToInterpolate, TimePoint{Time: its, Value: nil}) + } + } + + s.TS = append(s.TS, pointsToInterpolate...) + s.TS.Sort() + s.TS = interpolateSeries(s.TS) + interpolatedSeries = append(interpolatedSeries, s.TS) + } + + sumSeries := NewTimeSeriesData() + for i := 0; i < len(interpolatedTimeStamps); i++ { + var sum float64 = 0 + for _, s := range interpolatedSeries { + if s[i].Value != nil { + sum += *s[i].Value + } + } + sumSeries.TS = append(sumSeries.TS, TimePoint{Time: interpolatedTimeStamps[i], Value: &sum}) + } + + return sumSeries +} + +func interpolateSeries(series TimeSeries) TimeSeries { + for i := series.Len() - 1; i >= 0; i-- { + point := series[i] + if point.Value == nil { + left := findNearestLeft(series, i) + right := findNearestRight(series, i) + + if left == nil && right == nil { + continue + } + if left == nil { + left = right + } + if right == nil { + right = left + } + + pointValue := linearInterpolation(point.Time, *left, *right) + point.Value = &pointValue + series[i] = point + } + } + return series +} + +func linearInterpolation(ts time.Time, left, right TimePoint) float64 { + if left.Time.Equal(right.Time) { + return (*left.Value + *right.Value) / 2 + } else { + return *left.Value + (*right.Value-*left.Value)/float64((right.Time.UnixNano()-left.Time.UnixNano()))*float64((ts.UnixNano()-left.Time.UnixNano())) + } +} + +func findNearestRight(series TimeSeries, pointIndex int) *TimePoint { + for i := pointIndex; i < series.Len(); i++ { + if series[i].Value != nil { + return &series[i] + } + } + return nil +} + +func findNearestLeft(series TimeSeries, pointIndex int) *TimePoint { + for i := pointIndex; i > 0; i-- { + if series[i].Value != nil { + return &series[i] + } + } + return nil } // Aligns point's time stamps according to provided interval.