Skip to content

Commit

Permalink
support status
Browse files Browse the repository at this point in the history
Signed-off-by: yisaer <disxiaofei@163.com>
  • Loading branch information
Yisaer committed Jun 5, 2024
1 parent bc77174 commit 2f7c51c
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 9 deletions.
5 changes: 0 additions & 5 deletions internal/binder/io/binder.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (

"github.com/lf-edge/ekuiper/contract/v2/api"
"github.com/lf-edge/ekuiper/v2/internal/binder"
mqttClient "github.com/lf-edge/ekuiper/v2/internal/io/mqtt/client"
)

var ( // init once and read only
Expand Down Expand Up @@ -100,7 +99,3 @@ func LookupSource(name string) (api.LookupSource, error) {
}
return nil, errs
}

func RegisterConnection() {
mqttClient.Register()
}
35 changes: 35 additions & 0 deletions internal/io/connection/connection_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
package connection

import (
"errors"
"strings"

"github.com/lf-edge/ekuiper/contract/v2/api"
"github.com/lf-edge/ekuiper/v2/internal/conf"
"github.com/lf-edge/ekuiper/v2/pkg/modules"
Expand All @@ -23,6 +26,7 @@ import (
func InitMockTest() {
conf.IsTesting = true
modules.ConnectionRegister["mock"] = CreateMockConnection
modules.ConnectionRegister[strings.ToLower("mockErr")] = CreateMockErrConnection
}

type mockConnection struct {
Expand Down Expand Up @@ -61,3 +65,34 @@ func CreateMockConnection(ctx api.StreamContext, id string, props map[string]any
m := &mockConnection{id: id, ref: 0}
return m, nil
}

type mockErrConnection struct {
}

func (m mockErrConnection) Ping(ctx api.StreamContext) error {
return errors.New("mockErr")

Check warning on line 73 in internal/io/connection/connection_mock.go

View check run for this annotation

Codecov / codecov/patch

internal/io/connection/connection_mock.go#L72-L73

Added lines #L72 - L73 were not covered by tests
}

func (m mockErrConnection) Close(ctx api.StreamContext) {
return

Check warning on line 77 in internal/io/connection/connection_mock.go

View check run for this annotation

Codecov / codecov/patch

internal/io/connection/connection_mock.go#L76-L77

Added lines #L76 - L77 were not covered by tests
}

func (m mockErrConnection) Attach(ctx api.StreamContext) {
return

Check warning on line 81 in internal/io/connection/connection_mock.go

View check run for this annotation

Codecov / codecov/patch

internal/io/connection/connection_mock.go#L80-L81

Added lines #L80 - L81 were not covered by tests
}

func (m mockErrConnection) DetachSub(ctx api.StreamContext, props map[string]any) {
return

Check warning on line 85 in internal/io/connection/connection_mock.go

View check run for this annotation

Codecov / codecov/patch

internal/io/connection/connection_mock.go#L84-L85

Added lines #L84 - L85 were not covered by tests
}

func (m mockErrConnection) DetachPub(ctx api.StreamContext, props map[string]any) {
return

Check warning on line 89 in internal/io/connection/connection_mock.go

View check run for this annotation

Codecov / codecov/patch

internal/io/connection/connection_mock.go#L88-L89

Added lines #L88 - L89 were not covered by tests
}

func (m mockErrConnection) Ref(ctx api.StreamContext) int {
return 0

Check warning on line 93 in internal/io/connection/connection_mock.go

View check run for this annotation

Codecov / codecov/patch

internal/io/connection/connection_mock.go#L92-L93

Added lines #L92 - L93 were not covered by tests
}

func CreateMockErrConnection(ctx api.StreamContext, id string, props map[string]any) (modules.Connection, error) {
return nil, errors.New("mockErr")
}
8 changes: 4 additions & 4 deletions internal/io/connection/connection_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,10 @@ func DropNameConnection(ctx api.StreamContext, selId string) error {
defer globalConnectionManager.Unlock()
meta, ok := globalConnectionManager.connectionPool[selId]
if !ok {
_, ok := globalConnectionManager.failConnection[selId]
if ok {
delete(globalConnectionManager.failConnection, selId)

Check warning on line 214 in internal/io/connection/connection_pool.go

View check run for this annotation

Codecov / codecov/patch

internal/io/connection/connection_pool.go#L214

Added line #L214 was not covered by tests
}
return nil
}
conn := meta.conn
Expand Down Expand Up @@ -285,10 +289,6 @@ type ConnectionMeta struct {
conn modules.Connection `json:"-"`
}

func init() {
modules.ConnectionRegister["mock"] = CreateMockConnection
}

func NewExponentialBackOff() *backoff.ExponentialBackOff {
return backoff.NewExponentialBackOff(
backoff.WithInitialInterval(DefaultInitialInterval),
Expand Down
24 changes: 24 additions & 0 deletions internal/io/connection/connection_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,3 +113,27 @@ func TestConnectionErr(t *testing.T) {
require.Error(t, err)
failpoint.Disable("github.com/lf-edge/ekuiper/v2/internal/io/connection/dropConnectionStoreErr")
}

func TestConnectionStatus(t *testing.T) {
dataDir, err := conf.GetDataLoc()
require.NoError(t, err)
require.NoError(t, store.SetupDefault(dataDir))
require.NoError(t, InitConnectionManager4Test())

conf.WriteCfgIntoKVStorage("connections", "mockErr", "a1", map[string]interface{}{})
conf.WriteCfgIntoKVStorage("connections", "mock", "a2", map[string]interface{}{})
require.NoError(t, ReloadConnection())
ctx := context.Background()
allStatus := GetAllConnectionStatus(ctx)
s, ok := allStatus["a1"]
require.True(t, ok)
require.Equal(t, ConnectionStatus{
Status: ConnectionFail,
ErrMsg: "mockErr",
}, s)
s, ok = allStatus["a2"]
require.True(t, ok)
require.Equal(t, ConnectionStatus{
Status: ConnectionRunning,
}, s)
}
10 changes: 10 additions & 0 deletions internal/server/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,16 @@ type ConnectionRequest struct {
Props map[string]interface{} `json:"props"`
}

func connectionsStatusHandler(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()
switch r.Method {
case http.MethodGet:
allStatus := connection.GetAllConnectionStatus(context.Background())
w.WriteHeader(http.StatusOK)
jsonResponse(allStatus, w, logger)

Check warning on line 40 in internal/server/connection.go

View check run for this annotation

Codecov / codecov/patch

internal/server/connection.go#L34-L40

Added lines #L34 - L40 were not covered by tests
}
}

func connectionsHandler(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()
switch r.Method {
Expand Down
1 change: 1 addition & 0 deletions internal/server/rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ func createRestServer(ip string, port int, needToken bool) *http.Server {
r.HandleFunc("/data/import", configurationImportHandler).Methods(http.MethodPost)
r.HandleFunc("/data/import/status", configurationStatusHandler).Methods(http.MethodGet)
r.HandleFunc("/connections", connectionsHandler).Methods(http.MethodGet, http.MethodPost)
r.HandleFunc("/connections/status", connectionsStatusHandler).Methods(http.MethodGet)

Check warning on line 180 in internal/server/rest.go

View check run for this annotation

Codecov / codecov/patch

internal/server/rest.go#L180

Added line #L180 was not covered by tests
r.HandleFunc("/connection/{id}", connectionHandler).Methods(http.MethodGet, http.MethodDelete)
// r.HandleFunc("/connection/websocket", connectionHandler).Methods(http.MethodGet, http.MethodPost, http.MethodDelete)
// Register extended routes
Expand Down

0 comments on commit 2f7c51c

Please sign in to comment.