Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(native): fix plugin config install #3127

Merged
merged 2 commits into from
Aug 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 22 additions & 16 deletions extensions/impl/zmq/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package zmq

import (
"context"
"fmt"

"github.com/lf-edge/ekuiper/contract/v2/api"
Expand All @@ -29,7 +28,6 @@ import (
type zmqSource struct {
subscriber *zmq.Socket
sc *c
cancel context.CancelFunc
}

func (s *zmqSource) Provision(ctx api.StreamContext, configs map[string]any) error {
Expand Down Expand Up @@ -60,43 +58,51 @@ func (s *zmqSource) Subscribe(ctx api.StreamContext, ingest api.BytesIngest, ing
if err != nil {
return err
}
dataChan := make(chan [][]byte, 10)
go func() {
for {
msgs, e := s.subscriber.RecvMessageBytes(0)
if e != nil {
id, e := s.subscriber.GetIdentity()
ingestError(ctx, fmt.Errorf("zmq source getting message %s error: %v", id, e))
} else {
ctx.GetLogger().Debugf("zmq source receive %v", msgs)
select {
case dataChan <- msgs:
case <-ctx.Done():
return
}

}
}
}()
for {
msgs, err := s.subscriber.RecvMessageBytes(0)
if err != nil {
id, err := s.subscriber.GetIdentity()
ingestError(ctx, fmt.Errorf("zmq source getting message %s error: %v", id, err))
} else {
select {
case msgs := <-dataChan:
rcvTime := timex.GetNow()
ctx.GetLogger().Debugf("zmq source receive %v", msgs)
var m []byte
for i, msg := range msgs {
if i == 0 && s.sc.Topic != "" {
continue
}
m = append(m, msg...)
}
meta := make(map[string]interface{})
meta := make(map[string]any)
if s.sc.Topic != "" {
meta["topic"] = string(msgs[0])
}
ingest(ctx, m, meta, rcvTime)
}
select {
case <-ctx.Done():
ctx.GetLogger().Infof("zmq source done")
if s.subscriber != nil {
s.subscriber.Close()
}
default:
// do nothing
return nil
}
}
}

func (s *zmqSource) Close(_ api.StreamContext) error {
if s.cancel != nil {
s.cancel()
}
return nil
}

Expand Down
2 changes: 2 additions & 0 deletions extensions/sources/zmq/zmq.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build !windows

package main

import (
Expand Down
9 changes: 7 additions & 2 deletions internal/conf/path.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2021-2023 EMQ Technologies Co., Ltd.
// Copyright 2021-2024 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -53,7 +53,12 @@ var (
}
)

func GetConfLoc() (string, error) {
func GetConfLoc() (s string, err error) {
defer func() {
failpoint.Inject("GetConfLocErr", func() {
err = errors.New("GetConfLocErr")
})
}()
return GetLoc(etcDir)
}

Expand Down
4 changes: 2 additions & 2 deletions internal/plugin/native/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func InitManager() (*Manager, error) {
if err != nil {
return nil, fmt.Errorf("cannot find plugins folder: %s", err)
}
dataDir, err := conf.GetDataLoc()
etcDir, err := conf.GetConfLoc()
if err != nil {
return nil, fmt.Errorf("cannot find data folder: %s", err)
}
Expand All @@ -100,7 +100,7 @@ func InitManager() (*Manager, error) {
if err != nil {
return nil, fmt.Errorf("error when opening nativePluginStatus: %v", err)
}
registry := &Manager{symbols: make(map[string]string), funcSymbolsDb: func_db, plgInstallDb: plg_db, plgStatusDb: plg_status_db, pluginDir: pluginDir, pluginConfDir: dataDir, runtime: make(map[string]*plugin.Plugin)}
registry := &Manager{symbols: make(map[string]string), funcSymbolsDb: func_db, plgInstallDb: plg_db, plgStatusDb: plg_status_db, pluginDir: pluginDir, pluginConfDir: etcDir, runtime: make(map[string]*plugin.Plugin)}
manager = registry

plugins := make([]map[string]string, 3)
Expand Down
4 changes: 2 additions & 2 deletions internal/plugin/portable/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func InitManager() (*Manager, error) {
if err != nil {
return nil, fmt.Errorf("cannot find plugins folder: %s", err)
}
dataDir, err := conf.GetDataLoc()
etcDir, err := conf.GetConfLoc()
if err != nil {
return nil, fmt.Errorf("cannot find data folder: %s", err)
}
Expand All @@ -81,7 +81,7 @@ func InitManager() (*Manager, error) {
pluginDir = filepath.Join(pluginDir, "portable")
m := &Manager{
pluginDir: pluginDir,
pluginConfDir: dataDir,
pluginConfDir: etcDir,
reg: reg,
}
err = m.syncRegistry()
Expand Down
6 changes: 3 additions & 3 deletions internal/plugin/portable/manager_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2021-2023 EMQ Technologies Co., Ltd.
// Copyright 2021-2024 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -212,10 +212,10 @@ func TestManagerErr(t *testing.T) {
require.Error(t, err)
failpoint.Disable("github.com/lf-edge/ekuiper/v2/internal/conf/GetPluginsLocErr")

failpoint.Enable("github.com/lf-edge/ekuiper/v2/internal/conf/GetDataLocErr", "return(true)")
failpoint.Enable("github.com/lf-edge/ekuiper/v2/internal/conf/GetConfLocErr", "return(true)")
_, err = InitManager()
require.Error(t, err)
failpoint.Disable("github.com/lf-edge/ekuiper/v2/internal/conf/GetDataLocErr")
failpoint.Disable("github.com/lf-edge/ekuiper/v2/internal/conf/GetConfLocErr")

failpoint.Enable("github.com/lf-edge/ekuiper/v2/internal/plugin/portable/syncRegistryErr", "return(true)")
_, err = InitManager()
Expand Down
3 changes: 3 additions & 0 deletions internal/topo/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,10 @@ func (o *defaultNode) finishExec() {

func (o *defaultNode) Close() {
if o.opsWg != nil {
o.ctx.GetLogger().Infof("node %s is closing", o.name)
o.opsWg.Done()
} else {
o.ctx.GetLogger().Infof("node %s is missing close wg", o.name)
}
}

Expand Down
4 changes: 0 additions & 4 deletions internal/topo/node/source_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,3 @@ func (m *SourceNode) doPull(ctx api.StreamContext, tc time.Time) error {
return nil
})
}

func (m *SourceNode) Close() {
m.defaultNode.Close()
}
1 change: 0 additions & 1 deletion internal/topo/topo.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,5 @@ func (s *Topo) WaitClose() {
if s.opsWg != nil {
s.opsWg.Wait()
s.opsWg = nil
conf.Log.Infof("rule %s stopped", s.ctx.GetRuleId())
}
}
85 changes: 85 additions & 0 deletions test/prepare_plugins.sh
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,93 @@ chmod +x test/plugins/sql/create_table

cd test

rm -rf zmq.* Zmq.so

FILE=../plugins/sources/Zmq.so
if [ -f "$FILE" ]; then
echo "$FILE exists, not requried to build plugin."
else
echo "$FILE does not exist, will build the plugin."
go build -trimpath --buildmode=plugin --cover -covermode=atomic -coverpkg=../... -o ../plugins/sources/Zmq.so ../extensions/sources/zmq/zmq.go
fi

mv ../plugins/sources/Zmq.so .
cp ../extensions/sources/zmq/zmq.yaml .
cp ../extensions/sources/zmq/zmq.json .
zip zmq.zip Zmq.so zmq.yaml zmq.json
rm -rf zmq.yaml Zmq.so

rm -rf image.* Image.so

FILE=../plugins/functions/Image.so
if [ -f "$FILE" ]; then
echo "$FILE exists, not requried to build plugin."
else
echo "$FILE does not exist, will build the plugin."
go build -trimpath --buildmode=plugin --cover -covermode=atomic -coverpkg=../... -o ../plugins/functions/Image.so ../extensions/functions/image/*.go
fi

mv ../plugins/functions/Image.so .
cp ../extensions/functions/image/image.json .
zip image.zip Image.so image.json
rm -rf Image.so

# build sql plugins
FILE=../plugins/sinks/Sql.so
if [ -f "$FILE" ]; then
echo "$FILE exists, not required to build plugin."
else
echo "$FILE does not exist, will build the plugin."
go build -trimpath --buildmode=plugin --cover -covermode=atomic -coverpkg=../... -o ../plugins/sinks/Sql.so ../extensions/sinks/sql/*.go
fi

mv ../plugins/sinks/Sql.so .
cp ../extensions/sources/sql/sql.json .
zip sql.zip Sql.so sql.json
rm -rf Sql.so

FILE=../plugins/sources/Sql.so
if [ -f "$FILE" ]; then
echo "$FILE exists, not required to build plugin."
else
echo "$FILE does not exist, will build the plugin."
go build -trimpath --buildmode=plugin --cover -covermode=atomic -coverpkg=../... -o ../plugins/sources/Sql.so ../extensions/sources/sql/*.go
fi

mv ../plugins/sources/Sql.so .
cp yaml_for_test/sql.yaml .
cp ../extensions/sources/sql/sql.json .
zip sqlSrc.zip Sql.so sql.yaml sql.json
rm -rf Sql.so

rm -rf plugins/service/web/plugins/
mkdir -p plugins/service/web/plugins/
mv zmq.zip plugins/service/web/plugins/
mv image.zip plugins/service/web/plugins/
mv sql.zip plugins/service/web/plugins/
mv sqlSrc.zip plugins/service/web/plugins/

# prepare portable plugins
cd ..
mkdir test/temp

mkdir test/temp/mirror
cd sdk/go/example/mirror
go build -o ../../../../test/temp/mirror/mirror .
cd ../../../..
cp sdk/go/example/mirror/mirror.json test/temp/mirror
cp -r sdk/go/example/mirror/sources test/temp/mirror/
cd test/temp/mirror
zip -r ../../plugins/service/web/plugins/mirror.zip *
cd ../../..

cp -r sdk/python/example/pysam test/temp/pysam
cp -r sdk/python/ekuiper test/temp/pysam/
cd test/temp/pysam
zip -r ../../plugins/service/web/plugins/pysam.zip *
cd ../..

rm -r temp

# prepare portable plugins
cd ..
Expand Down
14 changes: 7 additions & 7 deletions test/run_jmeter.sh
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,10 @@ if test $with_edgex = true; then
/opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t test/edgex_array_rule.jmx -Dfvt="$fvt_dir" -l jmeter_logs/edgex_array_rule.jtl -j jmeter_logs/edgex_array_rule.log
echo -e "---------------------------------------------\n"
fi
#
#/opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t test/plugin_end_2_end.jmx -Dfvt="$fvt_dir" -l jmeter_logs/plugin_end_2_end.jtl -j jmeter_logs/plugin_end_2_end.log
#echo -e "---------------------------------------------\n"
#

/opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t test/plugin_end_2_end.jmx -Dfvt="$fvt_dir" -l jmeter_logs/plugin_end_2_end.jtl -j jmeter_logs/plugin_end_2_end.log
echo -e "---------------------------------------------\n"

#/opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t test/portable_end_2_end.jmx -Dfvt="$fvt_dir" -l jmeter_logs/portable_end_2_end.jtl -j jmeter_logs/portable_end_2_end.log
#echo -e "---------------------------------------------\n"

Expand All @@ -119,8 +119,8 @@ echo -e "---------------------------------------------\n"
/opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t test/http_pull_rule.jmx -Dfvt="$fvt_dir" -l jmeter_logs/http_pull_rule.jtl -j jmeter_logs/http_pull_rule.log
echo -e "---------------------------------------------\n"

#/opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t test/binary_image_process.jmx -Dfvt="$fvt_dir" -l jmeter_logs/binary_image_process.jtl -j jmeter_logs/binary_image_process.log
#echo -e "---------------------------------------------\n"
/opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t test/binary_image_process.jmx -Dfvt="$fvt_dir" -l jmeter_logs/binary_image_process.jtl -j jmeter_logs/binary_image_process.log
echo -e "---------------------------------------------\n"

/opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t test/table_static.jmx -Dfvt="$fvt_dir" -l jmeter_logs/table_static.jtl -j jmeter_logs/table_static.log
echo -e "---------------------------------------------\n"
Expand Down Expand Up @@ -154,7 +154,7 @@ echo -e "---------------------------------------------\n"

/opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t test/lookup_table_redis.jmx -Dfvt="$fvt_dir" -l jmeter_logs/lookup_table_redis.jtl -j jmeter_logs/lookup_table_redis.log
echo -e "---------------------------------------------\n"
#

#/opt/jmeter/bin/jmeter.sh -Jjmeter.save.saveservice.output_format=xml -n -t test/lookup_table_sql.jmx -Dfvt="$fvt_dir" -l jmeter_logs/lookup_table_sql.jtl -j jmeter_logs/lookup_table_sql.log
#echo -e "---------------------------------------------\n"

Expand Down
Loading