Skip to content

Commit

Permalink
Tibber Pulse: fix server going silent (evcc-io#7575)
Browse files Browse the repository at this point in the history
  • Loading branch information
andig authored Jun 17, 2023
1 parent 5045591 commit 795ac2e
Showing 1 changed file with 67 additions and 41 deletions.
108 changes: 67 additions & 41 deletions meter/tibber-pulse.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,14 @@ func init() {
registry.Add("tibber-pulse", NewTibberFromConfig)
}

var timeout = time.Minute

type Tibber struct {
mu sync.Mutex
log *util.Logger
updated time.Time
live tibber.LiveMeasurement
mu sync.Mutex
log *util.Logger
updated time.Time
live tibber.LiveMeasurement
url string
token, homeID string
client *graphql.SubscriptionClient
}

func NewTibberFromConfig(other map[string]interface{}) (api.Meter, error) {
Expand All @@ -43,12 +44,10 @@ func NewTibberFromConfig(other map[string]interface{}) (api.Meter, error) {
return nil, errors.New("missing token")
}

t := &Tibber{
log: util.NewLogger("pulse").Redact(cc.Token, cc.HomeID),
}
log := util.NewLogger("pulse").Redact(cc.Token, cc.HomeID)

// query client
qclient := tibber.NewClient(t.log, cc.Token)
qclient := tibber.NewClient(log, cc.Token)

if cc.HomeID == "" {
home, err := qclient.DefaultHome("")
Expand All @@ -71,8 +70,22 @@ func NewTibberFromConfig(other map[string]interface{}) (api.Meter, error) {
return nil, err
}

// subscription client
client := graphql.NewSubscriptionClient(res.Viewer.WebsocketSubscriptionUrl).
t := &Tibber{
log: log,
url: res.Viewer.WebsocketSubscriptionUrl,
token: cc.Token,
homeID: cc.HomeID,
}

// run the client
err := t.reconnect()

return t, err
}

// newSubscriptionClient creates graphql subscription client
func (t *Tibber) newSubscriptionClient() {
t.client = graphql.NewSubscriptionClient(t.url).
WithProtocol(graphql.GraphQLWS).
WithWebSocketOptions(graphql.WebsocketOptions{
HTTPClient: &http.Client{
Expand All @@ -85,29 +98,22 @@ func NewTibberFromConfig(other map[string]interface{}) (api.Meter, error) {
},
}).
WithConnectionParams(map[string]any{
"token": cc.Token,
"token": t.token,
}).
WithRetryTimeout(0).
WithLog(t.log.TRACE.Println)

// run the client
done := make(chan error)
go t.subscribe(client, cc.HomeID, done)
err := <-done

return t, err
}

// subscribe to the websocket query
func (t *Tibber) subscribe(client *graphql.SubscriptionClient, homeID string, done chan error) {
var query struct {
tibber.LiveMeasurement `graphql:"liveMeasurement(homeId: $homeId)"`
}

var once sync.Once
func (t *Tibber) subscribe(done chan error) {
var (
once sync.Once
query struct {
tibber.LiveMeasurement `graphql:"liveMeasurement(homeId: $homeId)"`
}
)

_, err := client.Subscribe(&query, map[string]any{
"homeId": graphql.ID(homeID),
_, err := t.client.Subscribe(&query, map[string]any{
"homeId": graphql.ID(t.homeID),
}, func(data []byte, err error) error {
if err != nil {
once.Do(func() { done <- err })
Expand All @@ -133,40 +139,60 @@ func (t *Tibber) subscribe(client *graphql.SubscriptionClient, homeID string, do

return nil
})

if err != nil {
once.Do(func() { done <- err })
}

go func() {
if err := client.Run(); err != nil {
if err := t.client.Run(); err != nil {
once.Do(func() { done <- err })
}
}()
}

// CurrentPower implements the api.Meter interface
func (t *Tibber) CurrentPower() (float64, error) {
func (t *Tibber) reconnect() error {
const timeout = time.Minute

t.mu.Lock()
defer t.mu.Unlock()
if time.Since(t.updated) <= timeout {
t.mu.Unlock()
return nil
}
t.mu.Unlock()

if time.Since(t.updated) > timeout {
return 0, api.ErrTimeout
if t.client != nil {
if err := t.client.Close(); err != nil {
t.log.DEBUG.Println("close:", err)
}
}

t.newSubscriptionClient()

done := make(chan error)
go t.subscribe(done)

return <-done
}

func (t *Tibber) CurrentPower() (float64, error) {
if err := t.reconnect(); err != nil {
return 0, err
}

t.mu.Lock()
defer t.mu.Unlock()
return t.live.Power - t.live.PowerProduction, nil
}

var _ api.PhaseCurrents = (*Tibber)(nil)

// Currents implements the api.PhaseCurrents interface
func (t *Tibber) Currents() (float64, float64, float64, error) {
t.mu.Lock()
defer t.mu.Unlock()

if time.Since(t.updated) > timeout {
return 0, 0, 0, api.ErrTimeout
if err := t.reconnect(); err != nil {
return 0, 0, 0, err
}

t.mu.Lock()
defer t.mu.Unlock()
return t.live.CurrentL1, t.live.CurrentL2, t.live.CurrentL3, nil
}

0 comments on commit 795ac2e

Please sign in to comment.