Skip to content

Commit

Permalink
Mqtt: add jq parsing (evcc-io#943)
Browse files Browse the repository at this point in the history
Rate limit · GitHub

Access has been restricted

You have triggered a rate limit.

Please wait a few minutes before you try again;
in some cases this may take up to an hour.

andig authored May 1, 2021
1 parent 20d112f commit eb0927a
Showing 10 changed files with 112 additions and 34 deletions.
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
@@ -7,7 +7,7 @@
[![Donate](https://img.shields.io/badge/Donate-PayPal-green.svg)](https://www.paypal.com/cgi-bin/webscr?cmd=_s-xclick&hosted_button_id=48YVXXA7BDNC2)

EVCC is an extensible EV Charge Controller with PV integration implemented in [Go](2). Featured in [PV magazine](https://www.pv-magazine.de/2021/01/15/selbst-ist-der-groeoenlandhof-wallbox-ladesteuerung-selbst-gebaut/).

## Features <!-- omit in toc -->

- simple and clean user interface
@@ -231,7 +231,7 @@ EVCC uses positive (+) sign for incoming energy (grid consumption, PV inverter p
Available meter implementations are:
- `modbus`: ModBus meters as supported by [MBMD](https://github.com/volkszaehler/mbmd#supported-devices). Configuration is similar to the [ModBus plugin](#modbus-read-only) where `power` and `energy` specify the MBMD measurement value to use. Additionally, `soc` can specify an MBMD measurement value for home battery soc. Typical values are `power: Power`, `energy: Sum` and `soc: ChargeState` where only `power` applied per default.
- `modbus`: ModBus meters as supported by [MBMD](https://github.com/volkszaehler/mbmd#supported-devices). Configuration is similar to the [ModBus plugin](#modbus-readwrite) where `power` and `energy` specify the MBMD measurement value to use. Additionally, `soc` can specify an MBMD measurement value for home battery soc. Typical values are `power: Power`, `energy: Sum` and `soc: ChargeState` where only `power` applied per default.
- `openwb`: OpenWB meters. Use `usage` to choose meter type: `grid`/`pv`/`battery`.
- `sma`: SMA Home Manager 2.0 and SMA Energy Meter. Power reading is configured out of the box but can be customized if necessary. To obtain specific energy readings define the desired Obis code (Import Energy: "1:1.8.0", Export Energy: "1:2.8.0").
- `tesla`: Tesla PowerWall meter. Use `usage` to choose meter type: `grid`/`pv`/`battery`.
@@ -372,7 +372,7 @@ To write a register use `type: writesingle` which writes a single 16bit register
### MQTT (read/write)
The `mqtt` plugin allows to read values from MQTT topics. This is particularly useful for meters, e.g. when meter data is already available on MQTT. See [MBMD](5) for an example how to get Modbus meter data into MQTT.
The `mqtt` plugin allows to read values from MQTT topics. This is particularly useful for meters, e.g. when meter data is already available on MQTT. See [MBMD](5) for an example how to get Modbus meter data into MQTT. Includes the ability to read and parse JSON using jq-like queries (see [HTTP plugin](#http-readwrite)).
Sample configuration:
2 changes: 2 additions & 0 deletions internal/charger/config_test.go
Original file line number Diff line number Diff line change
@@ -7,6 +7,8 @@ import (
)

func TestChargers(t *testing.T) {
test.SkipCI(t)

acceptable := []string{
"invalid plugin type: ...",
"missing mqtt broker configuration",
10 changes: 5 additions & 5 deletions internal/charger/openwb.go
Original file line number Diff line number Diff line change
@@ -54,12 +54,12 @@ func NewOpenWB(log *util.Logger, mqttconf mqtt.Config, id int, topic string, tim

// timeout handler
timer := provider.NewMqtt(log, client,
fmt.Sprintf("%s/system/%s", topic, openwb.TimestampTopic), "", 1, timeout,
fmt.Sprintf("%s/system/%s", topic, openwb.TimestampTopic), 1, timeout,
).IntGetter()

// getters
boolG := func(topic string) func() (bool, error) {
g := provider.NewMqtt(log, client, topic, "", 1, 0).BoolGetter()
g := provider.NewMqtt(log, client, topic, 1, 0).BoolGetter()
return func() (val bool, err error) {
if val, err = g(); err == nil {
_, err = timer()
@@ -69,7 +69,7 @@ func NewOpenWB(log *util.Logger, mqttconf mqtt.Config, id int, topic string, tim
}

floatG := func(topic string) func() (float64, error) {
g := provider.NewMqtt(log, client, topic, "", 1, 0).FloatGetter()
g := provider.NewMqtt(log, client, topic, 1, 0).FloatGetter()
return func() (val float64, err error) {
if val, err = g(); err == nil {
_, err = timer()
@@ -95,10 +95,10 @@ func NewOpenWB(log *util.Logger, mqttconf mqtt.Config, id int, topic string, tim
// setters
enable := provider.NewMqtt(log, client,
fmt.Sprintf("%s/set/lp%d/%s", topic, id, openwb.EnabledTopic),
"", 1, timeout).BoolSetter("enable")
1, timeout).BoolSetter("enable")
maxcurrent := provider.NewMqtt(log, client,
fmt.Sprintf("%s/set/lp%d/%s", topic, id, openwb.MaxCurrentTopic),
"", 1, timeout).IntSetter("maxcurrent")
1, timeout).IntSetter("maxcurrent")

// meter getters
currentPowerG := floatG(fmt.Sprintf("%s/lp/%d/%s", topic, id, openwb.ChargePowerTopic))
18 changes: 10 additions & 8 deletions internal/charger/warp.go
Original file line number Diff line number Diff line change
@@ -83,11 +83,11 @@ func NewWarp(mqttconf mqtt.Config, topic string, timeout time.Duration) (*Warp,

// timeout handler
timer := provider.NewMqtt(log, client,
fmt.Sprintf("%s/evse/state", topic), "", 1, timeout,
fmt.Sprintf("%s/evse/state", topic), 1, timeout,
).StringGetter()

stringG := func(topic string) func() (string, error) {
g := provider.NewMqtt(log, client, topic, "", 1, 0).StringGetter()
g := provider.NewMqtt(log, client, topic, 1, 0).StringGetter()
return func() (val string, err error) {
if val, err = g(); err == nil {
_, err = timer()
@@ -101,14 +101,14 @@ func NewWarp(mqttconf mqtt.Config, topic string, timeout time.Duration) (*Warp,
m.meterG = stringG(fmt.Sprintf("%s/meter/state", topic))

m.enableS = provider.NewMqtt(log, client,
fmt.Sprintf("%s/evse/auto_start_charging_update", topic),
`{ "auto_start_charging": ${enable} }`, 1, 0,
).BoolSetter("enable")
fmt.Sprintf("%s/evse/auto_start_charging_update", topic), 1, 0).
WithPayload(`{ "auto_start_charging": ${enable} }`).
BoolSetter("enable")

m.maxcurrentS = provider.NewMqtt(log, client,
fmt.Sprintf("%s/evse/current_limit", topic),
`{ "current": ${maxcurrent} }`, 1, 0,
).IntSetter("maxcurrent")
fmt.Sprintf("%s/evse/current_limit", topic), 1, 0).
WithPayload(`{ "current": ${maxcurrent} }`).
IntSetter("maxcurrent")

return m, nil
}
@@ -160,6 +160,8 @@ func (m *Warp) status() (warpStatus, error) {
return res, err
}

// autostart reads the enabled state from charger
// use function instead of jq to honor evse/state updates
func (m *Warp) autostart() (bool, error) {
var res struct {
AutoStartCharging bool `json:"auto_start_charging"`
2 changes: 2 additions & 0 deletions internal/meter/config_test.go
Original file line number Diff line number Diff line change
@@ -7,6 +7,8 @@ import (
)

func TestMeters(t *testing.T) {
test.SkipCI(t)

acceptable := []string{
"invalid plugin type: ...",
"missing mqtt broker configuration",
6 changes: 3 additions & 3 deletions internal/meter/openwb.go
Original file line number Diff line number Diff line change
@@ -42,12 +42,12 @@ func NewOpenWBFromConfig(other map[string]interface{}) (api.Meter, error) {

// timeout handler
timer := provider.NewMqtt(log, client,
fmt.Sprintf("%s/system/%s", cc.Topic, openwb.TimestampTopic), "", 1, cc.Timeout,
fmt.Sprintf("%s/system/%s", cc.Topic, openwb.TimestampTopic), 1, cc.Timeout,
).IntGetter()

// getters
boolG := func(topic string) func() (bool, error) {
g := provider.NewMqtt(log, client, topic, "", 1, 0).BoolGetter()
g := provider.NewMqtt(log, client, topic, 1, 0).BoolGetter()
return func() (val bool, err error) {
if val, err = g(); err == nil {
_, err = timer()
@@ -57,7 +57,7 @@ func NewOpenWBFromConfig(other map[string]interface{}) (api.Meter, error) {
}

floatG := func(topic string) func() (float64, error) {
g := provider.NewMqtt(log, client, topic, "", 1, 0).FloatGetter()
g := provider.NewMqtt(log, client, topic, 1, 0).FloatGetter()
return func() (val float64, err error) {
if val, err = g(); err == nil {
_, err = timer()
2 changes: 2 additions & 0 deletions internal/vehicle/config_test.go
Original file line number Diff line number Diff line change
@@ -7,6 +7,8 @@ import (
)

func TestVehicles(t *testing.T) {
test.SkipCI(t)

acceptable := []string{
"invalid plugin type: ...",
"missing mqtt broker configuration",
2 changes: 1 addition & 1 deletion provider/http.go
Original file line number Diff line number Diff line change
@@ -118,7 +118,7 @@ func NewHTTP(log *util.Logger, method, uri string, headers map[string]string, bo
if jq != "" {
op, err := gojq.Parse(jq)
if err != nil {
return nil, fmt.Errorf("invalid jq query '%s': %w", p.jq, err)
return nil, fmt.Errorf("invalid jq query '%s': %w", jq, err)
}

p.jq = op
86 changes: 72 additions & 14 deletions provider/mqtt.go
Original file line number Diff line number Diff line change
@@ -8,6 +8,8 @@ import (

"github.com/andig/evcc/provider/mqtt"
"github.com/andig/evcc/util"
"github.com/andig/evcc/util/jq"
"github.com/itchyny/gojq"
)

// Mqtt provider
@@ -18,6 +20,7 @@ type Mqtt struct {
payload string
scale float64
timeout time.Duration
jq *gojq.Query
}

func init() {
@@ -31,6 +34,7 @@ func NewMqttFromConfig(other map[string]interface{}) (IntProvider, error) {
Topic, Payload string // Payload only applies to setters
Scale float64
Timeout time.Duration
Jq string
}{
Scale: 1,
}
@@ -46,70 +50,109 @@ func NewMqttFromConfig(other map[string]interface{}) (IntProvider, error) {
return nil, err
}

m := NewMqtt(log, client, cc.Topic, cc.Payload, cc.Scale, cc.Timeout)
m := NewMqtt(log, client, cc.Topic, cc.Scale, cc.Timeout)

if cc.Payload != "" {
m = m.WithPayload(cc.Payload)
}
if cc.Jq != "" {
m, err = m.WithJq(cc.Jq)
}

return m, err
}

// NewMqtt creates mqtt provider for given topic
func NewMqtt(log *util.Logger, client *mqtt.Client, topic string, payload string, scale float64, timeout time.Duration) *Mqtt {
func NewMqtt(log *util.Logger, client *mqtt.Client, topic string, scale float64, timeout time.Duration) *Mqtt {
m := &Mqtt{
log: log,
client: client,
topic: topic,
payload: payload,
scale: scale,
timeout: timeout,
}

return m
}

// WithPayload adds payload for setters
func (m *Mqtt) WithPayload(payload string) *Mqtt {
m.payload = payload
return m
}

// WithJq adds a jq query applied to the mqtt listener payload
func (m *Mqtt) WithJq(jq string) (*Mqtt, error) {
op, err := gojq.Parse(jq)
if err != nil {
return m, fmt.Errorf("invalid jq query '%s': %w", jq, err)
}

m.jq = op

return m, nil
}

var _ FloatProvider = (*Mqtt)(nil)

// FloatGetter creates handler for float64 from MQTT topic that returns cached value
func (m *Mqtt) FloatGetter() func() (float64, error) {
h := &msgHandler{
topic: m.topic,
scale: m.scale,
mux: util.NewWaiter(m.timeout, func() { m.log.TRACE.Printf("%s wait for initial value", m.topic) }),
jq: m.jq,
}

m.client.Listen(m.topic, h.receive)
return h.floatGetter
}

var _ IntProvider = (*Mqtt)(nil)

// IntGetter creates handler for int64 from MQTT topic that returns cached value
func (m *Mqtt) IntGetter() func() (int64, error) {
h := &msgHandler{
topic: m.topic,
scale: float64(m.scale),
mux: util.NewWaiter(m.timeout, func() { m.log.TRACE.Printf("%s wait for initial value", m.topic) }),
jq: m.jq,
}

m.client.Listen(m.topic, h.receive)
return h.intGetter
}

var _ StringProvider = (*Mqtt)(nil)

// StringGetter creates handler for string from MQTT topic that returns cached value
func (m *Mqtt) StringGetter() func() (string, error) {
h := &msgHandler{
topic: m.topic,
mux: util.NewWaiter(m.timeout, func() { m.log.TRACE.Printf("%s wait for initial value", m.topic) }),
jq: m.jq,
}

m.client.Listen(m.topic, h.receive)
return h.stringGetter
}

var _ BoolProvider = (*Mqtt)(nil)

// BoolGetter creates handler for string from MQTT topic that returns cached value
func (m *Mqtt) BoolGetter() func() (bool, error) {
h := &msgHandler{
topic: m.topic,
mux: util.NewWaiter(m.timeout, func() { m.log.TRACE.Printf("%s wait for initial value", m.topic) }),
jq: m.jq,
}

m.client.Listen(m.topic, h.receive)
return h.boolGetter
}

var _ SetIntProvider = (*Mqtt)(nil)

// IntSetter publishes topic with parameter replaced by int value
func (m *Mqtt) IntSetter(param string) func(int64) error {
return func(v int64) error {
@@ -122,6 +165,8 @@ func (m *Mqtt) IntSetter(param string) func(int64) error {
}
}

var _ SetBoolProvider = (*Mqtt)(nil)

// BoolSetter invokes script with parameter replaced by bool value
func (m *Mqtt) BoolSetter(param string) func(bool) error {
return func(v bool) error {
@@ -134,23 +179,26 @@ func (m *Mqtt) BoolSetter(param string) func(bool) error {
}
}

// StringSetter invokes script with parameter replaced by string value
func (m *Mqtt) StringSetter(param string) func(string) error {
return func(v string) error {
payload, err := setFormattedValue(m.payload, param, v)
if err != nil {
return err
}
// var _ SetStringProvider = (*Mqtt)(nil)

return m.client.Publish(m.topic, false, payload)
}
}
// // StringSetter invokes script with parameter replaced by string value
// func (m *Mqtt) StringSetter(param string) func(string) error {
// return func(v string) error {
// payload, err := setFormattedValue(m.payload, param, v)
// if err != nil {
// return err
// }

// return m.client.Publish(m.topic, false, payload)
// }
// }

type msgHandler struct {
mux *util.Waiter
scale float64
topic string
payload string
jq *gojq.Query
}

func (h *msgHandler) receive(payload string) {
@@ -169,7 +217,17 @@ func (h *msgHandler) hasValue() (string, error) {
return "", fmt.Errorf("%s outdated: %v", h.topic, elapsed.Truncate(time.Second))
}

return h.payload, nil
var err error
payload := h.payload

if h.jq != nil {
var val interface{}
if val, err = jq.Query(h.jq, []byte(payload)); err == nil {
payload = fmt.Sprintf("%v", val)
}
}

return payload, err
}

func (h *msgHandler) floatGetter() (float64, error) {
12 changes: 12 additions & 0 deletions util/test/ci.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package test

import (
"os"
"testing"
)

func SkipCI(t *testing.T) {
if os.Getenv("CI") != "" {
t.Skip("Skipping testing in CI environment")
}
}

0 comments on commit eb0927a

Please sign in to comment.