Skip to content

Commit

Permalink
OpenWB: ensure sending retained messages (#2446)
Browse files Browse the repository at this point in the history
  • Loading branch information
andig authored Feb 3, 2022
1 parent fb75ec8 commit 4c50928
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 36 deletions.
28 changes: 11 additions & 17 deletions charger/openwb.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,17 +60,15 @@ func NewOpenWB(log *util.Logger, mqttconf mqtt.Config, id int, topic string, p1p
}

// timeout handler
to := provider.NewTimeoutHandler(provider.NewMqtt(log, client,
fmt.Sprintf("%s/system/%s", topic, openwb.TimestampTopic), 1, timeout,
).StringGetter())
to := provider.NewTimeoutHandler(provider.NewMqtt(log, client, fmt.Sprintf("%s/system/%s", topic, openwb.TimestampTopic), timeout).StringGetter())

boolG := func(topic string) func() (bool, error) {
g := provider.NewMqtt(log, client, topic, 1, 0).BoolGetter()
g := provider.NewMqtt(log, client, topic, 0).BoolGetter()
return to.BoolGetter(g)
}

floatG := func(topic string) func() (float64, error) {
g := provider.NewMqtt(log, client, topic, 1, 0).FloatGetter()
g := provider.NewMqtt(log, client, topic, 0).FloatGetter()
return to.FloatGetter(g)
}

Expand All @@ -94,13 +92,11 @@ func NewOpenWB(log *util.Logger, mqttconf mqtt.Config, id int, topic string, p1p
// TODO remove after https://github.com/snaptec/openWB/issues/1757
currentTopic = "Lp2" + openwb.SlaveChargeCurrentTopic
}
currentS := provider.NewMqtt(log, client,
fmt.Sprintf("%s/set/isss/%s", topic, currentTopic),
1, timeout).IntSetter("current")
currentS := provider.NewMqtt(log, client, fmt.Sprintf("%s/set/isss/%s", topic, currentTopic),
timeout).WithRetained().IntSetter("current")

authS := provider.NewMqtt(log, client,
fmt.Sprintf("%s/set/chargepoint/%d/get/%s", topic, id, openwb.RfidTopic),
1, timeout).StringSetter("rfid")
authS := provider.NewMqtt(log, client, fmt.Sprintf("%s/set/chargepoint/%d/get/%s", topic, id, openwb.RfidTopic),
timeout).WithRetained().StringSetter("rfid")

// meter getters
currentPowerG := floatG(fmt.Sprintf("%s/lp/%d/%s", topic, id, openwb.ChargePowerTopic))
Expand All @@ -124,9 +120,8 @@ func NewOpenWB(log *util.Logger, mqttconf mqtt.Config, id int, topic string, p1p

// heartbeat
go func() {
heartbeatS := provider.NewMqtt(log, client,
fmt.Sprintf("%s/set/isss/%s", topic, openwb.SlaveHeartbeatTopic),
1, timeout).IntSetter("heartbeat")
heartbeatS := provider.NewMqtt(log, client, fmt.Sprintf("%s/set/isss/%s", topic, openwb.SlaveHeartbeatTopic),
timeout).WithRetained().IntSetter("heartbeat")

for range time.NewTicker(openwb.HeartbeatInterval).C {
if err := heartbeatS(1); err != nil {
Expand All @@ -144,9 +139,8 @@ func NewOpenWB(log *util.Logger, mqttconf mqtt.Config, id int, topic string, p1p
// TODO remove after https://github.com/snaptec/openWB/issues/1757
phasesTopic += "Lp2"
}
phasesS := provider.NewMqtt(log, client,
fmt.Sprintf("%s/set/isss/%s", topic, phasesTopic),
1, timeout).IntSetter("phases")
phasesS := provider.NewMqtt(log, client, fmt.Sprintf("%s/set/isss/%s", topic, phasesTopic),
timeout).WithRetained().IntSetter("phases")

phases = func(phases int) error {
return phasesS(int64(phases))
Expand Down
20 changes: 10 additions & 10 deletions charger/warp.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,11 @@ func NewWarp(mqttconf mqtt.Config, topic string, timeout time.Duration) (*Warp,

// timeout handler
to := provider.NewTimeoutHandler(provider.NewMqtt(log, client,
fmt.Sprintf("%s/evse/state", topic), 1, timeout,
fmt.Sprintf("%s/evse/state", topic), timeout,
).StringGetter())

stringG := func(topic string) func() (string, error) {
g := provider.NewMqtt(log, client, topic, 1, 0).StringGetter()
g := provider.NewMqtt(log, client, topic, 0).StringGetter()
return to.StringGetter(g)
}

Expand All @@ -107,22 +107,22 @@ func NewWarp(mqttconf mqtt.Config, topic string, timeout time.Duration) (*Warp,
wb.meterDetailsG = stringG(fmt.Sprintf("%s/meter/detailed_values", topic))

wb.enableS = provider.NewMqtt(log, client,
fmt.Sprintf("%s/evse/auto_start_charging_update", topic), 1, 0).
fmt.Sprintf("%s/evse/auto_start_charging_update", topic), 0).
WithPayload(`{ "auto_start_charging": ${enable} }`).
BoolSetter("enable")

wb.maxcurrentS = provider.NewMqtt(log, client,
fmt.Sprintf("%s/evse/current_limit", topic), 1, 0).
fmt.Sprintf("%s/evse/current_limit", topic), 0).
WithPayload(`{ "current": ${maxcurrent} }`).
IntSetter("maxcurrent")

return wb, nil
}

func (wb *Warp) hasMeter() bool {
if state, err := provider.NewMqtt(wb.log, wb.client,
fmt.Sprintf("%s/meter/state", wb.root), 1, 0,
).StringGetter()(); err == nil {
topic := fmt.Sprintf("%s/meter/state", wb.root)

if state, err := provider.NewMqtt(wb.log, wb.client, topic, 0).StringGetter()(); err == nil {
var res warp.MeterState
if err := json.Unmarshal([]byte(state), &res); err == nil {
return res.State == 2 || len(res.PhasesConnected) > 0
Expand All @@ -133,9 +133,9 @@ func (wb *Warp) hasMeter() bool {
}

func (wb *Warp) hasCurrents() bool {
if state, err := provider.NewMqtt(wb.log, wb.client,
fmt.Sprintf("%s/meter/detailed_values", wb.root), 1, 0,
).StringGetter()(); err == nil {
topic := fmt.Sprintf("%s/meter/detailed_values", wb.root)

if state, err := provider.NewMqtt(wb.log, wb.client, topic, 0).StringGetter()(); err == nil {
var res []float64
if err := json.Unmarshal([]byte(state), &res); err == nil {
return len(res) > 5
Expand Down
6 changes: 3 additions & 3 deletions meter/openwb.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,16 @@ func NewOpenWBFromConfig(other map[string]interface{}) (api.Meter, error) {

// timeout handler
to := provider.NewTimeoutHandler(provider.NewMqtt(log, client,
fmt.Sprintf("%s/system/%s", cc.Topic, openwb.TimestampTopic), 1, cc.Timeout,
fmt.Sprintf("%s/system/%s", cc.Topic, openwb.TimestampTopic), cc.Timeout,
).StringGetter())

boolG := func(topic string) func() (bool, error) {
g := provider.NewMqtt(log, client, topic, 1, 0).BoolGetter()
g := provider.NewMqtt(log, client, topic, 0).BoolGetter()
return to.BoolGetter(g)
}

floatG := func(topic string) func() (float64, error) {
g := provider.NewMqtt(log, client, topic, 1, 0).FloatGetter()
g := provider.NewMqtt(log, client, topic, 0).FloatGetter()
return to.FloatGetter(g)
}

Expand Down
29 changes: 23 additions & 6 deletions provider/mqtt.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ type Mqtt struct {
log *util.Logger
client *mqtt.Client
topic string
retained bool
payload string
scale float64
timeout time.Duration
Expand All @@ -28,6 +29,7 @@ func NewMqttFromConfig(other map[string]interface{}) (IntProvider, error) {
cc := struct {
mqtt.Config `mapstructure:",squash"`
Topic, Payload string // Payload only applies to setters
Retained bool
Scale float64
Timeout time.Duration
pipeline.Settings `mapstructure:",squash"`
Expand All @@ -46,7 +48,10 @@ func NewMqttFromConfig(other map[string]interface{}) (IntProvider, error) {
return nil, err
}

m := NewMqtt(log, client, cc.Topic, cc.Scale, cc.Timeout).WithPayload(cc.Payload)
m := NewMqtt(log, client, cc.Topic, cc.Timeout).WithScale(cc.Scale).WithPayload(cc.Payload)
if cc.Retained {
m = m.WithRetained()
}

pipe, err := pipeline.New(cc.Settings)
if err == nil {
Expand All @@ -57,12 +62,12 @@ func NewMqttFromConfig(other map[string]interface{}) (IntProvider, error) {
}

// NewMqtt creates mqtt provider for given topic
func NewMqtt(log *util.Logger, client *mqtt.Client, topic string, scale float64, timeout time.Duration) *Mqtt {
func NewMqtt(log *util.Logger, client *mqtt.Client, topic string, timeout time.Duration) *Mqtt {
m := &Mqtt{
log: log,
client: client,
topic: topic,
scale: scale,
scale: 1,
timeout: timeout,
}

Expand All @@ -75,6 +80,18 @@ func (m *Mqtt) WithPayload(payload string) *Mqtt {
return m
}

// WithRetained adds retained flag for setters
func (m *Mqtt) WithRetained() *Mqtt {
m.retained = true
return m
}

// WithScale sets scaler for getters
func (m *Mqtt) WithScale(scale float64) *Mqtt {
m.scale = scale
return m
}

// WithPipeline adds a processing pipeline
func (p *Mqtt) WithPipeline(pipeline *pipeline.Pipeline) *Mqtt {
p.pipeline = pipeline
Expand Down Expand Up @@ -138,7 +155,7 @@ func (m *Mqtt) IntSetter(param string) func(int64) error {
return err
}

return m.client.Publish(m.topic, false, payload)
return m.client.Publish(m.topic, m.retained, payload)
}
}

Expand All @@ -152,7 +169,7 @@ func (m *Mqtt) BoolSetter(param string) func(bool) error {
return err
}

return m.client.Publish(m.topic, false, payload)
return m.client.Publish(m.topic, m.retained, payload)
}
}

Expand All @@ -166,6 +183,6 @@ func (m *Mqtt) StringSetter(param string) func(string) error {
return err
}

return m.client.Publish(m.topic, false, payload)
return m.client.Publish(m.topic, m.retained, payload)
}
}

0 comments on commit 4c50928

Please sign in to comment.