From 31e989d28f52afe96d2093c1882dc9fe8389dad1 Mon Sep 17 00:00:00 2001
From: Alexander Zobnin
Date: Fri, 28 May 2021 10:11:41 +0300
Subject: [PATCH] Implement EMA

---
 pkg/datasource/functions.go      | 32 ++++++++++------
 pkg/timeseries/moving_average.go | 66 ++++++++++++++++++++++++++++++++
 2 files changed, 87 insertions(+), 11 deletions(-)

diff --git a/pkg/datasource/functions.go b/pkg/datasource/functions.go
index f7518b5..4d77e24 100644
--- a/pkg/datasource/functions.go
+++ b/pkg/datasource/functions.go
@@ -57,17 +57,18 @@ var frontendFuncMap map[string]bool
 
 func init() {
 	seriesFuncMap = map[string]DataProcessingFunc{
-		"groupBy":          applyGroupBy,
-		"scale":            applyScale,
-		"offset":           applyOffset,
-		"delta":            applyDelta,
-		"rate":             applyRate,
-		"movingAverage":    applyMovingAverage,
-		"removeAboveValue": applyRemoveAboveValue,
-		"removeBelowValue": applyRemoveBelowValue,
-		"transformNull":    applyTransformNull,
-		"percentile":       applyPercentile,
-		"timeShift":        applyTimeShiftPost,
+		"groupBy":                  applyGroupBy,
+		"scale":                    applyScale,
+		"offset":                   applyOffset,
+		"delta":                    applyDelta,
+		"rate":                     applyRate,
+		"movingAverage":            applyMovingAverage,
+		"exponentialMovingAverage": applyExponentialMovingAverage,
+		"removeAboveValue":         applyRemoveAboveValue,
+		"removeBelowValue":         applyRemoveBelowValue,
+		"transformNull":            applyTransformNull,
+		"percentile":               applyPercentile,
+		"timeShift":                applyTimeShiftPost,
 	}
 
 	aggFuncMap = map[string]AggDataProcessingFunc{
@@ -256,6 +257,20 @@ func applyMovingAverage(series timeseries.TimeSeries, params ...interface{}) (ti
 	return series.SimpleMovingAverage(n), nil
 }
 
+// applyExponentialMovingAverage converts params[0] with MustFloat64 and returns the result of
+// series.ExponentialMovingAverage for that value: a value greater than 1 is treated there as a
+// window size in points, any other value as the alpha coefficient itself.
+// A conversion error is returned through errParsingFunctionParam. params[0] is read without a
+// length check, so at least one parameter must be passed.
+func applyExponentialMovingAverage(series timeseries.TimeSeries, params ...interface{}) (timeseries.TimeSeries, error) {
+	n, err := MustFloat64(params[0])
+	if err != nil {
+		return nil, errParsingFunctionParam(err)
+	}
+
+	return series.ExponentialMovingAverage(n), nil
+}
+
 func applyAggregateBy(series []*timeseries.TimeSeriesData, params ...interface{}) ([]*timeseries.TimeSeriesData, error) {
pInterval, err := MustString(params[0]) pAgg, err := MustString(params[1]) diff --git a/pkg/timeseries/moving_average.go b/pkg/timeseries/moving_average.go index eaaec69..6fd1e67 100644 --- a/pkg/timeseries/moving_average.go +++ b/pkg/timeseries/moving_average.go @@ -12,6 +12,9 @@ func (ts TimeSeries) SimpleMovingAverage(n int) TimeSeries { // It's not possible to calculate MA if n greater than number of points n = int(math.Min(float64(ts.Len()), float64(n))) + // It's not a most performant way to caclulate MA, but since it's most straightforward, it's easy to read. + // Should work fine on relatively small n, which is 90% of cases. Another way is caclulate window average, then add + // next point ( (window sum + point value) / (count + 1) ) and remove the first one. for i := n; i < ts.Len(); i++ { windowEdgeRight := i windowCount := 0 @@ -33,3 +36,66 @@ func (ts TimeSeries) SimpleMovingAverage(n int) TimeSeries { return sma } + +func (ts TimeSeries) ExponentialMovingAverage(an float64) TimeSeries { + if ts.Len() == 0 { + return ts + } + + // It's not possible to calculate MA if n greater than number of points + an = math.Min(float64(ts.Len()), an) + + // alpha coefficient should be between 0 and 1. If provided n <= 1, then use it as alpha directly. Otherwise, it's a + // number of points in the window and alpha calculted from this information. + var a float64 + var n int + ema := []TimePoint{ts[0]} + emaPrev := *ts[0].Value + var emaCurrent float64 + + if an > 1 { + // Calculate a from window size + a = 2 / (an + 1) + n = int(an) + + // Initial window, use simple moving average + windowCount := 0 + var windowSum float64 = 0 + for i := n; i > 0; i-- { + point := ts[n-i] + if point.Value != nil { + windowSum += *point.Value + windowCount++ + } + } + if windowCount > 0 { + windowAvg := windowSum / float64(windowCount) + // Actually, we should set timestamp from datapoints[n-1] and start calculation of EMA from n. 
+ // But in order to start EMA from first point (not from Nth) we should expand time range and request N additional + // points outside left side of range. We can't do that, so this trick is used for pretty view of first N points. + // We calculate AVG for first N points, but then start from 2nd point, not from Nth. In general, it means we + // assume that previous N values (0-N, 0-(N-1), ..., 0-1) have the same average value as a first N values. + ema[0] = TimePoint{Time: ts[0].Time, Value: &windowAvg} + emaPrev = windowAvg + n = 1 + } + } else { + // Use predefined a and start from 1st point (use it as initial EMA value) + a = an + n = 1 + } + + for i := n; i < ts.Len(); i++ { + point := ts[i] + if point.Value != nil { + emaCurrent = a*(*point.Value) + (1-a)*emaPrev + emaPrev = emaCurrent + value := emaCurrent + ema = append(ema, TimePoint{Time: point.Time, Value: &value}) + } else { + ema = append(ema, TimePoint{Time: point.Time, Value: nil}) + } + } + + return ema +}