Skip to content

Commit

Permalink
feat(module): modularize io
Browse files Browse the repository at this point in the history
Support source/sink module

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
  • Loading branch information
ngjaying committed Mar 21, 2024
1 parent 30bf65f commit 4f7e2c8
Show file tree
Hide file tree
Showing 35 changed files with 211 additions and 150 deletions.
48 changes: 4 additions & 44 deletions cmd/kuiperd/main.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2021-2023 EMQ Technologies Co., Ltd.
// Copyright 2021-2024 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -14,54 +14,14 @@

package main

import (
"flag"

"github.com/lf-edge/ekuiper/internal/conf"
"github.com/lf-edge/ekuiper/internal/server"
)
import "github.com/lf-edge/ekuiper/cmd"

// The compile time variable
var (
Version = "unknown"
LoadFileType = "relative"
)

var (
loadFileType string
etcPath string
dataPath string
logPath string
pluginsPath string
)

func init() {
flag.StringVar(&loadFileType, "loadFileType", "", "loadFileType indicates the how to load path")
flag.StringVar(&etcPath, "etc", "", "etc indicates the path of etc dir")
flag.StringVar(&dataPath, "data", "", "data indicates the path of data dir")
flag.StringVar(&logPath, "log", "", "log indicates the path of log dir")
flag.StringVar(&pluginsPath, "plugins", "", "plugins indicates the path of plugins dir")

flag.Parse()

if len(loadFileType) > 0 {
conf.PathConfig.LoadFileType = loadFileType
} else {
conf.PathConfig.LoadFileType = LoadFileType
}
if len(etcPath) > 0 {
conf.PathConfig.Dirs["etc"] = etcPath
}
if len(dataPath) > 0 {
conf.PathConfig.Dirs["data"] = dataPath
}
if len(logPath) > 0 {
conf.PathConfig.Dirs["log"] = logPath
}
if len(pluginsPath) > 0 {
conf.PathConfig.Dirs["plugins"] = pluginsPath
}
}

func main() {
server.StartUp(Version)
cmd.Main(Version, LoadFileType)
}
62 changes: 62 additions & 0 deletions cmd/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright 2024 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cmd

import (
"flag"

"github.com/lf-edge/ekuiper/internal/conf"
"github.com/lf-edge/ekuiper/internal/server"
)

var (
loadFileType string
etcPath string
dataPath string
logPath string
pluginsPath string
)

func init() {
flag.StringVar(&loadFileType, "loadFileType", "", "loadFileType indicates the how to load path")
flag.StringVar(&etcPath, "etc", "", "etc indicates the path of etc dir")
flag.StringVar(&dataPath, "data", "", "data indicates the path of data dir")
flag.StringVar(&logPath, "log", "", "log indicates the path of log dir")
flag.StringVar(&pluginsPath, "plugins", "", "plugins indicates the path of plugins dir")

flag.Parse()

if len(etcPath) > 0 {
conf.PathConfig.Dirs["etc"] = etcPath
}
if len(dataPath) > 0 {
conf.PathConfig.Dirs["data"] = dataPath
}
if len(logPath) > 0 {
conf.PathConfig.Dirs["log"] = logPath
}
if len(pluginsPath) > 0 {
conf.PathConfig.Dirs["plugins"] = pluginsPath
}
}

func Main(version string, lft string) {
if len(loadFileType) > 0 {
conf.PathConfig.LoadFileType = loadFileType
} else {
conf.PathConfig.LoadFileType = lft
}
server.StartUp(version)
}
65 changes: 28 additions & 37 deletions internal/binder/io/builtin.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,75 +25,66 @@ import (
"github.com/lf-edge/ekuiper/internal/io/websocket"
plugin2 "github.com/lf-edge/ekuiper/internal/plugin"
"github.com/lf-edge/ekuiper/pkg/api"
"github.com/lf-edge/ekuiper/pkg/modules"
)

