diff --git a/Makefile b/Makefile index 66bef9303d..d810a800e4 100644 --- a/Makefile +++ b/Makefile @@ -42,8 +42,8 @@ build_prepare: .PHONY: build_without_edgex build_without_edgex: build_prepare - GO111MODULE=on CGO_ENABLED=0 go build -trimpath -ldflags="-s -w -X main.Version=$(VERSION) -X main.LoadFileType=relative" -o kuiper cmd/kuiper/main.go - GO111MODULE=on CGO_ENABLED=1 go build -trimpath -ldflags="-s -w -X main.Version=$(VERSION) -X main.LoadFileType=relative" -o kuiperd cmd/kuiperd/main.go + GO111MODULE=on CGO_ENABLED=0 go build -trimpath -ldflags="-s -w -X github.com/lf-edge/ekuiper/cmd.Version=$(VERSION) -X github.com/lf-edge/ekuiper/cmd.LoadFileType=relative" -o kuiper cmd/kuiper/main.go + GO111MODULE=on CGO_ENABLED=1 go build -trimpath -ldflags="-s -w -X github.com/lf-edge/ekuiper/cmd.Version=$(VERSION) -X github.com/lf-edge/ekuiper/cmd.LoadFileType=relative" -o kuiperd cmd/kuiperd/main.go @if [ "$$(uname -s)" = "Linux" ] && [ ! -z $$(which upx) ]; then upx ./kuiper; upx ./kuiperd; fi @mv ./kuiper ./kuiperd $(BUILD_PATH)/$(PACKAGE_NAME)/bin @echo "Build successfully" @@ -54,16 +54,16 @@ pkg_without_edgex: build_without_edgex .PHONY: build_with_edgex build_with_edgex: build_prepare - GO111MODULE=on CGO_ENABLED=0 go build -trimpath -ldflags="-s -w -X main.Version=$(VERSION) -X main.LoadFileType=relative" -tags "edgex include_nats_messaging" -o kuiper cmd/kuiper/main.go - GO111MODULE=on CGO_ENABLED=1 go build -trimpath -ldflags="-s -w -X main.Version=$(VERSION) -X main.LoadFileType=relative" -tags "edgex include_nats_messaging" -o kuiperd cmd/kuiperd/main.go + GO111MODULE=on CGO_ENABLED=0 go build -trimpath -ldflags="-s -w -X github.com/lf-edge/ekuiper/cmd.Version=$(VERSION) -X github.com/lf-edge/ekuiper/cmd.LoadFileType=relative" -tags "edgex include_nats_messaging" -o kuiper cmd/kuiper/main.go + GO111MODULE=on CGO_ENABLED=1 go build -trimpath -ldflags="-s -w -X github.com/lf-edge/ekuiper/cmd.Version=$(VERSION) -X github.com/lf-edge/ekuiper/cmd.LoadFileType=relative" -tags "edgex include_nats_messaging" -o kuiperd cmd/kuiperd/main.go @if [ "$$(uname -s)" = "Linux" ] && [ ! -z $$(which upx) ]; then upx ./kuiper; upx ./kuiperd; fi @mv ./kuiper ./kuiperd $(BUILD_PATH)/$(PACKAGE_NAME)/bin @echo "Build successfully" .PHONY: build_with_edgex_and_script build_with_edgex_and_script: build_prepare - GO111MODULE=on CGO_ENABLED=0 go build -trimpath -ldflags="-s -w -X main.Version=$(VERSION) -X main.LoadFileType=relative" -tags "edgex include_nats_messaging" -o kuiper cmd/kuiper/main.go - GO111MODULE=on CGO_ENABLED=1 go build -trimpath -ldflags="-s -w -X main.Version=$(VERSION) -X main.LoadFileType=relative" -tags "edgex include_nats_messaging script" -o kuiperd cmd/kuiperd/main.go + GO111MODULE=on CGO_ENABLED=0 go build -trimpath -ldflags="-s -w -X github.com/lf-edge/ekuiper/cmd.Version=$(VERSION) -X github.com/lf-edge/ekuiper/cmd.LoadFileType=relative" -tags "edgex include_nats_messaging" -o kuiper cmd/kuiper/main.go + GO111MODULE=on CGO_ENABLED=1 go build -trimpath -ldflags="-s -w -X github.com/lf-edge/ekuiper/cmd.Version=$(VERSION) -X github.com/lf-edge/ekuiper/cmd.LoadFileType=relative" -tags "edgex include_nats_messaging script" -o kuiperd cmd/kuiperd/main.go @if [ "$$(uname -s)" = "Linux" ] && [ ! -z $$(which upx) ]; then upx ./kuiper; upx ./kuiperd; fi @mv ./kuiper ./kuiperd $(BUILD_PATH)/$(PACKAGE_NAME)/bin @echo "Build successfully" @@ -74,8 +74,8 @@ pkg_with_edgex: build_with_edgex .PHONY: build_with_fdb build_with_fdb: build_prepare - GO111MODULE=on CGO_ENABLED=1 go build -trimpath -ldflags="-s -w -X main.Version=$(VERSION) -X main.LoadFileType=relative" -tags "fdb" -o kuiper cmd/kuiper/main.go - GO111MODULE=on CGO_ENABLED=1 go build -trimpath -ldflags="-s -w -X main.Version=$(VERSION) -X main.LoadFileType=relative" -tags "fdb" -o kuiperd cmd/kuiperd/main.go + GO111MODULE=on CGO_ENABLED=1 go build -trimpath -ldflags="-s -w -X github.com/lf-edge/ekuiper/cmd.Version=$(VERSION) -X github.com/lf-edge/ekuiper/cmd.LoadFileType=relative" -tags "fdb" -o kuiper cmd/kuiper/main.go + GO111MODULE=on CGO_ENABLED=1 go build -trimpath -ldflags="-s -w -X github.com/lf-edge/ekuiper/cmd.Version=$(VERSION) -X github.com/lf-edge/ekuiper/cmd.LoadFileType=relative" -tags "fdb" -o kuiperd cmd/kuiperd/main.go @if [ "$$(uname -s)" = "Linux" ] && [ ! -z $$(which upx) ]; then upx ./kuiper; upx ./kuiperd; fi @mv ./kuiper ./kuiperd $(BUILD_PATH)/$(PACKAGE_NAME)/bin @echo "Build successfully" @@ -86,7 +86,7 @@ pkg_with_fdb: build_with_fdb .PHONY: build_core build_core: build_prepare - GO111MODULE=on CGO_ENABLED=0 go build -trimpath -ldflags="-s -w -X main.Version=$(VERSION) -X main.LoadFileType=relative" -tags core -o kuiperd cmd/kuiperd/main.go + GO111MODULE=on CGO_ENABLED=0 go build -trimpath -ldflags="-s -w -X github.com/lf-edge/ekuiper/cmd.Version=$(VERSION) -X github.com/lf-edge/ekuiper/cmd.LoadFileType=relative" -tags core -o kuiperd cmd/kuiperd/main.go @if [ "$$(uname -s)" = "Linux" ] && [ ! -z $$(which upx) ]; then upx ./kuiperd; fi @mv ./kuiperd $(BUILD_PATH)/$(PACKAGE_NAME)/bin @echo "Build successfully" @@ -105,8 +105,8 @@ PLUGINS_IN_FULL := \ .PHONY: build_full build_full: SHELL:=/bin/bash -euo pipefail build_full: build_prepare - GO111MODULE=on CGO_ENABLED=0 go build -trimpath -ldflags="-s -w -X main.Version=$(VERSION) -X main.LoadFileType=relative" -o kuiper cmd/kuiper/main.go - GO111MODULE=on CGO_ENABLED=1 go build -trimpath -ldflags="-s -w -X main.Version=$(VERSION) -X main.LoadFileType=relative" -tags "full include_nats_messaging" -o kuiperd cmd/kuiperd/main.go + GO111MODULE=on CGO_ENABLED=0 go build -trimpath -ldflags="-s -w -X github.com/lf-edge/ekuiper/cmd.Version=$(VERSION) -X github.com/lf-edge/ekuiper/cmd.LoadFileType=relative" -o kuiper cmd/kuiper/main.go + GO111MODULE=on CGO_ENABLED=1 go build -trimpath -ldflags="-s -w -X github.com/lf-edge/ekuiper/cmd.Version=$(VERSION) -X github.com/lf-edge/ekuiper/cmd.LoadFileType=relative" -tags "full include_nats_messaging" -o kuiperd cmd/kuiperd/main.go @if [ "$$(uname -s)" = "Linux" ] && [ ! -z $$(which upx) ]; then upx ./kuiper; upx ./kuiperd; fi @mv ./kuiper ./kuiperd $(BUILD_PATH)/$(PACKAGE_NAME)/bin @while read plugin; do \ @@ -143,8 +143,8 @@ real_pkg: .PHONY: build_with_wasm build_with_wasm: build_prepare - GO111MODULE=on CGO_ENABLED=0 go build -trimpath -ldflags="-s -w -X main.Version=$(VERSION) -X main.LoadFileType=relative" -o kuiper cmd/kuiper/main.go - GO111MODULE=on CGO_ENABLED=1 go build -trimpath -ldflags="-s -w -X main.Version=$(VERSION) -X main.LoadFileType=relative" -tags "wasmedge" -o kuiperd cmd/kuiperd/main.go + GO111MODULE=on CGO_ENABLED=0 go build -trimpath -ldflags="-s -w -X github.com/lf-edge/ekuiper/cmd.Version=$(VERSION) -X github.com/lf-edge/ekuiper/cmd.LoadFileType=relative" -o kuiper cmd/kuiper/main.go + GO111MODULE=on CGO_ENABLED=1 go build -trimpath -ldflags="-s -w -X github.com/lf-edge/ekuiper/cmd.Version=$(VERSION) -X github.com/lf-edge/ekuiper/cmd.LoadFileType=relative" -tags "wasmedge" -o kuiperd cmd/kuiperd/main.go @if [ "$$(uname -s)" = "Linux" ] && [ ! -z $$(which upx) ]; then upx ./kuiper; upx ./kuiperd; fi @mv ./kuiper ./kuiperd $(BUILD_PATH)/$(PACKAGE_NAME)/bin @echo "Build successfully" diff --git a/cmd/kuiperd/main.go b/cmd/kuiperd/main.go index 7ac3b361c9..ed07e70ebd 100644 --- a/cmd/kuiperd/main.go +++ b/cmd/kuiperd/main.go @@ -1,4 +1,4 @@ -// Copyright 2021-2023 EMQ Technologies Co., Ltd. +// Copyright 2021-2024 EMQ Technologies Co., Ltd. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -14,54 +14,8 @@ package main -import ( - "flag" - - "github.com/lf-edge/ekuiper/internal/conf" - "github.com/lf-edge/ekuiper/internal/server" -) - -var ( - Version = "unknown" - LoadFileType = "relative" -) - -var ( - loadFileType string - etcPath string - dataPath string - logPath string - pluginsPath string -) - -func init() { - flag.StringVar(&loadFileType, "loadFileType", "", "loadFileType indicates the how to load path") - flag.StringVar(&etcPath, "etc", "", "etc indicates the path of etc dir") - flag.StringVar(&dataPath, "data", "", "data indicates the path of data dir") - flag.StringVar(&logPath, "log", "", "log indicates the path of log dir") - flag.StringVar(&pluginsPath, "plugins", "", "plugins indicates the path of plugins dir") - - flag.Parse() - - if len(loadFileType) > 0 { - conf.PathConfig.LoadFileType = loadFileType - } else { - conf.PathConfig.LoadFileType = LoadFileType - } - if len(etcPath) > 0 { - conf.PathConfig.Dirs["etc"] = etcPath - } - if len(dataPath) > 0 { - conf.PathConfig.Dirs["data"] = dataPath - } - if len(logPath) > 0 { - conf.PathConfig.Dirs["log"] = logPath - } - if len(pluginsPath) > 0 { - conf.PathConfig.Dirs["plugins"] = pluginsPath - } -} +import "github.com/lf-edge/ekuiper/cmd" func main() { - server.StartUp(Version) + cmd.Main() } diff --git a/cmd/main.go b/cmd/main.go new file mode 100644 index 0000000000..4e4e52bba2 --- /dev/null +++ b/cmd/main.go @@ -0,0 +1,68 @@ +// Copyright 2024 EMQ Technologies Co., Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cmd + +import ( + "flag" + + "github.com/lf-edge/ekuiper/internal/conf" + "github.com/lf-edge/ekuiper/internal/server" +) + +// The compile time variable +var ( + Version = "unknown" + LoadFileType = "relative" +) + +var ( + loadFileType string + etcPath string + dataPath string + logPath string + pluginsPath string +) + +func init() { + flag.StringVar(&loadFileType, "loadFileType", "", "loadFileType indicates the how to load path") + flag.StringVar(&etcPath, "etc", "", "etc indicates the path of etc dir") + flag.StringVar(&dataPath, "data", "", "data indicates the path of data dir") + flag.StringVar(&logPath, "log", "", "log indicates the path of log dir") + flag.StringVar(&pluginsPath, "plugins", "", "plugins indicates the path of plugins dir") + + flag.Parse() + + if len(loadFileType) > 0 { + conf.PathConfig.LoadFileType = loadFileType + } else { + conf.PathConfig.LoadFileType = LoadFileType + } + if len(etcPath) > 0 { + conf.PathConfig.Dirs["etc"] = etcPath + } + if len(dataPath) > 0 { + conf.PathConfig.Dirs["data"] = dataPath + } + if len(logPath) > 0 { + conf.PathConfig.Dirs["log"] = logPath + } + if len(pluginsPath) > 0 { + conf.PathConfig.Dirs["plugins"] = pluginsPath + } +} + +func Main() { + server.StartUp(Version) +} diff --git a/deploy/packages/deb/debian/rules b/deploy/packages/deb/debian/rules index 8cbc44ad64..1c8db1b521 100755 --- a/deploy/packages/deb/debian/rules +++ b/deploy/packages/deb/debian/rules @@ -16,8 +16,8 @@ PKG_VSN ?= develop ## note that it is necessary to use overlay_vars relative to .. as ## the generate command EXECUTES in rel/ build: - GO111MODULE=on CGO_ENABLED=1 go build -ldflags="-s -w -X main.Version=$(PKG_VSN) -X main.LoadFileType=absolute" -o kuiper cmd/kuiper/main.go - GO111MODULE=on CGO_ENABLED=1 go build -trimpath -ldflags="-s -w -X main.Version=$(PKG_VSN) -X main.LoadFileType=absolute" -o kuiperd cmd/kuiperd/main.go + GO111MODULE=on CGO_ENABLED=1 go build -ldflags="-s -w -X github.com/lf-edge/ekuiper/cmd.Version=$(PKG_VSN) -X github.com/lf-edge/ekuiper/cmd.LoadFileType=absolute" -o kuiper cmd/kuiper/main.go + GO111MODULE=on CGO_ENABLED=1 go build -trimpath -ldflags="-s -w -X github.com/lf-edge/ekuiper/cmd.Version=$(PKG_VSN) -X github.com/lf-edge/ekuiper/cmd.LoadFileType=absolute" -o kuiperd cmd/kuiperd/main.go clean: dh_clean diff --git a/deploy/packages/rpm/kuiper.spec b/deploy/packages/rpm/kuiper.spec index f23d0e429d..e6032139b4 100644 --- a/deploy/packages/rpm/kuiper.spec +++ b/deploy/packages/rpm/kuiper.spec @@ -25,8 +25,8 @@ A lightweight IoT edge analytics software %build cd %{_code_source} -GO111MODULE=on CGO_ENABLED=1 go build -ldflags="-s -w -X main.Version=%{_version}-%{_release} -X main.LoadFileType=absolute" -o %{_code_source}/kuiper %{_code_source}/cmd/kuiper/main.go -GO111MODULE=on CGO_ENABLED=1 go build -trimpath -ldflags="-s -w -X main.Version=%{_version}-%{_release} -X main.LoadFileType=absolute" -o %{_code_source}/kuiperd %{_code_source}/cmd/kuiperd/main.go +GO111MODULE=on CGO_ENABLED=1 go build -ldflags="-s -w -X github.com/lf-edge/ekuiper/cmd.Version=%{_version}-%{_release} -X github.com/lf-edge/ekuiper/cmd.LoadFileType=absolute" -o %{_code_source}/kuiper %{_code_source}/cmd/kuiper/main.go +GO111MODULE=on CGO_ENABLED=1 go build -trimpath -ldflags="-s -w -X github.com/lf-edge/ekuiper/cmd.Version=%{_version}-%{_release} -X github.com/lf-edge/ekuiper/cmd.LoadFileType=absolute" -o %{_code_source}/kuiperd %{_code_source}/cmd/kuiperd/main.go cd - %install diff --git a/docs/en_US/installation.md b/docs/en_US/installation.md index 0254104d15..c1c6261dfb 100644 --- a/docs/en_US/installation.md +++ b/docs/en_US/installation.md @@ -252,7 +252,7 @@ For example, to cross build ARM64 binaries in AMD64 ubuntu/debian machine, do th 2. Update the Makefile in the build command. Examples: ```shell - GO111MODULE=on CGO_ENABLED=1 GOOS=linux GOARCH=arm64 CC=aarch64-linux-gnu-gcc go build -trimpath -ldflags="-s -w -X main.Version=$(VERSION) -X main.LoadFileType=relative" -o kuiperd cmd/kuiperd/main.go + GO111MODULE=on CGO_ENABLED=1 GOOS=linux GOARCH=arm64 CC=aarch64-linux-gnu-gcc go build -trimpath -ldflags="-s -w -X github.com/lf-edge/ekuiper/cmd.Version=$(VERSION) -X github.com/lf-edge/ekuiper/cmd.LoadFileType=relative" -o kuiperd cmd/kuiperd/main.go ``` 3. Run `make` diff --git a/docs/zh_CN/installation.md b/docs/zh_CN/installation.md index 7cdfd77c02..82c92453bc 100644 --- a/docs/zh_CN/installation.md +++ b/docs/zh_CN/installation.md @@ -252,7 +252,7 @@ sqlite,因此 `CGO_ENABLE` 必须设置为1。在交叉编译时,必须安 2. 更新 Makefile 里的编译相关参数如下: ```shell - GO111MODULE=on CGO_ENABLED=1 GOOS=linux GOARCH=arm64 CC=aarch64-linux-gnu-gcc go build -trimpath -ldflags="-s -w -X main.Version=$(VERSION) -X main.LoadFileType=relative" -o kuiperd cmd/kuiperd/main.go + GO111MODULE=on CGO_ENABLED=1 GOOS=linux GOARCH=arm64 CC=aarch64-linux-gnu-gcc go build -trimpath -ldflags="-s -w -X github.com/lf-edge/ekuiper/cmd.Version=$(VERSION) -X github.com/lf-edge/ekuiper/cmd.LoadFileType=relative" -o kuiperd cmd/kuiperd/main.go ``` 3. 运行 `make` 。 diff --git a/internal/binder/io/builtin.go b/internal/binder/io/builtin.go index 3d09faad43..a1c778d8b3 100644 --- a/internal/binder/io/builtin.go +++ b/internal/binder/io/builtin.go @@ -25,53 +25,44 @@ import ( "github.com/lf-edge/ekuiper/internal/io/websocket" plugin2 "github.com/lf-edge/ekuiper/internal/plugin" "github.com/lf-edge/ekuiper/pkg/api" + "github.com/lf-edge/ekuiper/pkg/modules" ) -type ( - NewSourceFunc func() api.Source - NewLookupSourceFunc func() api.LookupSource - NewSinkFunc func() api.Sink -) +func init() { + modules.RegisterSource("mqtt", func() api.Source { return &mqtt.SourceConnector{} }) + modules.RegisterSource("httppull", func() api.Source { return &http.PullSource{} }) + modules.RegisterSource("httppush", func() api.Source { return &http.PushSource{} }) + modules.RegisterSource("file", func() api.Source { return &file.FileSource{} }) + modules.RegisterSource("memory", func() api.Source { return memory.GetSource() }) + modules.RegisterSource("neuron", func() api.Source { return neuron.GetSource() }) + modules.RegisterSource("websocket", func() api.Source { return &websocket.WebsocketSource{} }) + modules.RegisterSource("simulator", func() api.Source { return &simulator.Source{} }) -var ( - sources = map[string]NewSourceFunc{ - "mqtt": func() api.Source { return &mqtt.SourceConnector{} }, - "httppull": func() api.Source { return &http.PullSource{} }, - "httppush": func() api.Source { return &http.PushSource{} }, - "file": func() api.Source { return &file.FileSource{} }, - "memory": func() api.Source { return memory.GetSource() }, - "neuron": func() api.Source { return neuron.GetSource() }, - "websocket": func() api.Source { return &websocket.WebsocketSource{} }, - "simulator": func() api.Source { return &simulator.Source{} }, - } - sinks = map[string]NewSinkFunc{ - "log": sink.NewLogSink, - "logToMemory": sink.NewLogSinkToMemory, - "mqtt": func() api.Sink { return &mqtt.MQTTSink{} }, - "rest": func() api.Sink { return &http.RestSink{} }, - "nop": func() api.Sink { return &sink.NopSink{} }, - "memory": func() api.Sink { return memory.GetSink() }, - "neuron": func() api.Sink { return neuron.GetSink() }, - "file": func() api.Sink { return file.File() }, - "websocket": func() api.Sink { return &websocket.WebSocketSink{} }, - } - lookupSources = map[string]NewLookupSourceFunc{ - "memory": func() api.LookupSource { return memory.GetLookupSource() }, - "httppull": func() api.LookupSource { return http.GetLookUpSource() }, - } -) + modules.RegisterSink("log", sink.NewLogSink) + modules.RegisterSink("logToMemory", sink.NewLogSinkToMemory) + modules.RegisterSink("mqtt", func() api.Sink { return &mqtt.MQTTSink{} }) + modules.RegisterSink("rest", func() api.Sink { return &http.RestSink{} }) + modules.RegisterSink("nop", func() api.Sink { return &sink.NopSink{} }) + modules.RegisterSink("memory", func() api.Sink { return memory.GetSink() }) + modules.RegisterSink("neuron", func() api.Sink { return neuron.GetSink() }) + modules.RegisterSink("file", func() api.Sink { return file.File() }) + modules.RegisterSink("websocket", func() api.Sink { return &websocket.WebSocketSink{} }) + + modules.RegisterLookupSource("memory", func() api.LookupSource { return memory.GetLookupSource() }) + modules.RegisterLookupSource("httppull", func() api.LookupSource { return http.GetLookUpSource() }) +} type Manager struct{} func (m *Manager) Source(name string) (api.Source, error) { - if s, ok := sources[name]; ok { + if s, ok := modules.Sources[name]; ok { return s(), nil } return nil, nil } func (m *Manager) SourcePluginInfo(name string) (plugin2.EXTENSION_TYPE, string, string) { - if _, ok := sources[name]; ok { + if _, ok := modules.Sources[name]; ok { return plugin2.INTERNAL, "", "" } else { return plugin2.NONE_EXTENSION, "", "" @@ -79,21 +70,21 @@ func (m *Manager) SourcePluginInfo(name string) (plugin2.EXTENSION_TYPE, string, } func (m *Manager) LookupSource(name string) (api.LookupSource, error) { - if s, ok := lookupSources[name]; ok { + if s, ok := modules.LookupSources[name]; ok { return s(), nil } return nil, nil } func (m *Manager) Sink(name string) (api.Sink, error) { - if s, ok := sinks[name]; ok { + if s, ok := modules.Sinks[name]; ok { return s(), nil } return nil, nil } func (m *Manager) SinkPluginInfo(name string) (plugin2.EXTENSION_TYPE, string, string) { - if _, ok := sinks[name]; ok { + if _, ok := modules.Sinks[name]; ok { return plugin2.INTERNAL, "", "" } else { return plugin2.NONE_EXTENSION, "", "" diff --git a/internal/binder/io/builtin_test.go b/internal/binder/io/builtin_test.go index 59dc78982f..dbc4aabb67 100644 --- a/internal/binder/io/builtin_test.go +++ b/internal/binder/io/builtin_test.go @@ -1,4 +1,4 @@ -// Copyright 2023 EMQ Technologies Co., Ltd. +// Copyright 2023-2024 EMQ Technologies Co., Ltd. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -18,11 +18,13 @@ import ( "testing" "github.com/stretchr/testify/require" + + "github.com/lf-edge/ekuiper/pkg/modules" ) func TestLookupSources(t *testing.T) { - _, ok := lookupSources["memory"] + _, ok := modules.LookupSources["memory"] require.True(t, ok) - _, ok = lookupSources["httppull"] + _, ok = modules.LookupSources["httppull"] require.True(t, ok) } diff --git a/internal/binder/io/ext_edgex.go b/internal/binder/io/ext_edgex.go index 731c4acf16..da764eeb99 100644 --- a/internal/binder/io/ext_edgex.go +++ b/internal/binder/io/ext_edgex.go @@ -1,4 +1,4 @@ -// Copyright 2021-2023 EMQ Technologies Co., Ltd. +// Copyright 2021-2024 EMQ Technologies Co., Ltd. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -19,9 +19,10 @@ package io import ( "github.com/lf-edge/ekuiper/internal/io/edgex" "github.com/lf-edge/ekuiper/pkg/api" + "github.com/lf-edge/ekuiper/pkg/modules" ) func init() { - sources["edgex"] = func() api.Source { return &edgex.EdgexSource{} } - sinks["edgex"] = func() api.Sink { return &edgex.EdgexMsgBusSink{} } + modules.RegisterSource("edgex", func() api.Source { return &edgex.EdgexSource{} }) + modules.RegisterSink("edgex", func() api.Sink { return &edgex.EdgexMsgBusSink{} }) } diff --git a/internal/binder/io/ext_full.go b/internal/binder/io/ext_full.go index 917023b7ac..7adef60d7d 100644 --- a/internal/binder/io/ext_full.go +++ b/internal/binder/io/ext_full.go @@ -1,4 +1,4 @@ -// Copyright 2021-2023 EMQ Technologies Co., Ltd. +// Copyright 2021-2024 EMQ Technologies Co., Ltd. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -27,19 +27,20 @@ import ( sql "github.com/lf-edge/ekuiper/extensions/sources/sql/ext" video "github.com/lf-edge/ekuiper/extensions/sources/video/ext" "github.com/lf-edge/ekuiper/pkg/api" + "github.com/lf-edge/ekuiper/pkg/modules" ) func init() { - sources["random"] = func() api.Source { return random.GetSource() } - sources["video"] = func() api.Source { return video.GetSource() } - sources["sql"] = func() api.Source { return sql.GetSource() } - sources["kafka"] = func() api.Source { return kafkaSrc.GetSource() } - lookupSources["sql"] = func() api.LookupSource { return sql.GetLookup() } - sinks["image"] = func() api.Sink { return image.GetSink() } - sinks["influx"] = func() api.Sink { return influx.GetSink() } - sinks["influx2"] = func() api.Sink { return influx2.GetSink() } - sinks["kafka"] = func() api.Sink { return kafka.GetSink() } - sinks["sql"] = func() api.Sink { return sqlSink.GetSink() } + modules.RegisterSource("random", func() api.Source { return random.GetSource() }) + modules.RegisterSource("video", func() api.Source { return video.GetSource() }) + modules.RegisterSource("sql", func() api.Source { return sql.GetSource() }) + modules.RegisterSource("kafka", func() api.Source { return kafkaSrc.GetSource() }) + modules.RegisterLookupSource("sql", func() api.LookupSource { return sql.GetLookup() }) + modules.RegisterSink("image", func() api.Sink { return image.GetSink() }) + modules.RegisterSink("influx", func() api.Sink { return influx.GetSink() }) + modules.RegisterSink("influx2", func() api.Sink { return influx2.GetSink() }) + modules.RegisterSink("kafka", func() api.Sink { return kafka.GetSink() }) + modules.RegisterSink("sql", func() api.Sink { return sqlSink.GetSink() }) // Do not include zmq/tdengine because it is not supported for all versions // sinks["tdengine"] = func() api.Sink { return tdengine.GetSink() } // sinks["zmq"] = func() api.Sink { return zmqSink.GetSink() } diff --git a/internal/binder/io/ext_mock.go b/internal/binder/io/ext_mock.go index b35f20cd23..6727e01473 100644 --- a/internal/binder/io/ext_mock.go +++ b/internal/binder/io/ext_mock.go @@ -19,8 +19,9 @@ package io import ( "github.com/lf-edge/ekuiper/internal/topo/topotest/mocknode" "github.com/lf-edge/ekuiper/pkg/api" + "github.com/lf-edge/ekuiper/pkg/modules" ) func init() { - sources["mock"] = func() api.Source { return &mocknode.MockSource{} } + modules.RegisterSource("mock", func() api.Source { return &mocknode.MockSource{} }) } diff --git a/internal/binder/io/ext_redis.go b/internal/binder/io/ext_redis.go index a217eeb648..430d39b498 100644 --- a/internal/binder/io/ext_redis.go +++ b/internal/binder/io/ext_redis.go @@ -1,4 +1,4 @@ -// Copyright 2021-2023 EMQ Technologies Co., Ltd. +// Copyright 2021-2024 EMQ Technologies Co., Ltd. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -19,12 +19,12 @@ package io import ( "github.com/lf-edge/ekuiper/internal/io/redis" "github.com/lf-edge/ekuiper/internal/io/redis/pubsub" - "github.com/lf-edge/ekuiper/pkg/api" + "github.com/lf-edge/ekuiper/pkg/modules" ) func init() { - lookupSources["redis"] = func() api.LookupSource { return redis.GetLookupSource() } - sinks["redis"] = func() api.Sink { return redis.GetSink() } - sinks["redisPub"] = func() api.Sink { return pubsub.RedisPub() } - sources["redisSub"] = func() api.Source { return pubsub.RedisSub() } + modules.RegisterLookupSource("redis", redis.GetLookupSource) + modules.RegisterSink("redis", redis.GetSink) + modules.RegisterSink("redisPub", pubsub.RedisPub) + modules.RegisterSource("redisSub", pubsub.RedisSub) } diff --git a/internal/io/file/file_source_test.go b/internal/io/file/file_source_test.go index 82644b067c..69eff8790f 100644 --- a/internal/io/file/file_source_test.go +++ b/internal/io/file/file_source_test.go @@ -1,4 +1,4 @@ -// Copyright 2022-2023 EMQ Technologies Co., Ltd. +// Copyright 2022-2024 EMQ Technologies Co., Ltd. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -26,9 +26,9 @@ import ( "github.com/stretchr/testify/assert" "github.com/lf-edge/ekuiper/internal/conf" - "github.com/lf-edge/ekuiper/internal/io/mock" "github.com/lf-edge/ekuiper/internal/xsql" "github.com/lf-edge/ekuiper/pkg/api" + "github.com/lf-edge/ekuiper/pkg/mock" ) func TestJsonFile(t *testing.T) { diff --git a/internal/io/file/file_stream_test.go b/internal/io/file/file_stream_test.go index 04998b6d54..dcced5de3b 100644 --- a/internal/io/file/file_stream_test.go +++ b/internal/io/file/file_stream_test.go @@ -1,4 +1,4 @@ -// Copyright 2023 EMQ Technologies Co., Ltd. +// Copyright 2023-2024 EMQ Technologies Co., Ltd. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -24,11 +24,11 @@ import ( "github.com/lf-edge/ekuiper/internal/compressor" "github.com/lf-edge/ekuiper/internal/conf" - "github.com/lf-edge/ekuiper/internal/io/mock" "github.com/lf-edge/ekuiper/internal/topo/context" "github.com/lf-edge/ekuiper/internal/topo/transform" "github.com/lf-edge/ekuiper/pkg/api" "github.com/lf-edge/ekuiper/pkg/message" + "github.com/lf-edge/ekuiper/pkg/mock" ) func TestFileSinkCompress_Collect(t *testing.T) { diff --git a/internal/io/http/client.go b/internal/io/http/client.go index ca66a914da..8e5bda6eaa 100644 --- a/internal/io/http/client.go +++ b/internal/io/http/client.go @@ -1,4 +1,4 @@ -// Copyright 2023 EMQ Technologies Co., Ltd. +// Copyright 2023-2024 EMQ Technologies Co., Ltd. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -25,11 +25,11 @@ import ( "time" "github.com/lf-edge/ekuiper/internal/conf" - mockContext "github.com/lf-edge/ekuiper/internal/io/mock/context" "github.com/lf-edge/ekuiper/internal/pkg/cert" "github.com/lf-edge/ekuiper/internal/pkg/httpx" "github.com/lf-edge/ekuiper/pkg/api" "github.com/lf-edge/ekuiper/pkg/cast" + mockContext "github.com/lf-edge/ekuiper/pkg/mock/context" ) // ClientConf is the configuration for http client diff --git a/internal/io/http/client_test.go b/internal/io/http/client_test.go index cd0ce131f4..e6abe48099 100644 --- a/internal/io/http/client_test.go +++ b/internal/io/http/client_test.go @@ -1,4 +1,4 @@ -// Copyright 2023 EMQ Technologies Co., Ltd. +// Copyright 2023-2024 EMQ Technologies Co., Ltd. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -19,7 +19,7 @@ import ( "reflect" "testing" - mockContext "github.com/lf-edge/ekuiper/internal/io/mock/context" + mockContext "github.com/lf-edge/ekuiper/pkg/mock/context" ) func TestHeaderConf(t *testing.T) { diff --git a/internal/io/http/httppull_source_test.go b/internal/io/http/httppull_source_test.go index 51debaebd1..f1d4b66ed3 100644 --- a/internal/io/http/httppull_source_test.go +++ b/internal/io/http/httppull_source_test.go @@ -1,4 +1,4 @@ -// Copyright 2023 EMQ Technologies Co., Ltd. +// Copyright 2023-2024 EMQ Technologies Co., Ltd. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -32,10 +32,10 @@ import ( "github.com/stretchr/testify/assert" "github.com/lf-edge/ekuiper/internal/conf" - "github.com/lf-edge/ekuiper/internal/io/mock" "github.com/lf-edge/ekuiper/internal/topo/topotest/mockclock" "github.com/lf-edge/ekuiper/internal/xsql" "github.com/lf-edge/ekuiper/pkg/api" + "github.com/lf-edge/ekuiper/pkg/mock" ) func jsonOut(w http.ResponseWriter, out interface{}) { diff --git a/internal/io/mqtt/connection_test.go b/internal/io/mqtt/connection_test.go index a64a435cff..932c9476b6 100644 --- a/internal/io/mqtt/connection_test.go +++ b/internal/io/mqtt/connection_test.go @@ -19,7 +19,7 @@ import ( "github.com/stretchr/testify/assert" - mockContext "github.com/lf-edge/ekuiper/internal/io/mock/context" + mockContext "github.com/lf-edge/ekuiper/pkg/mock/context" ) func TestConnectionLC(t *testing.T) { diff --git a/internal/io/mqtt/mqtt_source_connector_test.go b/internal/io/mqtt/mqtt_source_connector_test.go index 02440515a5..b0e73ecbbb 100644 --- a/internal/io/mqtt/mqtt_source_connector_test.go +++ b/internal/io/mqtt/mqtt_source_connector_test.go @@ -27,12 +27,12 @@ import ( "github.com/stretchr/testify/require" "github.com/lf-edge/ekuiper/internal/conf" - "github.com/lf-edge/ekuiper/internal/io/mock" - mockContext "github.com/lf-edge/ekuiper/internal/io/mock/context" "github.com/lf-edge/ekuiper/internal/testx" "github.com/lf-edge/ekuiper/internal/topo/connection/factory" "github.com/lf-edge/ekuiper/internal/topo/context" "github.com/lf-edge/ekuiper/pkg/api" + "github.com/lf-edge/ekuiper/pkg/mock" + mockContext "github.com/lf-edge/ekuiper/pkg/mock/context" ) // NOTICE!!! Need to run a MQTT broker in localhost:1883 for this test or change the url to your broker diff --git a/internal/io/neuron/multiple_test.go b/internal/io/neuron/multiple_test.go index 87daaec296..a5d9140bb6 100644 --- a/internal/io/neuron/multiple_test.go +++ b/internal/io/neuron/multiple_test.go @@ -1,4 +1,4 @@ -// Copyright 2023 EMQ Technologies Co., Ltd. +// Copyright 2023-2024 EMQ Technologies Co., Ltd. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -23,8 +23,8 @@ import ( "github.com/benbjohnson/clock" "github.com/lf-edge/ekuiper/internal/conf" - "github.com/lf-edge/ekuiper/internal/io/mock" "github.com/lf-edge/ekuiper/pkg/api" + "github.com/lf-edge/ekuiper/pkg/mock" ) // Test scenario of multiple neuron instances diff --git a/internal/io/neuron/neuron_test.go b/internal/io/neuron/neuron_test.go index 83f80295aa..a4e71ac93b 100644 --- a/internal/io/neuron/neuron_test.go +++ b/internal/io/neuron/neuron_test.go @@ -1,4 +1,4 @@ -// Copyright 2022-2023 EMQ Technologies Co., Ltd. +// Copyright 2022-2024 EMQ Technologies Co., Ltd. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -28,8 +28,8 @@ import ( _ "go.nanomsg.org/mangos/v3/transport/ipc" "github.com/lf-edge/ekuiper/internal/conf" - "github.com/lf-edge/ekuiper/internal/io/mock" "github.com/lf-edge/ekuiper/pkg/api" + "github.com/lf-edge/ekuiper/pkg/mock" ) var data = [][]byte{ diff --git a/internal/io/neuron/sink_test.go b/internal/io/neuron/sink_test.go index 825bfd82a7..74e47814cb 100644 --- a/internal/io/neuron/sink_test.go +++ b/internal/io/neuron/sink_test.go @@ -1,4 +1,4 @@ -// Copyright 2022-2023 EMQ Technologies Co., Ltd. +// Copyright 2022-2024 EMQ Technologies Co., Ltd. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -19,8 +19,8 @@ import ( "testing" "time" - "github.com/lf-edge/ekuiper/internal/io/mock" "github.com/lf-edge/ekuiper/pkg/errorx" + "github.com/lf-edge/ekuiper/pkg/mock" ) func sinkTest(t *testing.T) { diff --git a/internal/io/neuron/source_test.go b/internal/io/neuron/source_test.go index 3800a1f403..0571300371 100644 --- a/internal/io/neuron/source_test.go +++ b/internal/io/neuron/source_test.go @@ -1,4 +1,4 @@ -// Copyright 2022-2023 EMQ Technologies Co., Ltd. +// Copyright 2022-2024 EMQ Technologies Co., Ltd. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -24,10 +24,10 @@ import ( _ "go.nanomsg.org/mangos/v3/transport/ipc" "github.com/lf-edge/ekuiper/internal/conf" - "github.com/lf-edge/ekuiper/internal/io/mock" - mockContext "github.com/lf-edge/ekuiper/internal/io/mock/context" "github.com/lf-edge/ekuiper/internal/xsql" "github.com/lf-edge/ekuiper/pkg/api" + "github.com/lf-edge/ekuiper/pkg/mock" + mockContext "github.com/lf-edge/ekuiper/pkg/mock/context" ) func TestRun(t *testing.T) { diff --git a/internal/io/redis/pubsub/redisPub_test.go b/internal/io/redis/pubsub/redisPub_test.go index dbbf56b841..f023d40164 100644 --- a/internal/io/redis/pubsub/redisPub_test.go +++ b/internal/io/redis/pubsub/redisPub_test.go @@ -1,4 +1,4 @@ -// Copyright 2023-2023 emy120115@gmail.com +// Copyright 2023-2024 EMQ Technologies Co., Ltd. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -24,9 +24,9 @@ import ( "github.com/alicebob/miniredis/v2" "github.com/stretchr/testify/require" - "github.com/lf-edge/ekuiper/internal/io/mock" - mockContext "github.com/lf-edge/ekuiper/internal/io/mock/context" "github.com/lf-edge/ekuiper/pkg/errorx" + "github.com/lf-edge/ekuiper/pkg/mock" + mockContext "github.com/lf-edge/ekuiper/pkg/mock/context" ) func TestRedisPub(t *testing.T) { diff --git a/internal/io/redis/pubsub/redisSub_test.go b/internal/io/redis/pubsub/redisSub_test.go index cb59a88891..e27ee61135 100644 --- a/internal/io/redis/pubsub/redisSub_test.go +++ b/internal/io/redis/pubsub/redisSub_test.go @@ -1,4 +1,4 @@ -// Copyright 2023-2023 EMQ Technologies Co., Ltd. +// Copyright 2023-2024 EMQ Technologies Co., Ltd. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -23,8 +23,8 @@ import ( "github.com/stretchr/testify/require" _ "go.nanomsg.org/mangos/v3/transport/ipc" - mockContext "github.com/lf-edge/ekuiper/internal/io/mock/context" "github.com/lf-edge/ekuiper/pkg/api" + mockContext "github.com/lf-edge/ekuiper/pkg/mock/context" ) func TestConnectFail(t *testing.T) { diff --git a/internal/io/simulator/source_test.go b/internal/io/simulator/source_test.go index 997f15fe01..a8ae596899 100644 --- a/internal/io/simulator/source_test.go +++ b/internal/io/simulator/source_test.go @@ -1,4 +1,4 @@ -// Copyright 2023 EMQ Technologies Co., Ltd. +// Copyright 2023-2024 EMQ Technologies Co., Ltd. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -21,8 +21,8 @@ import ( "github.com/stretchr/testify/assert" "github.com/lf-edge/ekuiper/internal/conf" - "github.com/lf-edge/ekuiper/internal/io/mock" "github.com/lf-edge/ekuiper/pkg/api" + "github.com/lf-edge/ekuiper/pkg/mock" ) func TestSource_Configure(t *testing.T) { diff --git a/internal/io/websocket/websocket_sink_test.go b/internal/io/websocket/websocket_sink_test.go index 8ac03f499b..92e5c9f1db 100644 --- a/internal/io/websocket/websocket_sink_test.go +++ b/internal/io/websocket/websocket_sink_test.go @@ -1,4 +1,4 @@ -// Copyright 2023 EMQ Technologies Co., Ltd. +// Copyright 2023-2024 EMQ Technologies Co., Ltd. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -25,8 +25,8 @@ import ( "github.com/gorilla/websocket" "github.com/stretchr/testify/require" - "github.com/lf-edge/ekuiper/internal/io/mock/context" "github.com/lf-edge/ekuiper/internal/topo/connection/factory" + "github.com/lf-edge/ekuiper/pkg/mock/context" ) const ( diff --git a/internal/topo/connection/clients/websocket/websocket_client_test.go b/internal/topo/connection/clients/websocket/websocket_client_test.go index 7bff37cf1d..337cdf7e88 100644 --- a/internal/topo/connection/clients/websocket/websocket_client_test.go +++ b/internal/topo/connection/clients/websocket/websocket_client_test.go @@ -1,4 +1,4 @@ -// Copyright 2023 EMQ Technologies Co., Ltd. +// Copyright 2023-2024 EMQ Technologies Co., Ltd. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -25,8 +25,8 @@ import ( "github.com/stretchr/testify/require" "github.com/lf-edge/ekuiper/internal/conf" - "github.com/lf-edge/ekuiper/internal/io/mock/context" "github.com/lf-edge/ekuiper/pkg/api" + "github.com/lf-edge/ekuiper/pkg/mock/context" ) const ( diff --git a/internal/topo/connection/clients/websocket/websocket_server_test.go b/internal/topo/connection/clients/websocket/websocket_server_test.go index a514c0f8e2..d6797c31c5 100644 --- a/internal/topo/connection/clients/websocket/websocket_server_test.go +++ b/internal/topo/connection/clients/websocket/websocket_server_test.go @@ -1,4 +1,4 @@ -// Copyright 2023 EMQ Technologies Co., Ltd. +// Copyright 2023-2024 EMQ Technologies Co., Ltd. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -26,8 +26,8 @@ import ( "github.com/lf-edge/ekuiper/internal/conf" "github.com/lf-edge/ekuiper/internal/io/http/httpserver" - "github.com/lf-edge/ekuiper/internal/io/mock/context" "github.com/lf-edge/ekuiper/pkg/api" + "github.com/lf-edge/ekuiper/pkg/mock/context" ) var ( diff --git a/internal/topo/node/decode_op_test.go b/internal/topo/node/decode_op_test.go index 8824c243b6..1fdcb53016 100644 --- a/internal/topo/node/decode_op_test.go +++ b/internal/topo/node/decode_op_test.go @@ -21,10 +21,10 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - mockContext "github.com/lf-edge/ekuiper/internal/io/mock/context" "github.com/lf-edge/ekuiper/internal/xsql" "github.com/lf-edge/ekuiper/pkg/api" "github.com/lf-edge/ekuiper/pkg/ast" + mockContext "github.com/lf-edge/ekuiper/pkg/mock/context" ) func TestJSON(t *testing.T) { diff --git a/internal/topo/node/decompress_op_test.go b/internal/topo/node/decompress_op_test.go index 96da4c31cb..7fab23c721 100644 --- a/internal/topo/node/decompress_op_test.go +++ b/internal/topo/node/decompress_op_test.go @@ -20,9 +20,9 @@ import ( "github.com/stretchr/testify/assert" - mockContext "github.com/lf-edge/ekuiper/internal/io/mock/context" "github.com/lf-edge/ekuiper/internal/xsql" "github.com/lf-edge/ekuiper/pkg/api" + mockContext "github.com/lf-edge/ekuiper/pkg/mock/context" ) func TestNewDecompressOp(t *testing.T) { diff --git a/internal/topo/node/metric/stat_manager_test.go b/internal/topo/node/metric/stat_manager_test.go index 46d57e1fb3..44f3b6a6f6 100644 --- a/internal/topo/node/metric/stat_manager_test.go +++ b/internal/topo/node/metric/stat_manager_test.go @@ -19,7 +19,7 @@ import ( "github.com/stretchr/testify/assert" - mockContext "github.com/lf-edge/ekuiper/internal/io/mock/context" + mockContext "github.com/lf-edge/ekuiper/pkg/mock/context" ) func TestGetMetrics(t *testing.T) { diff --git a/internal/topo/node/source_connector_node_test.go b/internal/topo/node/source_connector_node_test.go index 5711ab78ed..4094d39fdf 100644 --- a/internal/topo/node/source_connector_node_test.go +++ b/internal/topo/node/source_connector_node_test.go @@ -26,9 +26,9 @@ import ( "github.com/stretchr/testify/assert" "github.com/lf-edge/ekuiper/internal/conf" - mockContext "github.com/lf-edge/ekuiper/internal/io/mock/context" "github.com/lf-edge/ekuiper/internal/xsql" "github.com/lf-edge/ekuiper/pkg/api" + mockContext "github.com/lf-edge/ekuiper/pkg/mock/context" ) func TestSCNLC(t *testing.T) { diff --git a/internal/topo/subtopo_test.go b/internal/topo/subtopo_test.go index 08bd5f940d..038b5b45c5 100644 --- a/internal/topo/subtopo_test.go +++ b/internal/topo/subtopo_test.go @@ -22,11 +22,11 @@ import ( "github.com/stretchr/testify/assert" - mockContext "github.com/lf-edge/ekuiper/internal/io/mock/context" "github.com/lf-edge/ekuiper/internal/topo/checkpoint" "github.com/lf-edge/ekuiper/internal/topo/node" "github.com/lf-edge/ekuiper/pkg/api" "github.com/lf-edge/ekuiper/pkg/ast" + mockContext "github.com/lf-edge/ekuiper/pkg/mock/context" ) func TestSubtopoLC(t *testing.T) { diff --git a/internal/io/mock/context/mock.go b/pkg/mock/context/mock.go similarity index 96% rename from internal/io/mock/context/mock.go rename to pkg/mock/context/mock.go index 9354a727b3..9285fe58b1 100644 --- a/internal/io/mock/context/mock.go +++ b/pkg/mock/context/mock.go @@ -1,4 +1,4 @@ -// Copyright 2022-2023 EMQ Technologies Co., Ltd. +// Copyright 2022-2024 EMQ Technologies Co., Ltd. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/internal/io/mock/test_sink.go b/pkg/mock/test_sink.go similarity index 89% rename from internal/io/mock/test_sink.go rename to pkg/mock/test_sink.go index 06df530fa7..9acd597ee7 100644 --- a/internal/io/mock/test_sink.go +++ b/pkg/mock/test_sink.go @@ -1,4 +1,4 @@ -// Copyright 2021-2023 EMQ Technologies Co., Ltd. +// Copyright 2021-2024 EMQ Technologies Co., Ltd. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -18,8 +18,8 @@ import ( "fmt" "time" - mockContext "github.com/lf-edge/ekuiper/internal/io/mock/context" "github.com/lf-edge/ekuiper/pkg/api" + mockContext "github.com/lf-edge/ekuiper/pkg/mock/context" ) func RunSinkCollect(s api.Sink, data []interface{}) error { diff --git a/internal/io/mock/test_source.go b/pkg/mock/test_source.go similarity index 97% rename from internal/io/mock/test_source.go rename to pkg/mock/test_source.go index 646b9a4d45..d90ac6a91a 100644 --- a/internal/io/mock/test_source.go +++ b/pkg/mock/test_source.go @@ -24,11 +24,11 @@ import ( "github.com/stretchr/testify/assert" "github.com/lf-edge/ekuiper/internal/converter" - mockContext "github.com/lf-edge/ekuiper/internal/io/mock/context" "github.com/lf-edge/ekuiper/internal/topo/context" "github.com/lf-edge/ekuiper/internal/xsql" "github.com/lf-edge/ekuiper/pkg/api" "github.com/lf-edge/ekuiper/pkg/ast" + mockContext "github.com/lf-edge/ekuiper/pkg/mock/context" ) var count atomic.Value diff --git a/internal/io/mock/test_source_connector.go b/pkg/mock/test_source_connector.go similarity index 96% rename from internal/io/mock/test_source_connector.go rename to pkg/mock/test_source_connector.go index 5824164adb..23626c7902 100644 --- a/internal/io/mock/test_source_connector.go +++ b/pkg/mock/test_source_connector.go @@ -21,8 +21,8 @@ import ( "github.com/stretchr/testify/assert" - mockContext "github.com/lf-edge/ekuiper/internal/io/mock/context" "github.com/lf-edge/ekuiper/pkg/api" + mockContext "github.com/lf-edge/ekuiper/pkg/mock/context" ) func TestSourceConnector(t *testing.T, r api.SourceConnector, expected []api.SourceTuple, sender func()) { diff --git a/pkg/modules/io.go b/pkg/modules/io.go new file mode 100644 index 0000000000..271b0394dc --- /dev/null +++ b/pkg/modules/io.go @@ -0,0 +1,43 @@ +// Copyright 2024 EMQ Technologies Co., Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package modules + +import ( + "github.com/lf-edge/ekuiper/pkg/api" +) + +type ( + NewSourceFunc func() api.Source + NewLookupSourceFunc func() api.LookupSource + NewSinkFunc func() api.Sink +) + +var ( + Sources = map[string]NewSourceFunc{} + Sinks = map[string]NewSinkFunc{} + LookupSources = map[string]NewLookupSourceFunc{} +) + +func RegisterSource(name string, f NewSourceFunc) { + Sources[name] = f +} + +func RegisterSink(name string, f NewSinkFunc) { + Sinks[name] = f +} + +func RegisterLookupSource(name string, f NewLookupSourceFunc) { + LookupSources[name] = f +}