Implement aggregateBy

Alexander Zobnin
2021-05-26 13:04:57 +03:00
parent 1cdeafd9ca
commit 5d3433aef5
3 changed files with 55 additions and 6 deletions

View File

@@ -18,21 +18,25 @@ var errParsingFunctionParam = func(err error) error {
 type DataProcessingFunc = func(series timeseries.TimeSeries, params ...string) (timeseries.TimeSeries, error)
-type MetaDataProcessingFunc = func(series *timeseries.TimeSeriesData, params ...string) (*timeseries.TimeSeriesData, error)
+type AggDataProcessingFunc = func(series []*timeseries.TimeSeriesData, params ...string) ([]*timeseries.TimeSeriesData, error)
-var funcMap map[string]DataProcessingFunc
+var seriesFuncMap map[string]DataProcessingFunc
-var metaFuncMap map[string]MetaDataProcessingFunc
+var aggFuncMap map[string]AggDataProcessingFunc
 var frontendFuncMap map[string]bool
 func init() {
-	funcMap = map[string]DataProcessingFunc{
+	seriesFuncMap = map[string]DataProcessingFunc{
 		"groupBy": applyGroupBy,
 		"scale":   applyScale,
 		"offset":  applyOffset,
 	}
+	aggFuncMap = map[string]AggDataProcessingFunc{
+		"aggregateBy": applyAggregateBy,
+	}
 	// Functions processing on the frontend
 	frontendFuncMap = map[string]bool{
 		"setAlias": true,
@@ -43,7 +47,7 @@ func init() {
 func applyFunctions(series []*timeseries.TimeSeriesData, functions []QueryFunction) ([]*timeseries.TimeSeriesData, error) {
 	for _, f := range functions {
-		if applyFunc, ok := funcMap[f.Def.Name]; ok {
+		if applyFunc, ok := seriesFuncMap[f.Def.Name]; ok {
 			for _, s := range series {
 				result, err := applyFunc(s.TS, f.Params...)
 				if err != nil {
@@ -51,6 +55,12 @@ func applyFunctions(series []*timeseries.TimeSeriesData, functions []QueryFuncti
 				}
 				s.TS = result
 			}
+		} else if applyAggFunc, ok := aggFuncMap[f.Def.Name]; ok {
+			result, err := applyAggFunc(series, f.Params...)
+			if err != nil {
+				return nil, err
+			}
+			series = result
 		} else if _, ok := frontendFuncMap[f.Def.Name]; ok {
 			continue
 		} else {
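The new branch above treats aggregate functions differently from per-series ones: a per-series function maps each series' points in place, while an aggregate function receives the whole slice and replaces it with its result. A minimal, self-contained sketch of that two-map dispatch, using toy types rather than the plugin's actual ones:

package main

import "fmt"

type series struct{ name string }

type seriesFunc func(s series, params ...string) (series, error)
type aggregateFunc func(all []series, params ...string) ([]series, error)

var seriesFuncs = map[string]seriesFunc{
	// Per-series: applied to every series independently.
	"scale": func(s series, params ...string) (series, error) { return s, nil },
}

var aggregateFuncs = map[string]aggregateFunc{
	// Aggregate: consumes the whole slice and replaces it with its result.
	"aggregateBy": func(all []series, params ...string) ([]series, error) {
		return []series{{name: "aggregated"}}, nil
	},
}

func apply(all []series, name string, params ...string) ([]series, error) {
	if f, ok := seriesFuncs[name]; ok {
		for i := range all {
			s, err := f(all[i], params...)
			if err != nil {
				return nil, err
			}
			all[i] = s
		}
		return all, nil
	}
	if f, ok := aggregateFuncs[name]; ok {
		// The whole slice is replaced by the aggregate result.
		return f(all, params...)
	}
	return nil, fmt.Errorf("unknown function %q", name)
}

func main() {
	out, _ := apply([]series{{name: "a"}, {name: "b"}}, "aggregateBy", "1m", "avg")
	fmt.Println(len(out)) // 1: aggregateBy collapses the whole set into a single series
}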
@@ -74,6 +84,21 @@ func applyGroupBy(series timeseries.TimeSeries, params ...string) (timeseries.Ti
 	return s, nil
 }
+func applyAggregateBy(series []*timeseries.TimeSeriesData, params ...string) ([]*timeseries.TimeSeriesData, error) {
+	pInterval := params[0]
+	pAgg := params[1]
+	interval, err := gtime.ParseInterval(pInterval)
+	if err != nil {
+		return nil, errParsingFunctionParam(err)
+	}
+	aggFunc := getAggFunc(pAgg)
+	aggregatedSeries := timeseries.AggregateBy(series, interval, aggFunc)
+	aggregatedSeries.Meta.Name = fmt.Sprintf("aggregateBy(%s, %s)", pInterval, pAgg)
+	return []*timeseries.TimeSeriesData{aggregatedSeries}, nil
+}
 func applyScale(series timeseries.TimeSeries, params ...string) (timeseries.TimeSeries, error) {
 	pFactor := params[0]
 	factor, err := strconv.ParseFloat(pFactor, 64)
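applyAggregateBy relies on getAggFunc, which is not part of this commit. As a rough, hypothetical illustration of the kind of name-to-aggregator lookup it performs, here is a sketch on plain value slices rather than the plugin's AggFunc type:

package main

import "fmt"

// bucketAgg reduces one bucket of values to a single value.
type bucketAgg func(values []float64) float64

// lookupAgg maps a user-facing function name to an aggregator, falling back to avg.
func lookupAgg(name string) bucketAgg {
	switch name {
	case "sum":
		return func(v []float64) float64 {
			s := 0.0
			for _, x := range v {
				s += x
			}
			return s
		}
	case "max":
		return func(v []float64) float64 {
			m := v[0]
			for _, x := range v[1:] {
				if x > m {
					m = x
				}
			}
			return m
		}
	default: // "avg"
		return func(v []float64) float64 {
			s := 0.0
			for _, x := range v {
				s += x
			}
			return s / float64(len(v))
		}
	}
}

func main() {
	agg := lookupAgg("avg")
	fmt.Println(agg([]float64{2, 4, 6})) // 4
}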

View File

@@ -30,6 +30,7 @@ func (tsd *TimeSeriesData) Add(point TimePoint) *TimeSeriesData {
 	return tsd
 }
+// GroupBy groups points in given interval by applying provided `aggFunc`. Source time series should be sorted by time.
 func (ts TimeSeries) GroupBy(interval time.Duration, aggFunc AggFunc) TimeSeries {
 	if ts.Len() == 0 {
 		return ts
@@ -74,6 +75,29 @@ func (ts TimeSeries) GroupBy(interval time.Duration, aggFunc AggFunc) TimeSeries
 	return groupedSeries
 }
+func AggregateBy(series []*TimeSeriesData, interval time.Duration, aggFunc AggFunc) *TimeSeriesData {
+	aggregatedSeries := NewTimeSeries()
+	// Combine all points into one time series
+	for _, s := range series {
+		aggregatedSeries = append(aggregatedSeries, s.TS...)
+	}
+	// GroupBy works correctly only with sorted time series
+	aggregatedSeries.Sort()
+	aggregatedSeries = aggregatedSeries.GroupBy(interval, aggFunc)
+	aggregatedSeriesData := NewTimeSeriesData()
+	aggregatedSeriesData.TS = aggregatedSeries
+	return aggregatedSeriesData
+}
+func (ts TimeSeries) Sort() {
+	sort.Slice(ts, func(i, j int) bool {
+		return ts[i].Time.Before(ts[j].Time)
+	})
+}
 func (ts TimeSeries) Transform(transformFunc TransformFunc) TimeSeries {
 	for i, p := range ts {
 		ts[i] = transformFunc(p)
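AggregateBy concatenates the points of every input series, sorts them by time (GroupBy assumes sorted input), and then groups them into interval buckets. A self-contained sketch of that combine, sort, and group sequence on toy types, assuming nothing about the plugin's TimePoint or AggFunc beyond what the diff shows:

package main

import (
	"fmt"
	"sort"
	"time"
)

// A toy point; the plugin's TimePoint is not reproduced here.
type point struct {
	t time.Time
	v float64
}

// groupBy buckets sorted points into fixed intervals and reduces each bucket with agg.
func groupBy(points []point, interval time.Duration, agg func([]point) float64) []point {
	var out []point
	var bucket []point
	var bucketStart time.Time
	for _, p := range points {
		start := p.t.Truncate(interval)
		if len(bucket) > 0 && !start.Equal(bucketStart) {
			out = append(out, point{t: bucketStart, v: agg(bucket)})
			bucket = bucket[:0]
		}
		bucketStart = start
		bucket = append(bucket, p)
	}
	if len(bucket) > 0 {
		out = append(out, point{t: bucketStart, v: agg(bucket)})
	}
	return out
}

func main() {
	t0 := time.Unix(0, 0)
	a := []point{{t0, 1}, {t0.Add(90 * time.Second), 3}}
	b := []point{{t0.Add(30 * time.Second), 5}}

	// Combine all points into one series, sort by time, then group,
	// the same shape as AggregateBy above.
	all := append(append([]point{}, a...), b...)
	sort.Slice(all, func(i, j int) bool { return all[i].t.Before(all[j].t) })

	sum := func(ps []point) float64 {
		s := 0.0
		for _, p := range ps {
			s += p.v
		}
		return s
	}
	fmt.Println(groupBy(all, time.Minute, sum)) // two buckets: sum 6 at t0, sum 3 at t0+1m
}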

View File

@@ -200,7 +200,7 @@ addFuncDef({
   category: 'Aggregate',
   params: [
     { name: 'interval', type: 'string' },
-    { name: 'function', type: 'string', options: ['avg', 'min', 'max', 'sum', 'count', 'median'] }
+    { name: 'function', type: 'string', options: ['avg', 'min', 'max', 'sum', 'count', 'median', 'first', 'last'] }
   ],
   defaultParams: ['1m', 'avg'],
 });
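The frontend function definition gains 'first' and 'last' as selectable aggregations. On a bucket that is already sorted by time, those reducers simply pick the first or last value; a toy sketch of that idea (not the plugin's backend implementation, which this commit does not show):

package main

import "fmt"

type point struct{ v float64 }

// first and last assume the bucket is sorted by time, as GroupBy requires.
func first(bucket []point) float64 { return bucket[0].v }
func last(bucket []point) float64  { return bucket[len(bucket)-1].v }

func main() {
	bucket := []point{{1}, {2}, {3}}
	fmt.Println(first(bucket), last(bucket)) // 1 3
}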