Skip to content

Commit 093dfcd

Browse files
committed
Added support for reading master settings from MQTT with no web server
1 parent 521bae5 commit 093dfcd

File tree

3 files changed

+172
-26
lines changed

3 files changed

+172
-26
lines changed

src/MI/ModuleInterfaceMqttTransfer.h

Lines changed: 53 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
#include <MI/MITransferBase.h>
2222
#include <utils/MITime.h>
2323
#include <utils/MIUptime.h>
24+
#include <utils/MIUtilities.h>
2425
#include <utils/BinaryBuffer.h>
2526

2627
#include <ReconnectingMqttClient.h>
@@ -42,6 +43,35 @@ class MIMqttTransfer : public MITransferBase {
4243
// State
4344
ReconnectingMqttClient client;
4445

46+
// Debug print related
47+
#if defined(DEBUG_PRINT) || defined(DEBUG_PRINT_SETTINGUPDATE_MQTT) || defined(DEBUG_PRINT_TIMES)
48+
uint32_t last_settings_debug_print_ms = 0;
49+
bool some_settings_missing = true;
50+
#endif
51+
52+
53+
void debug_print_missing_settings() {
54+
#if defined(DEBUG_PRINT) || defined(DEBUG_PRINT_SETTINGUPDATE_MQTT) || defined(DEBUG_PRINT_TIMES)
55+
if (some_settings_missing && mi_interval_elapsed(last_settings_debug_print_ms, 10000)) {
56+
some_settings_missing = false;
57+
for (uint8_t module_ix = 0; module_ix < interfaces.get_module_count(); module_ix++) {
58+
uint8_t initialized = interfaces[module_ix]->settings.get_initialized_count(),
59+
total = interfaces[module_ix]->settings.get_num_variables();
60+
if (initialized < total) {
61+
some_settings_missing = true;
62+
printf("Still missing %d settings for %s: ", (total - initialized), interfaces[module_ix]->module_name);
63+
for (uint8_t v = 0; v < total; v++) {
64+
if (!interfaces[module_ix]->settings.get_module_variable(v).is_initialized()) {
65+
printf("%s ", interfaces[module_ix]->settings.get_module_variable(v).name);
66+
}
67+
}
68+
printf("\n");
69+
}
70+
}
71+
}
72+
#endif
73+
}
74+
4575
public:
4676
MIMqttTransfer(ModuleInterfaceSet &module_interface_set, const uint8_t *broker_address, const uint16_t broker_port = 1883) :
4777
MITransferBase(module_interface_set) {
@@ -56,7 +86,7 @@ class MIMqttTransfer : public MITransferBase {
5686
client.set_address(broker_ip, broker_port, "modulemaster");
5787
}
5888

59-
void start() {
89+
virtual void start() {
6090
#ifdef MIMQTT_USE_JSON
6191
client.subscribe("moduleinterface/+/setting,moduleinterface/+/input", 1);
6292
#else
@@ -190,7 +220,11 @@ class MIMqttTransfer : public MITransferBase {
190220
#endif
191221
}
192222

193-
// Override this in derived classes to read master settings or other topics
223+
// Override this in derived classes to read master settings
224+
virtual void read_master_topic(const char *modulename, const char *category,
225+
const char *topic, const char *data, uint16_t len, uint8_t transfer_ix) { }
226+
227+
// Override this in derived classes to read other topics
194228
virtual void read_nonmodule_topic(const char *modulename, const char *category,
195229
const char *topic, const char *data, uint16_t len, uint8_t transfer_ix) { }
196230

@@ -228,9 +262,16 @@ class MIMqttTransfer : public MITransferBase {
228262
}
229263
}
230264
*/
265+
266+
#ifdef DEBUG_PRINT_SETTINGUPDATE_MQTT
267+
printf("Got topic '%s' for module '%s' ix %d\n", topic, modulename.c_str(), module_ix);
268+
#endif
231269
if (!mi) {
232270
// Read settings not meant for a module, for example master settings
233-
read_nonmodule_topic(modulename.c_str(), category.c_str(), topic, data, len, transfer_ix);
271+
if (strncmp("master_", modulename.c_str(), 7) == 0)
272+
read_master_topic(modulename.c_str(), category.c_str(), topic, data, len, transfer_ix);
273+
else
274+
read_nonmodule_topic(modulename.c_str(), category.c_str(), topic, data, len, transfer_ix);
234275
return;
235276
}
236277

@@ -290,13 +331,19 @@ class MIMqttTransfer : public MITransferBase {
290331
if (mv.is_changed() && is_event) mv.set_event(true); // Trigger immediate transfer to modules
291332
#if defined(MASTER_MULTI_TRANSFER)
292333
mv.set_initialized();
293-
if (mvs->is_initialized()) mvs->set_updated(); // All variables have been set, and one was just updated
294-
#endif
295-
#if defined(MASTER_MULTI_TRANSFER) && defined(DEBUG_PRINT_SETTINGSYNC)
334+
if (mvs->is_initialized()) {
335+
#if defined(DEBUG_PRINT) || defined(DEBUG_PRINT_SETTINGUPDATE_MQTT) || defined(DEBUG_PRINT_TIMES)
336+
if (settings && !mvs->is_updated())
337+
printf("GOT ALL settings for %s\n", modulename.c_str());
338+
#endif
339+
mvs->set_updated(); // All variables have been set, and one was just updated
340+
}
341+
#ifdef DEBUG_PRINT_SETTINGSYNC
296342
if (prev_bits != mv.change_bits)
297343
printf("FROM MQTT '%s' ix: %d VALUE %ld cbits: %d->%d changed:%d->%d->%d\n", variable_name.c_str(), varpos,
298344
mv.get_uint32(), prev_bits, mv.change_bits, prev_changed, mv_new.is_changed(), mv.is_changed());
299345
#endif
346+
#endif
300347
}
301348
#endif
302349
}

src/MI/ModuleVariableSet.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -409,6 +409,16 @@ struct ModuleVariableSet {
409409

410410
uint8_t get_num_variables() const { return num_variables; }
411411

412+
#if defined(IS_MASTER) && defined(MASTER_MULTI_TRANSFER)
413+
uint8_t get_initialized_count() const {
414+
uint8_t count = 0;
415+
for (uint8_t i = 0; i < num_variables; i++) {
416+
if (variables[i].is_initialized()) count++;
417+
}
418+
return count;
419+
}
420+
#endif
421+
412422
uint8_t get_variable_ix(const char *variable_name) const {
413423
#ifdef IS_MASTER
414424
for (uint8_t i = 0; i < num_variables; i++)

src/MI_PJON/PJONModuleInterfaceMqttTransfer.h

Lines changed: 109 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -28,19 +28,98 @@ class PJONMIMqttTransfer : public MIMqttTransfer {
2828
// Whether to read master settings from MQTT or not
2929
void set_read_master_settings(bool read) { read_master_settings = read; }
3030

31+
virtual void start() {
32+
// When starting based on master settings from MQTT, we are doing it in 3 phases:
33+
// 1. Get master settings, create ModuleInterface objects based on module list.
34+
// 2. Get contracts from all modules.
35+
// 3. Reconnect to MQTT now subscribing to all settings and inputs, so that
36+
// these will be registered in objects and synced to modules.
37+
38+
// If we have all master settings then start normally (phase 3)
39+
if (!read_master_settings || (got_settings() && got_contracts())) {
40+
phase = PHASE_RUNNING;
41+
MIMqttTransfer::start();
42+
}
43+
else {
44+
// Subscribe to master settings only (phase 1)
45+
phase = PHASE_WAIT_MQTT;
46+
#if defined(DEBUG_PRINT) || defined(DEBUG_PRINT_SETTINGUPDATE_MQTT)
47+
printf("%%%%%%%%%%%% PHASE_WAIT_MQTT starting at %dms.\n", millis());
48+
#endif
49+
String master_topic = "moduleinterface/master_"; master_topic += interfaces.get_prefix();
50+
#ifdef MIMQTT_USE_JSON
51+
master_topic += "/setting";
52+
#else
53+
master_topic += "/setting/+";
54+
#endif
55+
#if defined(DEBUG_PRINT) || defined(DEBUG_PRINT_SETTINGUPDATE_MQTT)
56+
printf("Subscribing to topic '%s'\n", master_topic.c_str());
57+
#endif
58+
client.subscribe(master_topic.c_str(), 1);
59+
client.start();
60+
}
61+
}
62+
3163
private:
64+
// Flags used in got_master_settings member
65+
const uint8_t SETTING_MODULE_LIST = 1, SETTING_DEVID = 2, SETTING_INTERVAL = 4, SETTING_ALL = 7;
66+
67+
// Startup phases
68+
enum Phase { PHASE_STOPPED, PHASE_WAIT_MQTT, PHASE_WAIT_CONTRACTS, PHASE_RUNNING };
69+
3270
bool read_master_settings = false;
71+
uint8_t got_master_settings = 0; // Bitmasked,
72+
enum Phase phase = PHASE_STOPPED;
73+
long contract_retrieval_start = 0;
74+
75+
bool got_settings() const { return (got_master_settings & SETTING_ALL) == SETTING_ALL; }
76+
bool got_contracts() const {
77+
return interfaces.got_all_contracts()
78+
&& ((interfaces.get_inactive_module_count() == 0)
79+
|| (uint32_t(millis() - contract_retrieval_start) > MI_INACTIVE_TIME_THRESHOLD*1000l));
80+
}
3381

34-
virtual void read_nonmodule_topic(const char *modulename,
35-
const char *category,
36-
const char *topic,
37-
const char *data,
38-
uint16_t len,
39-
uint8_t transfer_ix)
82+
virtual void update() {
83+
MIMqttTransfer::update();
84+
check_phase();
85+
if (read_master_settings) debug_print_missing_settings();
86+
}
87+
88+
void check_phase() {
89+
// Check if we are ready to transition to a new connection phase
90+
switch(phase) {
91+
case PHASE_STOPPED: break;
92+
case PHASE_WAIT_MQTT:
93+
if (got_settings()) {
94+
phase = PHASE_WAIT_CONTRACTS;
95+
contract_retrieval_start = millis();
96+
#if defined(DEBUG_PRINT) || defined(DEBUG_PRINT_SETTINGUPDATE_MQTT)
97+
printf("%%%%%%%%%%%% PHASE_WAIT_CONTRACTS starting at %dms. Got all master settings.\n", millis());
98+
#endif
99+
}
100+
break;
101+
case PHASE_WAIT_CONTRACTS:
102+
if (got_contracts()) {
103+
// Reconnect to MQTT broker wil full subscription, not only master settings
104+
#if defined(DEBUG_PRINT) || defined(DEBUG_PRINT_SETTINGUPDATE_MQTT)
105+
printf("%%%%%%%%%%%% PHASE_RUNNING starting at %dms. Got %d (%d inactive) modules.\n", millis(), interfaces.get_module_count(), interfaces.get_inactive_module_count());
106+
#endif
107+
phase = PHASE_RUNNING;
108+
stop();
109+
start();
110+
}
111+
break;
112+
case PHASE_RUNNING: break;
113+
}
114+
}
115+
116+
virtual void read_master_topic(const char *modulename,
117+
const char *category,
118+
const char *topic,
119+
const char *data,
120+
uint16_t len,
121+
uint8_t transfer_ix)
40122
{
41-
printf("Starting on read_nonmodule_topic '%s'\n", topic);
42-
if (strncmp(topic, "moduleinterface/master_m1/setting/modules", 33) == 0) printf("------> Modules: '%s'\n", data);
43-
printf("%d Category: %s\n", read_master_settings, category);
44123
if (!read_master_settings) return;
45124
bool for_me = false;
46125
{
@@ -50,32 +129,42 @@ printf("%d Category: %s\n", read_master_settings, category);
50129
if (for_me && strcmp(category, "setting")==0) {
51130
// A setting for this master
52131
#ifdef MIMQTT_USE_JSON
53-
read_master_json_settings_from_buffer(interfaces, data, len);
132+
bool ok = read_master_json_settings_from_buffer((PJONModuleInterfaceSet&) interfaces, data, len);
133+
got_master_settings = ok ? SETTING_ALL : 0;
134+
// Check if module list was changed and objects recreated
135+
bool all_empty = true;
136+
for (uint8_t m = 0; m < interfaces.get_module_count(); m++) {
137+
if (interfaces[m]->settings.get_num_variables() > 0 || interfaces[m]->outputs.get_num_variables() > 0) {
138+
all_empty = false; break;
139+
}
140+
}
141+
if (all_empty) phase = PHASE_WAIT_MQTT; // Trigger exchange of contracts followed by reconnect to get all settings
54142
#else
55143
if (strstr(topic, "/modules")>0) {
56-
if (((PJONModuleInterfaceSet&)interfaces).set_interface_list(data)) {
57-
// The module list changed. Reconnect to get all settings from MQTT to new module objects.
58-
printf("--> GOT NEW MODULE LIST. Reconnecting to get settings.\n");
59-
#ifdef DEBUG_PRINT
60-
DPRINTLN("--> GOT NEW MODULE LIST. Reconnecting to get settings.");
144+
if (((PJONModuleInterfaceSet&)interfaces).set_interface_list(data)) {
145+
// The module list changed. Reconnect to get all settings from MQTT to new module objects.
146+
got_master_settings |= SETTING_MODULE_LIST;
147+
phase = PHASE_WAIT_MQTT;
148+
#if defined(DEBUG_PRINT) || defined(DEBUG_PRINT_SETTINGUPDATE_MQTT)
149+
if (phase != PHASE_WAIT_MQTT) DPRINTLN("--> GOT NEW MODULE LIST. Changing to PHASE_WAIT_MQTT.");
61150
#endif
62-
stop();
63-
start();
64-
}
65-
#ifdef DEBUG_PRINT
151+
}
152+
#if defined(DEBUG_PRINT) || defined(DEBUG_PRINT_SETTINGUPDATE_MQTT)
66153
DPRINT("Modules: '"); DPRINT(data); DPRINTLN("'");
67154
#endif
68155
} else if (strstr(topic, "/devid")>0) {
69156
// Set PJON id for master
157+
got_master_settings |= SETTING_DEVID;
70158
uint8_t device_id = (uint8_t) atoi(data);
71159
if (device_id != 0) ((PJONModuleInterfaceSet&)interfaces).get_link()->set_id(device_id);
72160
} else if (strstr(topic, "/intsettings")>0) {
73161
// Set time interval between exchanges
162+
got_master_settings |= SETTING_INTERVAL;
74163
uint32_t interval = atoi(data);
75164
if (interval > 100) ((PJONModuleInterfaceSet&)interfaces).set_transfer_interval(interval);
76165
}
77166
#endif
167+
check_phase();
78168
}
79-
printf("Finished with read_nonmodule_topic");
80169
}
81170
};

0 commit comments

Comments
 (0)