From 795ac2e30b11165fec149cedfe9089bf0b728d16 Mon Sep 17 00:00:00 2001 From: andig Date: Sat, 17 Jun 2023 10:25:52 +0200 Subject: [PATCH] Tibber Pulse: fix server going silent (#7575) --- meter/tibber-pulse.go | 108 ++++++++++++++++++++++++++---------------- 1 file changed, 67 insertions(+), 41 deletions(-) diff --git a/meter/tibber-pulse.go b/meter/tibber-pulse.go index d516650e8c..fd7bcf0ffc 100644 --- a/meter/tibber-pulse.go +++ b/meter/tibber-pulse.go @@ -20,13 +20,14 @@ func init() { registry.Add("tibber-pulse", NewTibberFromConfig) } -var timeout = time.Minute - type Tibber struct { - mu sync.Mutex - log *util.Logger - updated time.Time - live tibber.LiveMeasurement + mu sync.Mutex + log *util.Logger + updated time.Time + live tibber.LiveMeasurement + url string + token, homeID string + client *graphql.SubscriptionClient } func NewTibberFromConfig(other map[string]interface{}) (api.Meter, error) { @@ -43,12 +44,10 @@ func NewTibberFromConfig(other map[string]interface{}) (api.Meter, error) { return nil, errors.New("missing token") } - t := &Tibber{ - log: util.NewLogger("pulse").Redact(cc.Token, cc.HomeID), - } + log := util.NewLogger("pulse").Redact(cc.Token, cc.HomeID) // query client - qclient := tibber.NewClient(t.log, cc.Token) + qclient := tibber.NewClient(log, cc.Token) if cc.HomeID == "" { home, err := qclient.DefaultHome("") @@ -71,8 +70,22 @@ func NewTibberFromConfig(other map[string]interface{}) (api.Meter, error) { return nil, err } - // subscription client - client := graphql.NewSubscriptionClient(res.Viewer.WebsocketSubscriptionUrl). + t := &Tibber{ + log: log, + url: res.Viewer.WebsocketSubscriptionUrl, + token: cc.Token, + homeID: cc.HomeID, + } + + // run the client + err := t.reconnect() + + return t, err +} + +// newSubscriptionClient creates graphql subscription client +func (t *Tibber) newSubscriptionClient() { + t.client = graphql.NewSubscriptionClient(t.url). WithProtocol(graphql.GraphQLWS). WithWebSocketOptions(graphql.WebsocketOptions{ HTTPClient: &http.Client{ @@ -85,29 +98,22 @@ func NewTibberFromConfig(other map[string]interface{}) (api.Meter, error) { }, }). WithConnectionParams(map[string]any{ - "token": cc.Token, + "token": t.token, }). WithRetryTimeout(0). WithLog(t.log.TRACE.Println) - - // run the client - done := make(chan error) - go t.subscribe(client, cc.HomeID, done) - err := <-done - - return t, err } -// subscribe to the websocket query -func (t *Tibber) subscribe(client *graphql.SubscriptionClient, homeID string, done chan error) { - var query struct { - tibber.LiveMeasurement `graphql:"liveMeasurement(homeId: $homeId)"` - } - - var once sync.Once +func (t *Tibber) subscribe(done chan error) { + var ( + once sync.Once + query struct { + tibber.LiveMeasurement `graphql:"liveMeasurement(homeId: $homeId)"` + } + ) - _, err := client.Subscribe(&query, map[string]any{ - "homeId": graphql.ID(homeID), + _, err := t.client.Subscribe(&query, map[string]any{ + "homeId": graphql.ID(t.homeID), }, func(data []byte, err error) error { if err != nil { once.Do(func() { done <- err }) @@ -133,27 +139,48 @@ func (t *Tibber) subscribe(client *graphql.SubscriptionClient, homeID string, do return nil }) - if err != nil { once.Do(func() { done <- err }) } go func() { - if err := client.Run(); err != nil { + if err := t.client.Run(); err != nil { once.Do(func() { done <- err }) } }() } -// CurrentPower implements the api.Meter interface -func (t *Tibber) CurrentPower() (float64, error) { +func (t *Tibber) reconnect() error { + const timeout = time.Minute + t.mu.Lock() - defer t.mu.Unlock() + if time.Since(t.updated) <= timeout { + t.mu.Unlock() + return nil + } + t.mu.Unlock() - if time.Since(t.updated) > timeout { - return 0, api.ErrTimeout + if t.client != nil { + if err := t.client.Close(); err != nil { + t.log.DEBUG.Println("close:", err) + } } + t.newSubscriptionClient() + + done := make(chan error) + go t.subscribe(done) + + return <-done +} + +func (t *Tibber) CurrentPower() (float64, error) { + if err := t.reconnect(); err != nil { + return 0, err + } + + t.mu.Lock() + defer t.mu.Unlock() return t.live.Power - t.live.PowerProduction, nil } @@ -161,12 +188,11 @@ var _ api.PhaseCurrents = (*Tibber)(nil) // Currents implements the api.PhaseCurrents interface func (t *Tibber) Currents() (float64, float64, float64, error) { - t.mu.Lock() - defer t.mu.Unlock() - - if time.Since(t.updated) > timeout { - return 0, 0, 0, api.ErrTimeout + if err := t.reconnect(); err != nil { + return 0, 0, 0, err } + t.mu.Lock() + defer t.mu.Unlock() return t.live.CurrentL1, t.live.CurrentL2, t.live.CurrentL3, nil }