-
Notifications
You must be signed in to change notification settings - Fork 0
/
mqtt.go
100 lines (80 loc) · 2.42 KB
/
mqtt.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
package main
import (
"encoding/json"
"errors"
"os"
"time"
MQTT "github.com/eclipse/paho.mqtt.golang"
"github.com/rs/zerolog/log"
)
const (
	// TIMEOUT is the maximum time startMqttClient waits for the initial
	// broker connection to complete.
	TIMEOUT time.Duration = 10 * time.Second

	// SEND_TOPIC is the topic subscribed to in onConnectHandler; payloads
	// received on it are handled by receive.
	SEND_TOPIC = APPNAME + "/send"
)
// TelegramMessage is the JSON payload decoded from messages received over
// MQTT (see receive).
type TelegramMessage struct {
	// Message is the text body; validateMessage rejects an empty value.
	Message string `json:"message"`

	// ImageUrl is decoded from the "image-url" key.
	ImageUrl string `json:"image-url"`
}
// mqttClient is the package-level MQTT client. It is assigned in
// startMqttClient and used by sendToMtt, sendToMttRetain and
// onConnectHandler.
var mqttClient MQTT.Client
func sendToMtt(topic string, message string) {
mqttClient.Publish(topic, byte(config.Mqtt.Qos), config.Mqtt.Retain, message)
}
func sendToMttRetain(topic string, message string) {
mqttClient.Publish(topic, byte(config.Mqtt.Qos), true, message)
}
// receive is the MQTT message handler registered for SEND_TOPIC. It decodes
// the payload as a JSON TelegramMessage, validates it and forwards the text
// to sendToTelegram.
//
// Payloads that are not valid JSON are logged at error level and dropped;
// messages that fail validateMessage are logged at warn level and dropped.
func receive(client MQTT.Client, msg MQTT.Message) {
	var telegramMessage TelegramMessage

	// Decode straight from the payload bytes; converting to string and back
	// to []byte only copies the data twice without changing it.
	if err := json.Unmarshal(msg.Payload(), &telegramMessage); err != nil {
		log.Error().Msgf("JSON Error: %s", err.Error())
		return
	}

	if err := validateMessage(telegramMessage); err != nil {
		log.Warn().Msgf("MQTT message error: %s", err.Error())
		return
	}

	log.Debug().Msgf("Message: %s", telegramMessage.Message)

	// NOTE(review): telegramMessage.ImageUrl is decoded but never used; the
	// second argument is always "". Confirm against sendToTelegram whether
	// the image URL was meant to be forwarded here.
	sendToTelegram(telegramMessage.Message, "")
}
// GetClientId returns the MQTT client identifier, built as APPNAME, an
// underscore, and the local hostname.
func GetClientId() string {
	// The error from os.Hostname is discarded; whatever name it returns
	// (presumably empty on failure — TODO confirm) is used as-is.
	host, _ := os.Hostname()
	id := APPNAME + "_" + host
	return id
}
// validateMessage checks that msg carries a non-empty Message field. It
// returns nil when the message is acceptable and a descriptive error
// otherwise.
func validateMessage(msg TelegramMessage) error {
	if len(msg.Message) > 0 {
		return nil
	}
	return errors.New("message is mandatory")
}
func startMqttClient() {
opts := MQTT.NewClientOptions().AddBroker(config.Mqtt.Url)
if config.Mqtt.Username != "" && config.Mqtt.Password != "" {
opts.SetUsername(config.Mqtt.Username)
opts.SetPassword(config.Mqtt.Password)
}
opts.SetClientID(GetClientId())
opts.SetCleanSession(true)
opts.SetBinaryWill(APPNAME+"/status", []byte("Offline"), 0, true)
opts.SetAutoReconnect(true)
opts.SetConnectionLostHandler(connLostHandler)
opts.SetOnConnectHandler(onConnectHandler)
mqttClient = MQTT.NewClient(opts)
token := mqttClient.Connect()
if token.WaitTimeout(TIMEOUT) && token.Error() != nil {
log.Fatal().Err(token.Error()).Msg("MQTT Connection Error")
}
token = mqttClient.Publish(APPNAME+"/status", 2, true, "Online")
token.Wait()
}
// connLostHandler is registered as the client's connection-lost callback. It
// logs err at fatal level, which terminates the process.
//
// NOTE(review): startMqttClient also enables auto-reconnect; exiting here
// looks like it pre-empts that — confirm this is intended.
func connLostHandler(c MQTT.Client, err error) {
	event := log.Fatal().Err(err)
	event.Msg("MQTT Connection lost")
}
// onConnectHandler is registered as the client's on-connect callback. It
// subscribes the package-level mqttClient to SEND_TOPIC at QoS 0 with receive
// as the message handler, and logs at fatal level if the subscription fails.
func onConnectHandler(c MQTT.Client) {
	log.Debug().Msg("MQTT Client connected")

	token := mqttClient.Subscribe(SEND_TOPIC, 0, receive)
	if !token.Wait() {
		return
	}
	if err := token.Error(); err != nil {
		log.Fatal().Err(err).Msgf("Could not subscribe to %s", SEND_TOPIC)
	}
}