Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tibber Pulse: switch to graphql-ws websocket transport #5808

Merged
merged 6 commits into from
Jan 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ require (
github.com/gregdel/pushover v1.1.0
github.com/grid-x/modbus v0.0.0-20221121121528-8cdd929d093f
github.com/hashicorp/go-version v1.6.0
github.com/hasura/go-graphql-client v0.8.1
github.com/hasura/go-graphql-client v0.8.2-0.20230118111322-12655772463c
github.com/imdario/mergo v0.3.13
github.com/influxdata/influxdb-client-go/v2 v2.12.1
github.com/itchyny/gojq v0.12.11
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -496,8 +496,8 @@ github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO
github.com/hashicorp/mdns v1.0.0/go.mod h1:tL+uN++7HEJ6SQLQ2/p+z2pH24WQKWjBPkE0mNTz8vQ=
github.com/hashicorp/memberlist v0.1.3/go.mod h1:ajVTdAv/9Im8oMAAj5G31PhhMCZJV2pPBoIllUwCN7I=
github.com/hashicorp/serf v0.8.2/go.mod h1:6hOLApaqBFA1NXqRQAsxw9QxuDEvNxSQRwA/JwenrHc=
github.com/hasura/go-graphql-client v0.8.1 h1:yU4888urgkW4L47cs+QQDXl3YfVaNraUqym5qsJ41Ms=
github.com/hasura/go-graphql-client v0.8.1/go.mod h1:NVifIwv+YFIUYGLQ7SM2/vBbzS/9rFP4vmIf/vf/zXM=
github.com/hasura/go-graphql-client v0.8.2-0.20230118111322-12655772463c h1:BruXcowJs5r1MKf534ObCBjA9zk5XKUZ5W2m4NdE3Ak=
github.com/hasura/go-graphql-client v0.8.2-0.20230118111322-12655772463c/go.mod h1:NVifIwv+YFIUYGLQ7SM2/vBbzS/9rFP4vmIf/vf/zXM=
github.com/hinshun/vt10x v0.0.0-20220119200601-820417d04eec h1:qv2VnGeEQHchGaZ/u7lxST/RaJw+cv273q79D81Xbog=
github.com/hinshun/vt10x v0.0.0-20220119200601-820417d04eec/go.mod h1:Q48J4R4DvxnHolD5P8pOtXigYlRuPLGl6moFx3ulM68=
github.com/holoplot/go-avahi v1.0.1 h1:XcqR2keL4qWRnlxHD5CAOdWpLFZJ+EOUK0vEuylfvvk=
Expand Down
86 changes: 51 additions & 35 deletions meter/tibber-pulse.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
package meter

import (
"context"
"encoding/json"
"errors"
"net/http"
"sync"
"time"

"github.com/evcc-io/evcc/api"
"github.com/evcc-io/evcc/meter/tibber"
"github.com/evcc-io/evcc/util"
"github.com/evcc-io/evcc/util/request"
"github.com/evcc-io/evcc/util/transport"
"github.com/hasura/go-graphql-client"
)

Expand Down Expand Up @@ -43,90 +47,102 @@ func NewTibberFromConfig(other map[string]interface{}) (api.Meter, error) {
log: util.NewLogger("pulse").Redact(cc.Token, cc.HomeID),
}

if cc.HomeID == "" {
// query client
qclient := tibber.NewClient(t.log, cc.Token)
// query client
qclient := tibber.NewClient(t.log, cc.Token)

home, err := qclient.DefaultHome("")
if err != nil {
if cc.HomeID == "" {
if home, err := qclient.DefaultHome(""); err != nil {
return nil, err
} else {
cc.HomeID = home.ID
}
cc.HomeID = home.ID
}

var res struct {
Viewer struct {
WebsocketSubscriptionUrl string
}
}

ctx, cancel := context.WithTimeout(context.Background(), request.Timeout)
defer cancel()

if err := qclient.Query(ctx, &res, nil); err != nil {
return nil, err
}

// subscription client
client := graphql.NewSubscriptionClient(tibber.SubscriptionURI).
client := graphql.NewSubscriptionClient(res.Viewer.WebsocketSubscriptionUrl).
WithProtocol(graphql.GraphQLWS).
WithWebSocketOptions(graphql.WebsocketOptions{
HTTPClient: &http.Client{
Transport: &transport.Decorator{
Base: http.DefaultTransport,
Decorator: transport.DecorateHeaders(map[string]string{
"User-Agent": "go-graphql-client/0.9.0",
}),
},
},
}).
WithConnectionParams(map[string]any{
"token": cc.Token,
}).
WithRetryTimeout(timeout).
WithLog(t.log.TRACE.Println)

// run the client
go func() {
if err := client.Run(); err != nil {
t.log.ERROR.Println(err)
}
}()

err := t.subscribe(client, cc.HomeID)
done := make(chan error)
go t.subscribe(client, cc.HomeID, done)
err := <-done

return t, err
}

// subscribe to the websocket query
func (t *Tibber) subscribe(client *graphql.SubscriptionClient, homeID string) error {
func (t *Tibber) subscribe(client *graphql.SubscriptionClient, homeID string, done chan error) {
var query struct {
tibber.LiveMeasurement `graphql:"liveMeasurement(homeId: $homeId)"`
}

var once sync.Once
recv := make(chan struct{})
errC := make(chan error)

_, err := client.Subscribe(&query, map[string]any{
"homeId": graphql.ID(homeID),
}, func(data []byte, err error) error {
if err != nil {
select {
case errC <- err:
default:
}
return nil
once.Do(func() { done <- err })
}

var res struct {
LiveMeasurement tibber.LiveMeasurement
}

if err := json.Unmarshal(data, &res); err != nil {
once.Do(func() { done <- err })

t.log.ERROR.Println(err)
return nil
}

once.Do(func() {
close(recv)
})

t.mu.Lock()
t.live = res.LiveMeasurement
t.updated = time.Now()
t.mu.Unlock()

once.Do(func() { close(done) })

return nil
})

// wait for connection
if err == nil {
select {
case <-recv:
case <-time.After(timeout):
err = api.ErrTimeout
case err = <-errC:
}
if err != nil {
once.Do(func() { done <- err })
}

return err
go func() {
if err := client.Run(); err != nil {
once.Do(func() { done <- err })
}
}()
}

// CurrentPower implements the api.Meter interface
Expand Down