Skip to content

Commit

Permalink
fix: fix influx sink validate (lf-edge#2893)
Browse files Browse the repository at this point in the history
Signed-off-by: yisaer <disxiaofei@163.com>
Signed-off-by: Jiyong Huang <huangjy@emqx.io>
  • Loading branch information
Yisaer authored and ngjaying committed Jul 24, 2024
1 parent 0f1b43e commit 42f6739
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 11 deletions.
2 changes: 1 addition & 1 deletion extensions/impl/influx2/influx2.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (m *influxSink2) Provision(_ api.StreamContext, props map[string]any) error
default:
return fmt.Errorf("precision %s is not supported", m.conf.PrecisionStr)
}
if len(m.conf.Measurement) == 0 {
if len(m.conf.Measurement) == 0 && !m.conf.UseLineProtocol {
return fmt.Errorf("measurement is required")
}
err = cast.MapToStruct(props, &m.conf.WriteOptions)
Expand Down
51 changes: 41 additions & 10 deletions extensions/impl/influx2/influx2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package influx2

import (
"reflect"
"testing"
"time"

Expand Down Expand Up @@ -104,13 +105,36 @@ func TestConfig(t *testing.T) {
},
error: "precision abc is not supported",
},
{
name: "no error",
conf: map[string]interface{}{
"addr": "http://192.168.0.3:8086",
"org": "abc",
"bucket": "bucket_one",
"precision": "ns",
"useLineProtocol": true,
},
expected: c{
Addr: "http://192.168.0.3:8086",
Org: "abc",
Bucket: "bucket_one",
PrecisionStr: "ns",
Precision: time.Nanosecond,
UseLineProtocol: true,
WriteOptions: tspoint.WriteOptions{
PrecisionStr: "ns",
},
BatchSize: 1,
},
},
{
name: "measurement missing error",
conf: map[string]interface{}{
"addr": "http://192.168.0.3:8086",
"org": "abc",
"bucket": "bucket_one",
"precision": "ns",
"addr": "http://192.168.0.3:8086",
"org": "abc",
"bucket": "bucket_one",
"precision": "ns",
"useLineProtocol": false,
},
error: "measurement is required",
},
Expand Down Expand Up @@ -428,10 +452,11 @@ func TestCollectPointsError(t *testing.T) {
func TestCollectLines(t *testing.T) {
timex.Set(10)
tests := []struct {
name string
conf c
data any
result []string
name string
conf c
data any
result []string
result2 []string
}{
{
name: "normal",
Expand Down Expand Up @@ -507,7 +532,8 @@ func TestCollectLines(t *testing.T) {
"humidity": 50,
"ts": 100,
},
result: []string{"test5,tag2=50 humidity=50,ts=100 100"},
result: []string{"test5,tag2=50 humidity=50,ts=100 100"},
result2: []string{"test5,tag2=50 ts=100,humidity=50 100"},
},
}

Expand All @@ -519,7 +545,12 @@ func TestCollectLines(t *testing.T) {
ctx := mockContext.NewMockContext(test.name, "op")
lines, err := ifsink.transformLines(ctx, test.data)
assert.NoError(t, err)
assert.Equal(t, test.result, lines)
if test.result2 == nil {
assert.Equal(t, test.result, lines)
} else {
assert.True(t, reflect.DeepEqual(test.result, lines) || reflect.DeepEqual(test.result2, lines))
}

})
}
}
Expand Down

0 comments on commit 42f6739

Please sign in to comment.