Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Plugins: allow returning errors when instantiating plugin getter/setter #10778

Merged
merged 8 commits into from
Nov 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 57 additions & 22 deletions charger/openwb.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package charger

import (
"fmt"
"strconv"
"strings"
"time"

Expand Down Expand Up @@ -62,27 +63,34 @@ func NewOpenWB(log *util.Logger, mqttconf mqtt.Config, id int, topic string, p1p
}

// timeout handler
to := provider.NewTimeoutHandler(provider.NewMqtt(log, client, fmt.Sprintf("%s/system/%s", topic, openwb.TimestampTopic), timeout).StringGetter())

boolG := func(topic string) func() (bool, error) {
g := provider.NewMqtt(log, client, topic, 0).BoolGetter()
return to.BoolGetter(g)
h, err := provider.NewMqtt(log, client, fmt.Sprintf("%s/system/%s", topic, openwb.TimestampTopic), timeout).StringGetter()
if err != nil {
return nil, err
}
to := provider.NewTimeoutHandler(h)

floatG := func(topic string) func() (float64, error) {
g := provider.NewMqtt(log, client, topic, 0).FloatGetter()
return to.FloatGetter(g)
mq := func(subtopic string) *provider.Mqtt {
return provider.NewMqtt(log, client, fmt.Sprintf("%s/lp/%d/%s", topic, id, subtopic), 0)
}

// check if loadpoint configured
configured := boolG(fmt.Sprintf("%s/lp/%d/%s", topic, id, openwb.ConfiguredTopic))
configured, err := to.BoolGetter(mq(openwb.ConfiguredTopic))
if err != nil {
return nil, err
}
if isConfigured, err := configured(); err != nil || !isConfigured {
return nil, fmt.Errorf("loadpoint %d is not configured", id)
}

// adapt plugged/charging to status
pluggedG := boolG(fmt.Sprintf("%s/lp/%d/%s", topic, id, openwb.PluggedTopic))
chargingG := boolG(fmt.Sprintf("%s/lp/%d/%s", topic, id, openwb.ChargingTopic))
pluggedG, err := to.BoolGetter(mq(openwb.PluggedTopic))
if err != nil {
return nil, err
}
chargingG, err := to.BoolGetter(mq(openwb.ChargingTopic))
if err != nil {
return nil, err
}
statusG := provider.NewOpenWBStatusProvider(pluggedG, chargingG).StringGetter

// setters
Expand All @@ -91,27 +99,45 @@ func NewOpenWB(log *util.Logger, mqttconf mqtt.Config, id int, topic string, p1p
// TODO remove after https://github.com/snaptec/openWB/issues/1757
currentTopic = "Lp2" + openwb.SlaveChargeCurrentTopic
}
currentS := provider.NewMqtt(log, client, fmt.Sprintf("%s/set/isss/%s", topic, currentTopic),
currentS, err := provider.NewMqtt(log, client, fmt.Sprintf("%s/set/isss/%s", topic, currentTopic),
timeout).WithRetained().IntSetter("current")
if err != nil {
return nil, err
}

cpTopic := openwb.SlaveCPInterruptTopic
if id == 2 {
// TODO remove after https://github.com/snaptec/openWB/issues/1757
cpTopic = strings.TrimSuffix(cpTopic, "1") + "2"
}
wakeupS := provider.NewMqtt(log, client, fmt.Sprintf("%s/set/isss/%s", topic, cpTopic),
wakeupS, err := provider.NewMqtt(log, client, fmt.Sprintf("%s/set/isss/%s", topic, cpTopic),
timeout).WithRetained().IntSetter("cp")
if err != nil {
return nil, err
}

authS := provider.NewMqtt(log, client, fmt.Sprintf("%s/set/chargepoint/%d/set/%s", topic, id, openwb.RfidTopic),
authS, err := provider.NewMqtt(log, client, fmt.Sprintf("%s/set/chargepoint/%d/set/%s", topic, id, openwb.RfidTopic),
timeout).WithRetained().StringSetter("rfid")
if err != nil {
return nil, err
}

// meter getters
currentPowerG := floatG(fmt.Sprintf("%s/lp/%d/%s", topic, id, openwb.ChargePowerTopic))
totalEnergyG := floatG(fmt.Sprintf("%s/lp/%d/%s", topic, id, openwb.ChargeTotalEnergyTopic))
currentPowerG, err := to.FloatGetter(mq(openwb.ChargePowerTopic))
if err != nil {
return nil, err
}
totalEnergyG, err := to.FloatGetter(mq(openwb.ChargeTotalEnergyTopic))
if err != nil {
return nil, err
}

var currentsG []func() (float64, error)
for i := 1; i <= 3; i++ {
current := floatG(fmt.Sprintf("%s/lp/%d/%s%d", topic, id, openwb.CurrentTopic, i))
current, err := to.FloatGetter(mq(openwb.CurrentTopic + strconv.Itoa(i)))
if err != nil {
return nil, err
}
currentsG = append(currentsG, current)
}

Expand All @@ -126,10 +152,13 @@ func NewOpenWB(log *util.Logger, mqttconf mqtt.Config, id int, topic string, p1p
}

// heartbeat
go func() {
heartbeatS := provider.NewMqtt(log, client, fmt.Sprintf("%s/set/isss/%s", topic, openwb.SlaveHeartbeatTopic),
timeout).WithRetained().IntSetter("heartbeat")
heartbeatS, err := provider.NewMqtt(log, client, fmt.Sprintf("%s/set/isss/%s", topic, openwb.SlaveHeartbeatTopic),
timeout).WithRetained().IntSetter("heartbeat")
if err != nil {
return nil, err
}

go func() {
for range time.Tick(openwb.HeartbeatInterval) {
if err := heartbeatS(1); err != nil {
log.ERROR.Printf("heartbeat: %v", err)
Expand All @@ -146,8 +175,11 @@ func NewOpenWB(log *util.Logger, mqttconf mqtt.Config, id int, topic string, p1p
// TODO remove after https://github.com/snaptec/openWB/issues/1757
phasesTopic += "Lp2"
}
phasesS := provider.NewMqtt(log, client, fmt.Sprintf("%s/set/isss/%s", topic, phasesTopic),
phasesS, err := provider.NewMqtt(log, client, fmt.Sprintf("%s/set/isss/%s", topic, phasesTopic),
timeout).WithRetained().IntSetter("phases")
if err != nil {
return nil, err
}

phases = func(phases int) error {
return phasesS(int64(phases))
Expand All @@ -156,7 +188,10 @@ func NewOpenWB(log *util.Logger, mqttconf mqtt.Config, id int, topic string, p1p

var soc func() (float64, error)
if dc {
soc = floatG(fmt.Sprintf("%s/lp/%d/%s", topic, id, openwb.VehicleSocTopic))
soc, err = to.FloatGetter(mq(openwb.VehicleSocTopic))
if err != nil {
return nil, err
}
}

return decorateOpenWB(c, phases, soc), nil
Expand Down
85 changes: 51 additions & 34 deletions charger/warp2.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,10 @@ type Warp2 struct {
meterDetailsG func() (string, error)
chargeG func() (string, error)
userconfigG func() (string, error)
emStateG func() (string, error)
maxcurrentS func(int64) error
// emConfigG func() (string, error)
emStateG func() (string, error)
phasesS func(int64) error
current int64
phasesS func(int64) error
current int64
}

func init() {
Expand Down Expand Up @@ -103,33 +102,60 @@ func NewWarp2(mqttconf mqtt.Config, topic, emTopic string, timeout time.Duration
}

// timeout handler
to := provider.NewTimeoutHandler(provider.NewMqtt(log, client,
fmt.Sprintf("%s/evse/low_level_state", topic), timeout,
).StringGetter())
h, err := provider.NewMqtt(log, client, fmt.Sprintf("%s/evse/low_level_state", topic), timeout).StringGetter()
if err != nil {
return nil, err
}
to := provider.NewTimeoutHandler(h)

stringG := func(topic string) func() (string, error) {
g := provider.NewMqtt(log, client, topic, 0).StringGetter()
return to.StringGetter(g)
mq := func(s string, args ...any) *provider.Mqtt {
return provider.NewMqtt(log, client, fmt.Sprintf(s, args...), 0)
}

wb.maxcurrentG = stringG(fmt.Sprintf("%s/evse/external_current", topic))
wb.statusG = stringG(fmt.Sprintf("%s/evse/state", topic))
wb.meterG = stringG(fmt.Sprintf("%s/meter/values", topic))
wb.meterDetailsG = stringG(fmt.Sprintf("%s/meter/all_values", topic))
wb.chargeG = stringG(fmt.Sprintf("%s/charge_tracker/current_charge", topic))
wb.userconfigG = stringG(fmt.Sprintf("%s/users/config", topic))
wb.maxcurrentG, err = to.StringGetter(mq("%s/evse/external_current", topic))
if err != nil {
return nil, err
}
wb.statusG, err = to.StringGetter(mq("%s/evse/state", topic))
if err != nil {
return nil, err
}
wb.meterG, err = to.StringGetter(mq("%s/meter/values", topic))
if err != nil {
return nil, err
}
wb.meterDetailsG, err = to.StringGetter(mq("%s/meter/all_values", topic))
if err != nil {
return nil, err
}
wb.chargeG, err = to.StringGetter(mq("%s/charge_tracker/current_charge", topic))
if err != nil {
return nil, err
}
wb.userconfigG, err = to.StringGetter(mq("%s/users/config", topic))
if err != nil {
return nil, err
}

wb.maxcurrentS = provider.NewMqtt(log, client,
wb.maxcurrentS, err = provider.NewMqtt(log, client,
fmt.Sprintf("%s/evse/external_current_update", topic), 0).
WithPayload(`{ "current": ${maxcurrent} }`).
IntSetter("maxcurrent")
if err != nil {
return nil, err
}

// wb.emConfigG = stringG(fmt.Sprintf("%s/energy_manager/config", emTopic))
wb.emStateG = stringG(fmt.Sprintf("%s/energy_manager/state", emTopic))
wb.phasesS = provider.NewMqtt(log, client,
wb.emStateG, err = to.StringGetter(mq("%s/energy_manager/state", emTopic))
if err != nil {
return nil, err
}
wb.phasesS, err = provider.NewMqtt(log, client,
fmt.Sprintf("%s/energy_manager/external_control_update", emTopic), 0).
WithPayload(`{ "phases_wanted": ${phases} }`).
IntSetter("phases")
if err != nil {
return nil, err
}

return wb, nil
}
Expand All @@ -141,9 +167,11 @@ func (wb *Warp2) hasFeature(root, feature string) bool {

topic := fmt.Sprintf("%s/info/features", root)

if data, err := provider.NewMqtt(wb.log, wb.client, topic, 0).StringGetter()(); err == nil {
if err := json.Unmarshal([]byte(data), &wb.features); err == nil {
return slices.Contains(wb.features, feature)
if dataG, err := provider.NewMqtt(wb.log, wb.client, topic, 0).StringGetter(); err == nil {
if data, err := dataG(); err == nil {
if err := json.Unmarshal([]byte(data), &wb.features); err == nil {
return slices.Contains(wb.features, feature)
}
}
}

Expand Down Expand Up @@ -289,17 +317,6 @@ func (wb *Warp2) identify() (string, error) {
return res.AuthorizationInfo.TagId, err
}

// func (wb *Warp2) emConfig() (warp.EmConfig, error) {
// var res warp.EmConfig

// s, err := wb.emConfigG()
// if err == nil {
// err = json.Unmarshal([]byte(s), &res)
// }

// return res, err
// }

func (wb *Warp2) emState() (warp.EmState, error) {
var res warp.EmState

Expand Down
53 changes: 36 additions & 17 deletions meter/openwb.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,18 +42,14 @@ func NewOpenWBFromConfig(other map[string]interface{}) (api.Meter, error) {
}

// timeout handler
to := provider.NewTimeoutHandler(provider.NewMqtt(log, client,
fmt.Sprintf("%s/system/%s", cc.Topic, openwb.TimestampTopic), cc.Timeout,
).StringGetter())

boolG := func(topic string) func() (bool, error) {
g := provider.NewMqtt(log, client, topic, 0).BoolGetter()
return to.BoolGetter(g)
h, err := provider.NewMqtt(log, client, fmt.Sprintf("%s/system/%s", cc.Topic, openwb.TimestampTopic), cc.Timeout).StringGetter()
if err != nil {
return nil, err
}
to := provider.NewTimeoutHandler(h)

floatG := func(topic string) func() (float64, error) {
g := provider.NewMqtt(log, client, topic, 0).FloatGetter()
return to.FloatGetter(g)
mq := func(s string, args ...any) *provider.Mqtt {
return provider.NewMqtt(log, client, fmt.Sprintf(s, args...), 0)
}

var power func() (float64, error)
Expand All @@ -63,18 +59,27 @@ func NewOpenWBFromConfig(other map[string]interface{}) (api.Meter, error) {

switch strings.ToLower(cc.Usage) {
case "grid":
power = floatG(fmt.Sprintf("%s/evu/%s", cc.Topic, openwb.PowerTopic))
power, err = to.FloatGetter(mq("%s/evu/%s", cc.Topic, openwb.PowerTopic))
if err != nil {
return nil, err
}

var curr []func() (float64, error)
for i := 1; i <= 3; i++ {
current := floatG(fmt.Sprintf("%s/evu/%s%d", cc.Topic, openwb.CurrentTopic, i))
current, err := to.FloatGetter(mq("%s/evu/%s%d", cc.Topic, openwb.CurrentTopic, i))
if err != nil {
return nil, err
}
curr = append(curr, current)
}

currents = collectPhaseProviders(curr)

case "pv":
configuredG := boolG(fmt.Sprintf("%s/pv/1/%s", cc.Topic, openwb.PvConfigured)) // first pv
configuredG, err := to.BoolGetter(mq("%s/pv/1/%s", cc.Topic, openwb.PvConfigured)) // first pv
if err != nil {
return nil, err
}
configured, err := configuredG()
if err != nil {
return nil, err
Expand All @@ -84,14 +89,20 @@ func NewOpenWBFromConfig(other map[string]interface{}) (api.Meter, error) {
return nil, errors.New("pv not available")
}

g := floatG(fmt.Sprintf("%s/pv/%s", cc.Topic, openwb.PowerTopic))
g, err := to.FloatGetter(mq("%s/pv/%s", cc.Topic, openwb.PowerTopic))
if err != nil {
return nil, err
}
power = func() (float64, error) {
f, err := g()
return -f, err
}

case "battery":
configuredG := boolG(fmt.Sprintf("%s/housebattery/%s", cc.Topic, openwb.BatteryConfigured))
configuredG, err := to.BoolGetter(mq("%s/housebattery/%s", cc.Topic, openwb.BatteryConfigured))
if err != nil {
return nil, err
}
configured, err := configuredG()
if err != nil {
return nil, err
Expand All @@ -101,12 +112,20 @@ func NewOpenWBFromConfig(other map[string]interface{}) (api.Meter, error) {
return nil, errors.New("battery not available")
}

inner := floatG(fmt.Sprintf("%s/housebattery/%s", cc.Topic, openwb.PowerTopic))
inner, err := to.FloatGetter(mq("%s/housebattery/%s", cc.Topic, openwb.PowerTopic))
if err != nil {
return nil, err
}
power = func() (float64, error) {
f, err := inner()
return -f, err
}
soc = floatG(fmt.Sprintf("%s/housebattery/%s", cc.Topic, openwb.SocTopic))

soc, err = to.FloatGetter(mq("%s/housebattery/%s", cc.Topic, openwb.SocTopic))
if err != nil {
return nil, err
}

capacity = cc.capacity.Decorator()

default:
Expand Down
Loading