type (
NewSourceFunc func() api.Source
NewLookupSourceFunc func() api.LookupSource
NewSinkFunc func() api.Sink
)
func init() {
modules.RegisterSource("mqtt", func() api.Source { return &mqtt.SourceConnector{} })
modules.RegisterSource("httppull", func() api.Source { return &http.PullSource{} })
modules.RegisterSource("httppush", func() api.Source { return &http.PushSource{} })
modules.RegisterSource("file", func() api.Source { return &file.FileSource{} })
modules.RegisterSource("memory", func() api.Source { return memory.GetSource() })
modules.RegisterSource("neuron", func() api.Source { return neuron.GetSource() })
modules.RegisterSource("websocket", func() api.Source { return &websocket.WebsocketSource{} })
modules.RegisterSource("simulator", func() api.Source { return &simulator.Source{} })

var (
sources = map[string]NewSourceFunc{
"mqtt": func() api.Source { return &mqtt.SourceConnector{} },
"httppull": func() api.Source { return &http.PullSource{} },
"httppush": func() api.Source { return &http.PushSource{} },
"file": func() api.Source { return &file.FileSource{} },
"memory": func() api.Source { return memory.GetSource() },
"neuron": func() api.Source { return neuron.GetSource() },
"websocket": func() api.Source { return &websocket.WebsocketSource{} },
"simulator": func() api.Source { return &simulator.Source{} },
}
sinks = map[string]NewSinkFunc{
"log": sink.NewLogSink,
"logToMemory": sink.NewLogSinkToMemory,
"mqtt": func() api.Sink { return &mqtt.MQTTSink{} },
"rest": func() api.Sink { return &http.RestSink{} },
"nop": func() api.Sink { return &sink.NopSink{} },
"memory": func() api.Sink { return memory.GetSink() },
"neuron": func() api.Sink { return neuron.GetSink() },
"file": func() api.Sink { return file.File() },
"websocket": func() api.Sink { return &websocket.WebSocketSink{} },
}
lookupSources = map[string]NewLookupSourceFunc{
"memory": func() api.LookupSource { return memory.GetLookupSource() },
"httppull": func() api.LookupSource { return http.GetLookUpSource() },
}
)
modules.RegisterSink("log", sink.NewLogSink)
modules.RegisterSink("logToMemory", sink.NewLogSinkToMemory)
modules.RegisterSink("mqtt", func() api.Sink { return &mqtt.MQTTSink{} })
modules.RegisterSink("rest", func() api.Sink { return &http.RestSink{} })
modules.RegisterSink("nop", func() api.Sink { return &sink.NopSink{} })
modules.RegisterSink("memory", func() api.Sink { return memory.GetSink() })
modules.RegisterSink("neuron", func() api.Sink { return neuron.GetSink() })
modules.RegisterSink("file", func() api.Sink { return file.File() })
modules.RegisterSink("websocket", func() api.Sink { return &websocket.WebSocketSink{} })

modules.RegisterLookupSource("memory", func() api.LookupSource { return memory.GetLookupSource() })
modules.RegisterLookupSource("httppull", func() api.LookupSource { return http.GetLookUpSource() })
}

type Manager struct{}

