forked from evcc-io/evcc
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathlistener.go
166 lines (133 loc) · 3.2 KB
/
listener.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
package keba
import (
"encoding/json"
"fmt"
"net"
"strings"
"sync"
"github.com/evcc-io/evcc/util"
)
const (
// udpBufferSize is the size in bytes of the buffer the read loop receives datagrams into
udpBufferSize = 1024
// Port is the KEBA UDP port
Port = 7090
// OK is the KEBA confirmation message
OK = "TCH-OK :done"
// Any is a subscription key that matches every incoming message,
// regardless of sender address or serial
Any = "<any>"
)
// instance is the KEBA listener instance, created on first use by Instance.
// This is needed since KEBAs ignore the sender port and always UDP back to port 7090
var (
// mu serializes creation of and access to instance
mu sync.Mutex
instance *Listener
)
// UDPMsg transports the KEBA response. Report is any of Report1,2,3
type UDPMsg struct {
// Addr is the sender address in string form, as set by the listener
Addr string
// Message is the datagram payload with surrounding whitespace trimmed
Message []byte
// Report is the JSON-decoded payload; the listener leaves it nil for the plain OK confirmation
Report *Report
}
// Listener singleton listens for KEBA UDP messages
type Listener struct {
// mux guards clients; in this file cache is likewise only touched while mux is held
mux sync.Mutex
log *util.Logger
// conn is the UDP socket the read loop receives from
conn *net.UDPConn
// clients maps a subscription key (sender address, serial or Any) to the subscriber's channel
clients map[string]chan<- UDPMsg
// cache maps a serial to the sender address of the last report routed by that serial
cache map[string]string
}
// Instance returns the shared Listener, creating it with New on first use.
// Calls are serialized by the package mutex. When New fails its error is
// returned; since New yields a nil listener on failure, a later call retries.
func Instance(log *util.Logger) (*Listener, error) {
	mu.Lock()
	defer mu.Unlock()

	// fast path: listener already exists
	if instance != nil {
		return instance, nil
	}

	var err error
	instance, err = New(log)
	return instance, err
}
// New binds a UDP socket on Port (all local interfaces) and starts the
// background read loop that dispatches incoming messages to subscribers.
// It returns an error if the local address cannot be resolved or the
// socket cannot be opened.
func New(log *util.Logger) (*Listener, error) {
	local, err := net.ResolveUDPAddr("udp", fmt.Sprintf(":%d", Port))
	if err != nil {
		return nil, err
	}

	sock, err := net.ListenUDP("udp", local)
	if err != nil {
		return nil, err
	}

	listener := new(Listener)
	listener.log = log
	listener.conn = sock
	listener.clients = make(map[string]chan<- UDPMsg)
	listener.cache = make(map[string]string)

	// the read loop runs for as long as the process lives
	go listener.listen()

	return listener, nil
}
// Subscribe registers channel c under the key addr, which may be a sender
// address, a serial or Any. A previous registration for the same key is
// replaced.
func (l *Listener) Subscribe(addr string, c chan<- UDPMsg) {
	l.mux.Lock()
	defer l.mux.Unlock()

	l.clients[addr] = c
}
// listen is the read loop started by New. It receives datagrams forever,
// trims surrounding whitespace and hands the result to send. The plain OK
// confirmation is forwarded without a report; every other payload must
// decode as a JSON report or it is dropped.
func (l *Listener) listen() {
	buf := make([]byte, udpBufferSize)

	for {
		n, src, err := l.conn.ReadFrom(buf)
		if err != nil {
			l.log.TRACE.Printf("listener: %v", err)
			continue
		}

		sender := src.String()
		payload := strings.TrimSpace(string(buf[:n]))
		l.log.TRACE.Printf("recv from %s %v", sender, payload)

		// Message gets its own copy of the payload since buf is reused on the next read
		msg := UDPMsg{
			Addr:    sender,
			Message: []byte(payload),
		}

		// plain confirmation carries no report
		if payload == OK {
			l.send(msg)
			continue
		}

		report := new(Report)
		if err := json.Unmarshal([]byte(payload), report); err != nil {
			// stay quiet for our own "report 1" request, which is received
			// back when probing localhost during detection
			if payload != "report 1" {
				l.log.WARN.Printf("recv: invalid message: %v", err)
			}
			continue
		}

		msg.Report = report
		l.send(msg)
	}
}
// addrMatches reports whether msg should be routed to the subscriber that
// registered under addr. addr may be Any, a sender address or a serial.
// As a side effect, a report matched by serial records the sender address
// for that serial in the cache.
func (l *Listener) addrMatches(addr string, msg UDPMsg) bool {
	// wildcard subscriber or exact sender address
	if addr == Any || addr == msg.Addr {
		return true
	}

	// simple response like TCH-OK :done where the cached sender address for
	// the subscribed serial matches
	if l.cache[addr] == msg.Addr {
		return true
	}

	// everything else needs a report carrying the subscribed serial
	if msg.Report == nil || msg.Report.Serial != addr {
		return false
	}

	// remember the sender address for this serial (addr equals the serial
	// here) so that later simple TCH-OK messages are routable by serial
	l.cache[addr] = msg.Addr
	return true
}
// send delivers msg to every subscriber whose key matches it, as decided by
// addrMatches. The subscriber lock is held for the whole dispatch.
//
// Delivery is non-blocking: when a subscriber's channel cannot accept the
// message right away, the message is dropped for that subscriber and a
// trace entry is logged, so a slow consumer cannot stall the read loop.
//
// All matching subscribers receive the same msg value, so Message and
// Report are shared between them.
func (l *Listener) send(msg UDPMsg) {
	l.mux.Lock()
	defer l.mux.Unlock()

	for addr, client := range l.clients {
		if !l.addrMatches(addr, msg) {
			continue
		}

		// Do not stop at the first match: map iteration order is random, so
		// stopping would hand the message to an arbitrary one of several
		// matching subscribers (e.g. an Any subscriber next to an address
		// subscriber) and starve the others.
		select {
		case client <- msg:
		default:
			l.log.TRACE.Println("recv: listener blocked")
		}
	}
}