fea(sql): add when condition for analytic function #1534

Merged
merged 6 commits into from
Nov 25, 2022
21 changes: 21 additions & 0 deletions docs/en_US/sqls/built-in_functions.md
@@ -164,6 +164,12 @@ When casting to datetime type, the supported column type and casting rule are:

Analytic functions always use state to do analytic jobs. In stream processing, analytic functions are evaluated first so that they are not affected by predicates in the WHERE clause.

The full call format of an analytic function is shown below. The OVER clause is optional.

```
AnalyticFuncName(<arguments>...) OVER ([PARTITION BY <partition key>] [WHEN <Expression>])
```

Analytic function computations are performed over all the input events of the current query input. Optionally, you can limit an analytic function to only consider events that match the PARTITION BY clause.

The syntax is like:
@@ -172,6 +178,15 @@ The syntax is like:
AnalyticFuncName(<arguments>...) OVER ([PARTITION BY <partition key>])
```

An analytic function can use the WHEN clause to decide whether the current event is valid, based on whether the condition is met.
For a valid event, the function calculates the result and updates its state according to the function's semantics. For an invalid event, it ignores the event value and reuses the saved state value.

```
AnalyticFuncName(<arguments>...) OVER ([WHEN <Expression>])
```
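
The valid/invalid distinction can be sketched in Go. This is a minimal illustration, not eKuiper's implementation: `changeDetector` and `Eval` are hypothetical names, and treating the first valid value as a change is an assumption. It mirrors the `had_changed` hunk in this PR, which returns `false` without touching state when the WHEN condition is not met.

```go
package main

import "fmt"

// changeDetector is a hypothetical stand-in for the state an analytic
// function keeps between events. It is not an eKuiper type.
type changeDetector struct {
	last    interface{} // value saved by the last valid event
	hasLast bool        // whether any valid event has been seen
}

// Eval mimics a had_changed-style call with a WHEN condition.
// If when is false, the event is ignored: state stays untouched and the
// result is false. Values must be comparable (ints, strings, and so on).
func (d *changeDetector) Eval(v interface{}, when bool) bool {
	if !when {
		return false
	}
	// Assumption: the first valid value counts as a change.
	changed := !d.hasLast || d.last != v
	d.last, d.hasLast = v, true
	return changed
}

func main() {
	d := &changeDetector{}
	fmt.Println(d.Eval(20, true))  // true: first valid value
	fmt.Println(d.Eval(25, false)) // false: invalid event, state kept at 20
	fmt.Println(d.Eval(20, true))  // false: same as the saved value
	fmt.Println(d.Eval(25, true))  // true: differs from the saved value
}
```

The second call shows the key point: the value 25 never reaches the saved state, so the third call still compares against 20.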



| Function | Example | Description |
|-------------|--------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| lag         | lag(expr, [offset], [default value]) | Returns the value of the expression from `offset` events earlier. If no such value is found, returns the specified default value, or nil if no default value is set. If offset and default value are both omitted, offset defaults to 1 and the default value to nil. |
@@ -191,6 +206,12 @@ Example function call to get the previous temperature value with the same device
lag(temperature) OVER (PARTITION BY deviceId)
```

Example function call to calculate the duration of events, where ts is the timestamp and statusCode1 and statusCode2 are device statuses in the same event:

```text
ts - lag(ts, 1, ts) OVER (WHEN statusCode1 != statusCode2)
```
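
To make the expression above concrete, here is a small Go simulation. It is a sketch under assumptions: `lagState`, `lagWhen`, `event` and `durations` are hypothetical names, and the single-slot state only approximates `lag` with offset 1, following the behavior in this PR's `lag` hunk (a valid event reads the saved value and replaces it, an invalid event only reads it).

```go
package main

import "fmt"

// lagState is a hypothetical single-slot state for lag(ts, 1, ts).
type lagState struct {
	prev int64
	set  bool
}

// lagWhen returns the saved value, or dflt if nothing is saved yet.
// Only a valid event replaces the saved value with cur.
func (s *lagState) lagWhen(cur, dflt int64, valid bool) int64 {
	prev := dflt
	if s.set {
		prev = s.prev
	}
	if valid {
		s.prev, s.set = cur, true
	}
	return prev
}

// event is a hypothetical input record with the fields used in the example.
type event struct {
	ts          int64
	statusCode1 int
	statusCode2 int
}

// durations evaluates ts - lag(ts, 1, ts) OVER (WHEN statusCode1 != statusCode2)
// for each event in order.
func durations(events []event) []int64 {
	s := &lagState{}
	out := make([]int64, 0, len(events))
	for _, e := range events {
		valid := e.statusCode1 != e.statusCode2
		out = append(out, e.ts-s.lagWhen(e.ts, e.ts, valid))
	}
	return out
}

func main() {
	events := []event{
		{ts: 100, statusCode1: 1, statusCode2: 2}, // valid
		{ts: 130, statusCode1: 1, statusCode2: 1}, // invalid
		{ts: 170, statusCode1: 2, statusCode2: 1}, // valid
		{ts: 200, statusCode1: 2, statusCode2: 2}, // invalid
	}
	fmt.Println(durations(events)) // [0 30 70 30]
}
```

In this sketch each result is the time elapsed since the most recent earlier event for which the WHEN condition held, and 0 when there is none.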

## Other Functions
| Function | Example | Description |
|--------------|--------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
20 changes: 19 additions & 1 deletion docs/zh_CN/sqls/built-in_functions.md
@@ -165,14 +165,26 @@ eKuiper has many built-in functions that perform calculations on data.

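Analytic functions keep state to do their analytic work. In stream processing rules, analytic functions are evaluated first, so the WHERE clause cannot prevent them from updating their state.

Analytic function computations are performed over all the input events of the current query input. Optionally, you can limit an analytic function to only consider events that match the PARTITION BY clause.
The full call format of an analytic function is shown below. The OVER clause is optional.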

```
AnalyticFuncName(<arguments>...) OVER ([PARTITION BY <partition key>] [WHEN <Expression>])
```

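Analytic function computations are performed over all the input events of the current query input. Optionally, you can limit an analytic function to only consider events that match the PARTITION BY clause.
Analytic functions can use the PARTITION BY clause. The syntax is: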

```
AnalyticFuncName(<arguments>...) OVER ([PARTITION BY <partition key>])
```

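An analytic function can use the WHEN clause to decide whether the current event is valid, based on whether the condition is met.
For a valid event, the function calculates the result and updates its state according to the function's semantics. For an invalid event, it ignores the event value and reuses the saved state value.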

```
AnalyticFuncName(<arguments>...) OVER ([WHEN <Expression>])
```

| Function | Example | Description |
|-------------|--------------------------------------|----------------------------------------------------------------------------------------------------|
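| lag         | lag(expr, [offset], [default value]) | Returns the previous value of the expression at the given offset. If no value is found, returns the default value, or nil if no default value is specified. If no argument other than the expression is given, offset defaults to 1 and the default value to nil. |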
@@ -192,6 +204,12 @@ lag(temperature)
lag(temperature) OVER (PARTITION BY deviceId)
```

Example 3: ts is a timestamp. Get the duration for which the device statuses statusCode1 and statusCode2 differ:

```text
ts - lag(ts, 1, ts) OVER (WHEN statusCode1 != statusCode2)
```

## Other Functions
| Function | Example | Description |
|--------------|--------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
76 changes: 52 additions & 24 deletions internal/binder/function/funcs_analytic.go
@@ -35,6 +35,13 @@ func registerAnalyticFunc() {
if ignoreNull && args[1] == nil {
return nil, true
}
validData, ok := args[len(args)-2].(bool)
if !ok {
return fmt.Errorf("when arg is not a bool but got %v", args[len(args)-2]), false
}
if !validData {
return nil, true
}
key := args[len(args)-1].(string)
lv, err := ctx.GetState(key)
if err != nil {
@@ -62,17 +69,25 @@ func registerAnalyticFunc() {
builtins["had_changed"] = builtinFunc{
fType: ast.FuncTypeScalar,
exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
l := len(args) - 1
l := len(args) - 2
if l <= 1 {
return fmt.Errorf("expect more than one arg but got %d", len(args)), false
}
validData, ok := args[len(args)-2].(bool)
if !ok {
return fmt.Errorf("when arg is not a bool but got %v", args[len(args)-2]), false
}
if !validData {
return false, true
}
ignoreNull, ok := args[0].(bool)
if !ok {
return fmt.Errorf("first arg is not a bool but got %v", args[0]), false
}
key := args[l].(string)
key := args[len(args)-1].(string)
paraLen := len(args) - 2
result := false
for i := 1; i < l; i++ {
for i := 1; i < paraLen; i++ {
v := args[i]
k := key + strconv.Itoa(i)
if ignoreNull && v == nil {
@@ -106,23 +121,31 @@ func registerAnalyticFunc() {
builtins["lag"] = builtinFunc{
fType: ast.FuncTypeScalar,
exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
l := len(args) - 1
key := args[l].(string)
l := len(args) - 2
if l != 1 && l != 2 && l != 3 {
return fmt.Errorf("expect one two or three args but got %d", l), false
}
key := args[len(args)-1].(string)
v, err := ctx.GetState(key)
if err != nil {
return fmt.Errorf("error getting state for %s: %v", key, err), false
}
validData, ok := args[len(args)-2].(bool)
if !ok {
return fmt.Errorf("when arg is not a bool but got %v", args[len(args)-2]), false
}
paraLen := len(args) - 2
var rq *ringqueue = nil
var rtnVal interface{} = nil

// first time call, need create state for lag
if v == nil {
size := 0
var dftVal interface{} = nil
if l == 3 {
if paraLen == 3 {
dftVal = args[2]
}
// first time call, need create state for lag
if l == 1 {
if paraLen == 1 {
size = 1
} else {
siz, ok := args[1].(int)
@@ -131,30 +154,27 @@ func registerAnalyticFunc() {
}
size = siz
}

rq := newRingqueue(size)
rq = newRingqueue(size)
rq.fill(dftVal)

rtnVal, _ := rq.fetch()
rq.append(args[0])
err := ctx.PutState(key, rq)
if err != nil {
return fmt.Errorf("error setting state for %s: %v", key, err), false
}
return rtnVal, true
} else {
rq, ok := v.(*ringqueue)
if !ok {
return fmt.Errorf("error getting state for %s: %v", key, err), false
}
rtnVal, _ := rq.fetch()
rq, _ = v.(*ringqueue)
}

if validData {
rtnVal, _ = rq.fetch()
rq.append(args[0])
err := ctx.PutState(key, rq)
if err != nil {
return fmt.Errorf("error setting state for %s: %v", key, err), false
}
return rtnVal, true
} else {
rtnVal, _ = rq.peek()
}
return rtnVal, true
},
val: func(_ api.FunctionContext, args []ast.Expr) error {
l := len(args)
@@ -178,18 +198,24 @@ func registerAnalyticFunc() {
builtins["latest"] = builtinFunc{
fType: ast.FuncTypeScalar,
exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
l := len(args) - 1
key := args[l].(string)
l := len(args) - 2
if l != 1 && l != 2 {
return fmt.Errorf("expect one or two args but got %d", l), false
}
paraLen := len(args) - 2
key := args[len(args)-1].(string)
validData, ok := args[len(args)-2].(bool)
if !ok {
return fmt.Errorf("when arg is not a bool but got %v", args[len(args)-2]), false
}

if args[0] == nil {
v, err := ctx.GetState(key)
if err != nil {
return fmt.Errorf("error getting state for %s: %v", key, err), false
}
if v == nil {
if l == 2 {
if paraLen == 2 {
return args[1], true
} else {
return nil, true
@@ -198,7 +224,9 @@ func registerAnalyticFunc() {
return v, true
}
} else {
ctx.PutState(key, args[0])
if validData {
ctx.PutState(key, args[0])
}
return args[0], true
}
},
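
The `lag` hunks above call `fill`, `fetch`, `peek` and `append` on a ring queue whose definition is not part of this diff. The following is a hypothetical sketch of a fixed-size queue with those four operations, written only to illustrate why `fetch` (used for valid events) advances the state while `peek` (used for invalid events) leaves it untouched. The names `ringQueue` and `newRingQueue` and all internals are assumptions, not the project's actual code.

```go
package main

import "fmt"

// ringQueue is a hypothetical fixed-capacity FIFO backed by a slice.
type ringQueue struct {
	data []interface{}
	head int // index of the oldest element
	size int // number of stored elements
}

func newRingQueue(capacity int) *ringQueue {
	return &ringQueue{data: make([]interface{}, capacity)}
}

// fill sets every slot to v and marks the queue as full.
func (q *ringQueue) fill(v interface{}) {
	for i := range q.data {
		q.data[i] = v
	}
	q.head, q.size = 0, len(q.data)
}

// append adds v at the tail and reports false if the queue is full.
func (q *ringQueue) append(v interface{}) bool {
	if q.size == len(q.data) {
		return false
	}
	q.data[(q.head+q.size)%len(q.data)] = v
	q.size++
	return true
}

// fetch removes and returns the oldest element.
func (q *ringQueue) fetch() (interface{}, bool) {
	if q.size == 0 {
		return nil, false
	}
	v := q.data[q.head]
	q.head = (q.head + 1) % len(q.data)
	q.size--
	return v, true
}

// peek returns the oldest element without removing it.
func (q *ringQueue) peek() (interface{}, bool) {
	if q.size == 0 {
		return nil, false
	}
	return q.data[q.head], true
}

func main() {
	// Roughly what lag(x, 2, "n/a") needs: two slots pre-filled with the default.
	q := newRingQueue(2)
	q.fill("n/a")
	for _, x := range []int{10, 20, 30} {
		old, _ := q.fetch() // value from two events earlier
		q.append(x)
		fmt.Println(old) // prints n/a, n/a, 10 on successive lines
	}
}
```

Pre-filling with the default value means the first `offset` calls return the default without any special casing, which matches how the hunk calls `fill(dftVal)` right after creating the queue.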