func (m *Manager) Source(name string) (api.Source, error) {
if s, ok := sources[name]; ok {
if s, ok := modules.Sources[name]; ok {
return s(), nil
}
return nil, nil
}

func (m *Manager) SourcePluginInfo(name string) (plugin2.EXTENSION_TYPE, string, string) {
if _, ok := sources[name]; ok {
if _, ok := modules.Sources[name]; ok {

Check warning on line 65 in internal/binder/io/builtin.go

View check run for this annotation

Codecov / codecov/patch

internal/binder/io/builtin.go#L65

Added line #L65 was not covered by tests
return plugin2.INTERNAL, "", ""
} else {
return plugin2.NONE_EXTENSION, "", ""
}
}

func (m *Manager) LookupSource(name string) (api.LookupSource, error) {
if s, ok := lookupSources[name]; ok {
if s, ok := modules.LookupSources[name]; ok {
return s(), nil
}
return nil, nil
}

func (m *Manager) Sink(name string) (api.Sink, error) {
if s, ok := sinks[name]; ok {
if s, ok := modules.Sinks[name]; ok {
return s(), nil
}
return nil, nil
}

func (m *Manager) SinkPluginInfo(name string) (plugin2.EXTENSION_TYPE, string, string) {
if _, ok := sinks[name]; ok {
if _, ok := modules.Sinks[name]; ok {

Check warning on line 87 in internal/binder/io/builtin.go

View check run for this annotation

Codecov / codecov/patch

internal/binder/io/builtin.go#L87

Added line #L87 was not covered by tests
return plugin2.INTERNAL, "", ""
} else {
return plugin2.NONE_EXTENSION, "", ""
Expand Down
8 changes: 5 additions & 3 deletions internal/binder/io/builtin_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2023 EMQ Technologies Co., Ltd.
// Copyright 2023-2024 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -18,11 +18,13 @@ import (
"testing"

"github.com/stretchr/testify/require"

"github.com/lf-edge/ekuiper/pkg/modules"
)

func TestLookupSources(t *testing.T) {
_, ok := lookupSources["memory"]
_, ok := modules.LookupSources["memory"]
require.True(t, ok)
_, ok = lookupSources["httppull"]
_, ok = modules.LookupSources["httppull"]
require.True(t, ok)
}
7 changes: 4 additions & 3 deletions internal/binder/io/ext_edgex.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2021-2023 EMQ Technologies Co., Ltd.
// Copyright 2021-2024 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -19,9 +19,10 @@ package io
import (
"github.com/lf-edge/ekuiper/internal/io/edgex"
"github.com/lf-edge/ekuiper/pkg/api"
"github.com/lf-edge/ekuiper/pkg/modules"
)

func init() {
sources["edgex"] = func() api.Source { return &edgex.EdgexSource{} }
sinks["edgex"] = func() api.Sink { return &edgex.EdgexMsgBusSink{} }
modules.RegisterSource("edgex", func() api.Source { return &edgex.EdgexSource{} })
modules.RegisterSink("edgex", func() api.Sink { return &edgex.EdgexMsgBusSink{} })
}
23 changes: 12 additions & 11 deletions internal/binder/io/ext_full.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2021-2023 EMQ Technologies Co., Ltd.
// Copyright 2021-2024 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -27,19 +27,20 @@ import (
sql "github.com/lf-edge/ekuiper/extensions/sources/sql/ext"
video "github.com/lf-edge/ekuiper/extensions/sources/video/ext"
"github.com/lf-edge/ekuiper/pkg/api"
"github.com/lf-edge/ekuiper/pkg/modules"
)

func init() {
sources["random"] = func() api.Source { return random.GetSource() }
sources["video"] = func() api.Source { return video.GetSource() }
sources["sql"] = func() api.Source { return sql.GetSource() }
sources["kafka"] = func() api.Source { return kafkaSrc.GetSource() }
lookupSources["sql"] = func() api.LookupSource { return sql.GetLookup() }
sinks["image"] = func() api.Sink { return image.GetSink() }
sinks["influx"] = func() api.Sink { return influx.GetSink() }
sinks["influx2"] = func() api.Sink { return influx2.GetSink() }
sinks["kafka"] = func() api.Sink { return kafka.GetSink() }
sinks["sql"] = func() api.Sink { return sqlSink.GetSink() }
modules.RegisterSource("random", func() api.Source { return random.GetSource() })
modules.RegisterSource("video", func() api.Source { return video.GetSource() })
modules.RegisterSource("sql", func() api.Source { return sql.GetSource() })
modules.RegisterSource("kafka", func() api.Source { return kafkaSrc.GetSource() })
modules.RegisterLookupSource("sql", func() api.LookupSource { return sql.GetLookup() })
modules.RegisterSink("image", func() api.Sink { return image.GetSink() })
modules.RegisterSink("influx", func() api.Sink { return influx.GetSink() })
modules.RegisterSink("influx2", func() api.Sink { return influx2.GetSink() })
modules.RegisterSink("kafka", func() api.Sink { return kafka.GetSink() })
modules.RegisterSink("sql", func() api.Sink { return sqlSink.GetSink() })
// Do not include zmq/tdengine because it is not supported for all versions
// sinks["tdengine"] = func() api.Sink { return tdengine.GetSink() }
// sinks["zmq"] = func() api.Sink { return zmqSink.GetSink() }
Expand Down
3 changes: 2 additions & 1 deletion internal/binder/io/ext_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@ package io
import (
"github.com/lf-edge/ekuiper/internal/topo/topotest/mocknode"
"github.com/lf-edge/ekuiper/pkg/api"
"github.com/lf-edge/ekuiper/pkg/modules"
)

func init() {
sources["mock"] = func() api.Source { return &mocknode.MockSource{} }
modules.RegisterSource("mock", func() api.Source { return &mocknode.MockSource{} })
}
12 changes: 6 additions & 6 deletions internal/binder/io/ext_redis.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2021-2023 EMQ Technologies Co., Ltd.
// Copyright 2021-2024 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -19,12 +19,12 @@ package io
import (
"github.com/lf-edge/ekuiper/internal/io/redis"
"github.com/lf-edge/ekuiper/internal/io/redis/pubsub"
"github.com/lf-edge/ekuiper/pkg/api"
"github.com/lf-edge/ekuiper/pkg/modules"
)

func init() {
lookupSources["redis"] = func() api.LookupSource { return redis.GetLookupSource() }
sinks["redis"] = func() api.Sink { return redis.GetSink() }
sinks["redisPub"] = func() api.Sink { return pubsub.RedisPub() }
sources["redisSub"] = func() api.Source { return pubsub.RedisSub() }
modules.RegisterLookupSource("redis", redis.GetLookupSource)
modules.RegisterSink("redis", redis.GetSink)
modules.RegisterSink("redisPub", pubsub.RedisPub)
modules.RegisterSource("redisSub", pubsub.RedisSub)
}
4 changes: 2 additions & 2 deletions internal/io/file/file_source_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022-2023 EMQ Technologies Co., Ltd.
// Copyright 2022-2024 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -26,9 +26,9 @@ import (
"github.com/stretchr/testify/assert"

"github.com/lf-edge/ekuiper/internal/conf"
"github.com/lf-edge/ekuiper/internal/io/mock"
"github.com/lf-edge/ekuiper/internal/xsql"
"github.com/lf-edge/ekuiper/pkg/api"
"github.com/lf-edge/ekuiper/pkg/mock"
)

func TestJsonFile(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions internal/io/file/file_stream_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2023 EMQ Technologies Co., Ltd.
// Copyright 2023-2024 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -24,11 +24,11 @@ import (

"github.com/lf-edge/ekuiper/internal/compressor"
"github.com/lf-edge/ekuiper/internal/conf"
"github.com/lf-edge/ekuiper/internal/io/mock"
"github.com/lf-edge/ekuiper/internal/topo/context"
"github.com/lf-edge/ekuiper/internal/topo/transform"
"github.com/lf-edge/ekuiper/pkg/api"
"github.com/lf-edge/ekuiper/pkg/message"
"github.com/lf-edge/ekuiper/pkg/mock"
)

func TestFileSinkCompress_Collect(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions internal/io/http/client.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2023 EMQ Technologies Co., Ltd.
// Copyright 2023-2024 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -25,11 +25,11 @@ import (
"time"

"github.com/lf-edge/ekuiper/internal/conf"
mockContext "github.com/lf-edge/ekuiper/internal/io/mock/context"
"github.com/lf-edge/ekuiper/internal/pkg/cert"
"github.com/lf-edge/ekuiper/internal/pkg/httpx"
"github.com/lf-edge/ekuiper/pkg/api"
"github.com/lf-edge/ekuiper/pkg/cast"
mockContext "github.com/lf-edge/ekuiper/pkg/mock/context"
)

// ClientConf is the configuration for http client
Expand Down
Loading

0 comments on commit 4f7e2c8

Please sign in to comment.