Skip to content

Commit

Permalink
Websocket plugin: apply jq when receiving value (evcc-io#7640)
Browse files Browse the repository at this point in the history
  • Loading branch information
andig authored Apr 24, 2023
1 parent d7dc0ee commit 9802525
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 8 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ require (
google.golang.org/protobuf v1.30.0
gopkg.in/yaml.v3 v3.0.1
gorm.io/gorm v1.25.0
nhooyr.io/websocket v1.8.7
)

require (
Expand Down Expand Up @@ -196,7 +197,6 @@ require (
modernc.org/mathutil v1.5.0 // indirect
modernc.org/memory v1.5.0 // indirect
modernc.org/sqlite v1.21.1 // indirect
nhooyr.io/websocket v1.8.7 // indirect
)

replace github.com/foogod/go-powerwall => github.com/andig/go-powerwall v0.2.1-0.20230102102528-4d59ac6910c6
14 changes: 7 additions & 7 deletions provider/socket.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,10 +122,12 @@ func (p *Socket) listen() {

p.log.TRACE.Printf("recv: %s", b)

p.mux.Lock()
p.val = b
p.wait.Update()
p.mux.Unlock()
if v, err := p.pipeline.Process(b); err == nil {
p.mux.Lock()
p.val = v
p.wait.Update()
p.mux.Unlock()
}
}
}
}
Expand All @@ -146,13 +148,11 @@ var _ StringProvider = (*Socket)(nil)
// StringGetter sends string request
func (p *Socket) StringGetter() func() (string, error) {
return func() (string, error) {
b, err := p.hasValue()
v, err := p.hasValue()
if err != nil {
return "", err
}

v, err := p.pipeline.Process(b)

return string(v), err
}
}
Expand Down
53 changes: 53 additions & 0 deletions provider/socket_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package provider

import (
"context"
"fmt"
"net/http"
"net/http/httptest"
"testing"
"time"

"github.com/stretchr/testify/require"
"nhooyr.io/websocket"
)

func TestSocketProvider(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()

srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
c, err := websocket.Accept(w, r, nil)
require.NoError(t, err)
defer c.Close(websocket.StatusNormalClosure, "")

uuids := []string{"foo", "bar"}
for i := 0; ; i++ {
json := fmt.Sprintf(`{"data":{"uuid":"%s","tuples":[[1682319567986,%d]]}}`, uuids[i%2], i%2)

if err := c.Write(ctx, websocket.MessageText, []byte(json)); err != nil {
require.NoError(t, err)
}

select {
case <-time.Tick(time.Millisecond):
case <-ctx.Done():
return
}
}
}))

defer srv.Close()

addr := "ws://" + srv.Listener.Addr().String()
p, err := NewSocketProviderFromConfig(map[string]any{
"uri": addr,
"jq": `.data | select(.uuid=="bar") .tuples[0][1]`,
})
require.NoError(t, err)

g := p.IntGetter()
i, err := g()
require.NoError(t, err)
require.Equal(t, int64(1), i)
}

0 comments on commit 9802525

Please sign in to comment.