Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Influx: fix writing slices and nil values #6022

Merged
merged 1 commit into from
Feb 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Influx: fix writing slices and nil values
  • Loading branch information
andig committed Feb 3, 2023
commit ab45449a51fbef8d368716d83fe1c5fae8e4ad45
2 changes: 1 addition & 1 deletion cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ func runRoot(cmd *cobra.Command, args []string) {

// setup database
if err == nil && conf.Influx.URL != "" {
configureInflux(conf.Influx, site.Loadpoints(), tee.Attach())
configureInflux(conf.Influx, site, tee.Attach())
}

// setup mqtt publisher
Expand Down
22 changes: 11 additions & 11 deletions cmd/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"github.com/evcc-io/evcc/api"
"github.com/evcc-io/evcc/cmd/shutdown"
"github.com/evcc-io/evcc/core"
"github.com/evcc-io/evcc/core/loadpoint"
"github.com/evcc-io/evcc/core/site"
"github.com/evcc-io/evcc/hems"
"github.com/evcc-io/evcc/provider/javascript"
"github.com/evcc-io/evcc/provider/mqtt"
Expand Down Expand Up @@ -123,7 +123,7 @@ func configureDatabase(conf dbConfig) error {
}

// configureInflux configures influx database
func configureInflux(conf server.InfluxConfig, loadPoints []loadpoint.API, in <-chan util.Param) {
func configureInflux(conf server.InfluxConfig, site site.API, in <-chan util.Param) {
influx := server.NewInfluxClient(
conf.URL,
conf.Token,
Expand All @@ -137,7 +137,7 @@ func configureInflux(conf server.InfluxConfig, loadPoints []loadpoint.API, in <-
dedupe := pipe.NewDeduplicator(30*time.Minute, "vehicleCapacity", "vehicleSoc", "vehicleRange", "vehicleOdometer", "chargedEnergy", "chargeRemainingEnergy")
in = dedupe.Pipe(in)

go influx.Run(loadPoints, in)
go influx.Run(site, in)
}

// setup mqtt
Expand Down Expand Up @@ -267,8 +267,8 @@ func configureTariffs(conf tariffConfig) (tariff.Tariffs, error) {

func configureSiteAndLoadpoints(conf config) (site *core.Site, err error) {
if err = cp.configure(conf); err == nil {
var loadPoints []*core.Loadpoint
loadPoints, err = configureLoadpoints(conf, cp)
var loadpoints []*core.Loadpoint
loadpoints, err = configureLoadpoints(conf, cp)

var tariffs tariff.Tariffs
if err == nil {
Expand All @@ -285,23 +285,23 @@ func configureSiteAndLoadpoints(conf config) (site *core.Site, err error) {
vehicles = append(vehicles, cp.vehicles[k])
}

site, err = configureSite(conf.Site, cp, loadPoints, vehicles, tariffs)
site, err = configureSite(conf.Site, cp, loadpoints, vehicles, tariffs)
}
}

return site, err
}

func configureSite(conf map[string]interface{}, cp *ConfigProvider, loadPoints []*core.Loadpoint, vehicles []api.Vehicle, tariffs tariff.Tariffs) (*core.Site, error) {
site, err := core.NewSiteFromConfig(log, cp, conf, loadPoints, vehicles, tariffs)
func configureSite(conf map[string]interface{}, cp *ConfigProvider, loadpoints []*core.Loadpoint, vehicles []api.Vehicle, tariffs tariff.Tariffs) (*core.Site, error) {
site, err := core.NewSiteFromConfig(log, cp, conf, loadpoints, vehicles, tariffs)
if err != nil {
return nil, fmt.Errorf("failed configuring site: %w", err)
}

return site, nil
}

func configureLoadpoints(conf config, cp *ConfigProvider) (loadPoints []*core.Loadpoint, err error) {
func configureLoadpoints(conf config, cp *ConfigProvider) (loadpoints []*core.Loadpoint, err error) {
lpInterfaces, ok := viper.AllSettings()["loadpoints"].([]interface{})
if !ok || len(lpInterfaces) == 0 {
return nil, errors.New("missing loadpoints")
Expand All @@ -319,8 +319,8 @@ func configureLoadpoints(conf config, cp *ConfigProvider) (loadPoints []*core.Lo
return nil, fmt.Errorf("failed configuring loadpoint: %w", err)
}

loadPoints = append(loadPoints, lp)
loadpoints = append(loadpoints, lp)
}

return loadPoints, nil
return loadpoints, nil
}
1 change: 0 additions & 1 deletion core/site.go
Original file line number Diff line number Diff line change
Expand Up @@ -616,7 +616,6 @@ func (site *Site) Prepare(uiChan chan<- util.Param, pushChan chan<- push.Event)
}(id)

lp.Prepare(lpUIChan, lpPushChan, site.lpUpdateChan)
lp.publish("loadpoint", id+1) // 1-based loadpoint id
}
}

Expand Down
5 changes: 5 additions & 0 deletions push/hub.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ func (h *Hub) apply(ev Event, tmpl *template.Template) (string, error) {
// let cache catch up, refs reverted https://github.com/evcc-io/evcc/pull/445
time.Sleep(100 * time.Millisecond)

// loadpoint id
if ev.Loadpoint != nil {
attr["loadpoint"] = *ev.Loadpoint + 1
}

// get all values from cache
for _, p := range h.cache.All() {
if p.Loadpoint == nil || ev.Loadpoint == p.Loadpoint {
Expand Down
139 changes: 78 additions & 61 deletions server/influxdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@ import (
"sync"
"time"

"github.com/evcc-io/evcc/core/loadpoint"
"github.com/benbjohnson/clock"
"github.com/evcc-io/evcc/core/site"
"github.com/evcc-io/evcc/util"
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
"github.com/influxdata/influxdb-client-go/v2/api"
"github.com/influxdata/influxdb-client-go/v2/api/write"
influxlog "github.com/influxdata/influxdb-client-go/v2/log"
)

Expand All @@ -30,6 +31,7 @@ type InfluxConfig struct {
type Influx struct {
sync.Mutex
log *util.Logger
clock clock.Clock
client influxdb2.Client
org string
database string
Expand All @@ -52,94 +54,109 @@ func NewInfluxClient(url, token, org, user, password, database string) *Influx {

return &Influx{
log: log,
clock: clock.New(),
client: client,
org: org,
database: database,
}
}

// pointWriter is the minimal interface for influxdb2 api.Writer
type pointWriter interface {
WritePoint(point *write.Point)
}

// writePoint asynchronously writes a point to influx
func (m *Influx) writePoint(writer api.WriteAPI, key string, fields map[string]any, tags map[string]string) {
func (m *Influx) writePoint(writer pointWriter, key string, fields map[string]any, tags map[string]string) {
m.log.TRACE.Printf("write %s=%v (%v)", key, fields, tags)
writer.WritePoint(influxdb2.NewPoint(key, tags, fields, time.Now()))
writer.WritePoint(influxdb2.NewPoint(key, tags, fields, m.clock.Now()))
}

// Run Influx publisher
func (m *Influx) Run(loadPoints []loadpoint.API, in <-chan util.Param) {
writer := m.client.WriteAPI(m.org, m.database)
// writeComplexPoint asynchronously writes a point to influx
func (m *Influx) writeComplexPoint(writer pointWriter, param util.Param, tags map[string]string) {
fields := make(map[string]any)

// log errors
go func() {
for err := range writer.Errors() {
// log async as we're part of the logging loop
go m.log.ERROR.Println(err)
}
}()
switch val := param.Val.(type) {
case int, int64, float64:
fields["value"] = param.Val

// track active vehicle per loadpoint
vehicles := make(map[int]string)
case []float64:
if len(val) != 3 {
return
}

// add points to batch for async writing
for param := range in {
// vehicle name
if param.Loadpoint != nil && param.Key == "vehicleTitle" {
if vehicle, ok := param.Val.(string); ok {
vehicles[*param.Loadpoint] = vehicle
continue
}
// add array as phase values
for i, v := range val {
fields[fmt.Sprintf("l%d", i+1)] = v
}

fields := make(map[string]any)
case [3]float64:
// add array as phase values
for i, v := range val {
fields[fmt.Sprintf("l%d", i+1)] = v
}

tags := make(map[string]string)
if param.Loadpoint != nil {
tags["loadpoint"] = loadPoints[*param.Loadpoint].Title()
tags["vehicle"] = vehicles[*param.Loadpoint]
default:
// allow writing nil values
if param.Val == nil {
fields["value"] = nil
break
}

switch val := param.Val.(type) {
case int, int64, float64:
fields["value"] = param.Val
// slice of structs
if typ := reflect.TypeOf(param.Val); typ.Kind() == reflect.Slice && typ.Elem().Kind() == reflect.Struct {
val := reflect.ValueOf(param.Val)

case [3]float64:
// add array as phase values
for i, v := range val {
fields[fmt.Sprintf("l%d", i+1)] = v
}
// loop slice
for i := 0; i < val.Len(); i++ {
val := val.Index(i)
typ := val.Type()

// loop struct
for j := 0; j < typ.NumField(); j++ {
n := typ.Field(j).Name
v := val.Field(j).Interface()

key := param.Key + strings.ToUpper(n[:1]) + n[1:]
fields["value"] = v
tags["id"] = strconv.Itoa(i + 1)

default:
// allow writing nil values
if param.Val == nil {
break
m.writePoint(writer, key, fields, tags)
}
}
}

// slice of structs
if typ := reflect.TypeOf(param.Val); typ.Kind() == reflect.Slice && typ.Elem().Kind() == reflect.Struct {
val := reflect.ValueOf(param.Val)
return
}

m.writePoint(writer, param.Key, fields, tags)
}

// loop slice
for i := 0; i < val.Len(); i++ {
val := val.Index(i)
typ := val.Type()
// Run Influx publisher
func (m *Influx) Run(site site.API, in <-chan util.Param) {
writer := m.client.WriteAPI(m.org, m.database)

// loop struct
for j := 0; j < typ.NumField(); j++ {
n := typ.Field(j).Name
v := val.Field(j).Interface()
// log errors
go func() {
for err := range writer.Errors() {
// log async as we're part of the logging loop
go m.log.ERROR.Println(err)
}
}()

key := param.Key + strings.ToUpper(n[:1]) + n[1:]
fields["value"] = v
tags["id"] = strconv.Itoa(i + 1)
// add points to batch for async writing
for param := range in {
tags := make(map[string]string)
if param.Loadpoint != nil {
lp := site.Loadpoints()[*param.Loadpoint]

m.writePoint(writer, key, fields, tags)
}
}
tags["loadpoint"] = lp.Title()
if v := lp.GetVehicle(); v != nil {
tags["vehicle"] = v.Title()
}

continue
}

m.writePoint(writer, param.Key, fields, tags)
m.writeComplexPoint(writer, param, tags)
}

m.client.Close()
Expand Down
90 changes: 90 additions & 0 deletions server/influxdb_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package server

import (
"testing"

"github.com/benbjohnson/clock"
"github.com/evcc-io/evcc/util"
inf2 "github.com/influxdata/influxdb-client-go/v2"
"github.com/influxdata/influxdb-client-go/v2/api/write"
"github.com/stretchr/testify/assert"
)

type influxWriter struct {
t *testing.T
p []*write.Point
idx int
}

func (w *influxWriter) WritePoint(p *write.Point) {
if w.idx >= len(w.p) {
w.t.Fatal("too many points")
}

assert.Equal(w.t, w.p[w.idx], p)
w.idx++
}

func (w *influxWriter) finish() {
assert.Equal(w.t, len(w.p), w.idx, "not enough points")
}

func TestInfluxTypes(t *testing.T) {
m := &Influx{
log: util.NewLogger("foo"),
clock: clock.NewMock(),
}

{
// string value
w := &influxWriter{
t: t, p: []*write.Point{inf2.NewPoint("foo", nil, map[string]any{"value": 1}, m.clock.Now())},
}
m.writeComplexPoint(w, util.Param{Key: "foo", Val: 1}, nil)
w.finish()
}

{
// nil value - https://github.com/evcc-io/evcc/issues/5950
w := &influxWriter{
t: t, p: []*write.Point{inf2.NewPoint("phasesConfigured", nil, map[string]any{"value": nil}, m.clock.Now())},
}
m.writeComplexPoint(w, util.Param{Key: "phasesConfigured", Val: nil}, nil)
w.finish()
}

{
// phases array
w := &influxWriter{
t: t, p: []*write.Point{inf2.NewPoint("foo", nil, map[string]any{
"l1": 1.0,
"l2": 2.0,
"l3": 3.0,
}, m.clock.Now())},
}
m.writeComplexPoint(w, util.Param{Key: "foo", Val: [3]float64{1, 2, 3}}, nil)
w.finish()
}

{
// phases slice
w := &influxWriter{
t: t, p: []*write.Point{inf2.NewPoint("foo", nil, map[string]any{
"l1": 1.0,
"l2": 2.0,
"l3": 3.0,
}, m.clock.Now())},
}
m.writeComplexPoint(w, util.Param{Key: "foo", Val: []float64{1, 2, 3}}, nil)
w.finish()
}

{
// arbitrary slice
w := &influxWriter{
t: t, p: nil,
}
m.writeComplexPoint(w, util.Param{Key: "foo", Val: []float64{1, 2, 3, 4}}, nil)
w.finish()
}
}