Skip to content

Commit

Permalink
Improve MQTT (#2091)
Browse files Browse the repository at this point in the history
* moved functions

* use hostname as default MQTT maintopic if parameter is not set

* use hostname as default MQTT client ID

* Only send Homassistant Discovery and Static Topics on the first connect. Retry in next round if any topic failed

* .

* add missing return code usage

* send maintopic/connection on every round like the system topics

---------

Co-authored-by: CaCO3 <caco@ruinelli.ch>
  • Loading branch information
caco3 and CaCO3 authored Feb 27, 2023
1 parent c5b20f3 commit a1a77ae
Show file tree
Hide file tree
Showing 7 changed files with 209 additions and 150 deletions.
32 changes: 19 additions & 13 deletions code/components/jomjol_flowcontroll/ClassFlowMQTT.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ void ClassFlowMQTT::SetInitialParameter(void)
topicUptime = "";
topicFreeMem = "";

clientname = "AIOTED-" + getMac();
clientname = wlan_config.hostname;

OldValue = "";
flowpostprocessing = NULL;
Expand Down Expand Up @@ -166,7 +166,6 @@ bool ClassFlowMQTT::ReadParameter(FILE* pfile, string& aktparamgraph)
if (((toUpper(splitted[0]) == "TOPIC") || (toUpper(splitted[0]) == "MAINTOPIC")) && (splitted.size() > 1))
{
maintopic = splitted[1];
mqttServer_setMainTopic(maintopic);
}
}

Expand All @@ -175,6 +174,8 @@ bool ClassFlowMQTT::ReadParameter(FILE* pfile, string& aktparamgraph)
* How ever we need the interval parameter from the ClassFlowControll, but that only gets started later.
* To work around this, we delay the start and trigger it from ClassFlowControll::ReadParameter() */

mqttServer_setMainTopic(maintopic);

return true;
}

Expand Down Expand Up @@ -210,6 +211,7 @@ bool ClassFlowMQTT::Start(float AutoInterval)

bool ClassFlowMQTT::doFlow(string zwtime)
{
bool success;
std::string result;
std::string resulterror = "";
std::string resultraw = "";
Expand All @@ -221,7 +223,7 @@ bool ClassFlowMQTT::doFlow(string zwtime)
string zw = "";
string namenumber = "";

publishSystemData();
success = publishSystemData();

if (flowpostprocessing && getMQTTisConnected())
{
Expand All @@ -247,13 +249,13 @@ bool ClassFlowMQTT::doFlow(string zwtime)


if (result.length() > 0)
MQTTPublish(namenumber + "value", result, SetRetainFlag);
success |= MQTTPublish(namenumber + "value", result, SetRetainFlag);

if (resulterror.length() > 0)
MQTTPublish(namenumber + "error", resulterror, SetRetainFlag);
success |= MQTTPublish(namenumber + "error", resulterror, SetRetainFlag);

if (resultrate.length() > 0) {
MQTTPublish(namenumber + "rate", resultrate, SetRetainFlag);
success |= MQTTPublish(namenumber + "rate", resultrate, SetRetainFlag);

std::string resultRatePerTimeUnit;
if (getTimeUnit() == "h") { // Need conversion to be per hour
Expand All @@ -262,22 +264,22 @@ bool ClassFlowMQTT::doFlow(string zwtime)
else { // Keep per minute
resultRatePerTimeUnit = resultrate;
}
MQTTPublish(namenumber + "rate_per_time_unit", resultRatePerTimeUnit, SetRetainFlag);
success |= MQTTPublish(namenumber + "rate_per_time_unit", resultRatePerTimeUnit, SetRetainFlag);
}

if (resultchangabs.length() > 0) {
MQTTPublish(namenumber + "changeabsolut", resultchangabs, SetRetainFlag); // Legacy API
MQTTPublish(namenumber + "rate_per_digitalization_round", resultchangabs, SetRetainFlag);
success |= MQTTPublish(namenumber + "changeabsolut", resultchangabs, SetRetainFlag); // Legacy API
success |= MQTTPublish(namenumber + "rate_per_digitalization_round", resultchangabs, SetRetainFlag);
}

if (resultraw.length() > 0)
MQTTPublish(namenumber + "raw", resultraw, SetRetainFlag);
success |= MQTTPublish(namenumber + "raw", resultraw, SetRetainFlag);

if (resulttimestamp.length() > 0)
MQTTPublish(namenumber + "timestamp", resulttimestamp, SetRetainFlag);
success |= MQTTPublish(namenumber + "timestamp", resulttimestamp, SetRetainFlag);

std::string json = flowpostprocessing->getJsonFromNumber(i, "\n");
MQTTPublish(namenumber + "json", json, SetRetainFlag);
success |= MQTTPublish(namenumber + "json", json, SetRetainFlag);
}
}

Expand All @@ -295,10 +297,14 @@ bool ClassFlowMQTT::doFlow(string zwtime)
// result = result + "\t" + zw;
// }
// }
// MQTTPublish(topic, result, SetRetainFlag);
// success |= MQTTPublish(topic, result, SetRetainFlag);
// }

OldValue = result;

if (!success) {
LogFile.WriteToFile(ESP_LOG_WARN, TAG, "One or more MQTT topics failed to be published!");
}

return true;
}
Expand Down
31 changes: 31 additions & 0 deletions code/components/jomjol_helper/Helper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -963,3 +963,34 @@ std::string UrlDecode(const std::string& value)

return result;
}


bool replaceString(std::string& s, std::string const& toReplace, std::string const& replaceWith) {
return replaceString(s, toReplace, replaceWith, true);
}


bool replaceString(std::string& s, std::string const& toReplace, std::string const& replaceWith, bool logIt) {
std::size_t pos = s.find(toReplace);

if (pos == std::string::npos) { // Not found
return false;
}

std::string old = s;
s.replace(pos, toReplace.length(), replaceWith);
if (logIt) {
LogFile.WriteToFile(ESP_LOG_ERROR, TAG, "Migrated Configfile line '" + old + "' to '" + s + "'");
}
return true;
}


bool isInString(std::string& s, std::string const& toFind) {
std::size_t pos = s.find(toFind);

if (pos == std::string::npos) { // Not found
return false;
}
return true;
}
4 changes: 4 additions & 0 deletions code/components/jomjol_helper/Helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,4 +95,8 @@ const char* get404(void);

std::string UrlDecode(const std::string& value);

bool replaceString(std::string& s, std::string const& toReplace, std::string const& replaceWith);
bool replaceString(std::string& s, std::string const& toReplace, std::string const& replaceWith, bool logIt);
bool isInString(std::string& s, std::string const& toFind);

#endif //HELPER_H
3 changes: 1 addition & 2 deletions code/components/jomjol_mqtt/interface_mqtt.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -326,8 +326,7 @@ bool mqtt_handler_flow_start(std::string _topic, char* _data, int _data_len) {
void MQTTconnected(){
if (mqtt_connected) {
LogFile.WriteToFile(ESP_LOG_INFO, TAG, "Connected to broker");
MQTTPublish(lwt_topic, lwt_connected, true); // Publish "connected" to maintopic/connection


if (connectFunktionMap != NULL) {
for(std::map<std::string, std::function<void()>>::iterator it = connectFunktionMap->begin(); it != connectFunktionMap->end(); ++it) {
it->second();
Expand Down
Loading

0 comments on commit a1a77ae

Please sign in to comment.