Skip to content

Commit ecac2a1

Browse files
committed
MQTT URL
1 parent 59fadd6 commit ecac2a1

File tree

3 files changed

+21
-21
lines changed

3 files changed

+21
-21
lines changed

cmd/device-gateway/config.go

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -203,24 +203,24 @@ func (p *RestProtocol) Validate() error {
203203
}
204204

205205
type MqttProtocol struct {
206-
Discover bool `json:"discover"`
207-
ServerUri string `json:"serverUri"`
208-
Prefix string `json:"prefix"`
209-
Username string `json:"username"`
210-
Password string `json:"password"`
211-
CaFile string `json:"caFile"`
212-
CertFile string `json:"certFile"`
213-
KeyFile string `json:"keyFile"`
206+
Discover bool `json:"discover"`
207+
URL string `json:"url"`
208+
Prefix string `json:"prefix"`
209+
Username string `json:"username"`
210+
Password string `json:"password"`
211+
CaFile string `json:"caFile"`
212+
CertFile string `json:"certFile"`
213+
KeyFile string `json:"keyFile"`
214214
}
215215

216216
func (p *MqttProtocol) Validate() error {
217217
if !p.Discover {
218-
serverUri, err := url.Parse(p.ServerUri)
218+
url, err := url.Parse(p.URL)
219219
if err != nil {
220-
return fmt.Errorf("MQTT ServerUri must be a URI in the format scheme://host:port")
220+
return fmt.Errorf("MQTT broker URL must be a valid URI in the format scheme://host:port")
221221
}
222-
if serverUri.Scheme != "tcp" && serverUri.Scheme != "ssl" {
223-
return fmt.Errorf("MQTT ServerUri scheme must be either 'tcp' or 'ssl'")
222+
if url.Scheme != "tcp" && url.Scheme != "ssl" {
223+
return fmt.Errorf("MQTT broker URL scheme must be either 'tcp' or 'ssl'")
224224
}
225225
}
226226

cmd/device-gateway/mqtt.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ func (p *MQTTPublisher) dataInbox() chan<- AgentResponse {
6969
func (p *MQTTPublisher) start() {
7070
logger.Println("MQTTPublisher.start()")
7171

72-
if p.config.Discover && p.config.ServerUri == "" {
72+
if p.config.Discover && p.config.URL == "" {
7373
err := p.discoverBrokerEndpoint()
7474
if err != nil {
7575
logger.Println("MQTTPublisher.start() failed to start publisher:", err.Error())
@@ -81,7 +81,7 @@ func (p *MQTTPublisher) start() {
8181
p.configureMqttConnection()
8282

8383
// start the connection routine
84-
logger.Printf("MQTTPublisher.start() Will connect to the broker %v\n", p.config.ServerUri)
84+
logger.Printf("MQTTPublisher.start() Will connect to the broker %v\n", p.config.URL)
8585
go p.connect(0)
8686

8787
qos := 1
@@ -128,7 +128,7 @@ func (p *MQTTPublisher) discoverBrokerEndpoint() error {
128128
}
129129
logger.Println(proto.Endpoint["url"])
130130
if ProtocolType(proto.Type) == ProtocolTypeMQTT {
131-
p.config.ServerUri = proto.Endpoint["url"].(string)
131+
p.config.URL = proto.Endpoint["url"].(string)
132132
break
133133
}
134134
}
@@ -154,7 +154,7 @@ func (p *MQTTPublisher) connect(backOff int) {
154154
return
155155
}
156156
for {
157-
logger.Printf("MQTTPublisher.connect() connecting to the broker %v, backOff: %v sec\n", p.config.ServerUri, backOff)
157+
logger.Printf("MQTTPublisher.connect() connecting to the broker %v, backOff: %v sec\n", p.config.URL, backOff)
158158
time.Sleep(time.Duration(backOff) * time.Second)
159159
if p.client.IsConnected() {
160160
break
@@ -171,7 +171,7 @@ func (p *MQTTPublisher) connect(backOff int) {
171171
}
172172
}
173173

174-
logger.Printf("MQTTPublisher.connect() connected to the broker %v", p.config.ServerUri)
174+
logger.Printf("MQTTPublisher.connect() connected to the broker %v", p.config.URL)
175175
return
176176
}
177177

@@ -185,7 +185,7 @@ func (p *MQTTPublisher) onConnectionLost(client *MQTT.MqttClient, reason error)
185185

186186
func (p *MQTTPublisher) configureMqttConnection() {
187187
connOpts := MQTT.NewClientOptions().
188-
AddBroker(p.config.ServerUri).
188+
AddBroker(p.config.URL).
189189
SetClientId(p.clientId).
190190
SetCleanSession(true).
191191
SetOnConnectionLost(p.onConnectionLost)
@@ -197,7 +197,7 @@ func (p *MQTTPublisher) configureMqttConnection() {
197197
}
198198

199199
// SSL/TLS
200-
if strings.HasPrefix(p.config.ServerUri, "ssl") {
200+
if strings.HasPrefix(p.config.URL, "ssl") {
201201
tlsConfig := &tls.Config{}
202202
// Custom CA to auth broker with a self-signed certificate
203203
if p.config.CaFile != "" {

cmd/device-gateway/registration.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,8 @@ func configureDevices(config *Config) []catalog.Device {
4343
} else if proto.Type == ProtocolTypeMQTT {
4444
mqtt, ok := config.Protocols[ProtocolTypeMQTT].(MqttProtocol)
4545
if ok {
46-
p.Endpoint["broker"] = mqtt.ServerUri
47-
p.Endpoint["topic"] = fmt.Sprintf("%s/%v", mqtt.Prefix, res.Id)
46+
p.Endpoint["url"] = mqtt.URL
47+
p.Endpoint["topic"] = fmt.Sprintf("%s/%v/%v", mqtt.Prefix, device.Name, resource.Name)
4848
}
4949
}
5050
res.Protocols = append(res.Protocols, *p)

0 commit comments

Comments
 (0)