From 02694948f3e0229529e42102a817b35fd144bf63 Mon Sep 17 00:00:00 2001 From: Till Zimmermann Date: Tue, 19 Jun 2018 19:40:08 +0200 Subject: [PATCH] Initial MQTT Version --- Makefile.am | 4 + common.h | 16 +++ configure.ac | 20 +++- mqtt.c | 209 ++++++++++++++++++++++++++++++++++++ mqtt.h | 15 +++ rtsp.c | 9 ++ scripts/shairport-sync.conf | 22 ++++ shairport.c | 119 +++++++++++++++----- 8 files changed, 388 insertions(+), 26 deletions(-) create mode 100644 mqtt.c create mode 100644 mqtt.h diff --git a/Makefile.am b/Makefile.am index f0bcca7f1..e5f33afdd 100644 --- a/Makefile.am +++ b/Makefile.am @@ -79,6 +79,10 @@ if USE_METADATA_HUB shairport_sync_SOURCES += metadata_hub.c endif +if USE_MQTT +shairport_sync_SOURCES += mqtt.c +endif + if USE_DACP_CLIENT shairport_sync_SOURCES += dacp.c tinyhttp/chunk.c tinyhttp/header.c tinyhttp/http.c endif diff --git a/common.h b/common.h index 87493cb14..86cfa1a16 100644 --- a/common.h +++ b/common.h @@ -91,6 +91,22 @@ typedef struct { int metadata_sockport; size_t metadata_sockmsglength; int get_coverart; +#endif +#ifdef CONFIG_MQTT + int mqtt_enabled; + char *mqtt_hostname; + int mqtt_port; + char *mqtt_username; + char *mqtt_password; + char *mqtt_capath; + char *mqtt_cafile; + char *mqtt_certfile; + char *mqtt_keyfile; + char *mqtt_topic; + int mqtt_publish_raw; + int mqtt_publish_parsed; + int mqtt_publish_cover; + int mqtt_enable_remote; #endif uint8_t hw_addr[6]; int port; diff --git a/configure.ac b/configure.ac index 5a98fa845..f79041c5e 100644 --- a/configure.ac +++ b/configure.ac @@ -314,7 +314,25 @@ AM_CONDITIONAL([USE_MPRIS_CLIENT], [test "x$HAS_MPRIS_CLIENT" = "x1"]) #AM_CONDITIONAL([USE_DBUS_CORE_AND_DACP], [test "x$HAS_MPRIS" = "x1" -o "x$HAS_DBUS" = "x1"]) -if test "x$HAS_MPRIS" = "x1" -o "x$HAS_DBUS" = "x1" ; then +# Look for mqtt flag +AC_ARG_WITH(mqtt-interface, [ --with-mqtt-interface = include support for the native Shairport Sync MQTT interface], [ + AC_MSG_RESULT(>>Including MQTT support) + HAS_MQTT=1 + AC_DEFINE([HAVE_MQTT], 1, [Needed by the compiler.]) + AC_DEFINE([CONFIG_MQTT], 1, [Needed by the compiler.]) + AC_CHECK_LIB([mosquitto], [mosquitto_lib_init], , AC_MSG_ERROR(MQTT support requires the mosquitto library!)) + AC_CHECK_HEADERS([mosquitto.h],,[AC_MSG_ERROR([ header missing])]) + AC_DEFINE([HAVE_MOSQUITTO],[1],[Define to 1 if you have the mosquitto library]) + + AC_MSG_RESULT(>>Enabling metadata as it is required by mqtt) + AC_DEFINE([CONFIG_METADATA], 1, [Needed by the compiler.]) + AC_DEFINE([USE_METADATA], 1, [Needed by the compiler.]) + PKG_CHECK_MODULES([GIO_UNIX], [gio-unix-2.0 >= 2.30.0],[CFLAGS="${GIO_UNIX_CFLAGS} ${CFLAGS}" LIBS="${GIO_UNIX_LIBS} ${LIBS}"],[AC_MSG_ERROR(dbus messaging support for mpris requires the glib 2.0 library -- libglib2.0-dev suggested!)]) + ],) +AM_CONDITIONAL([USE_MQTT], [test "x$HAS_MQTT" = "x1"]) + + +if test "x$HAS_MPRIS" = "x1" -o "x$HAS_DBUS" = "x1" -o "x$HAS_MQTT"; then AC_MSG_RESULT(>>Including the metadata hub) HAS_METADATA_HUB=1 AC_DEFINE([HAVE_METADATA_HUB], 1, [Needed by the compiler.]) diff --git a/mqtt.c b/mqtt.c new file mode 100644 index 000000000..478c476fa --- /dev/null +++ b/mqtt.c @@ -0,0 +1,209 @@ +#include +#include +#include + +#include "config.h" + +#include "common.h" +#include "player.h" +#include "rtsp.h" + +#include "rtp.h" + +#include "dacp.h" +#include +#include "mqtt.h" + +//this holds the mosquitto client +struct mosquitto *mosq = NULL; +char *topic = NULL; + +//mosquitto logging +void _cb_log(struct mosquitto *mosq, void *userdata, int level, const char *str){ + switch(level){ + case MOSQ_LOG_DEBUG: + debug(1, str); + break; + case MOSQ_LOG_INFO: + debug(2, str); + break; + case MOSQ_LOG_NOTICE: + debug(3, str); + break; + case MOSQ_LOG_WARNING: + inform(str); + break; + case MOSQ_LOG_ERR: { + die("MQTT: Error: %s\n", str); + } + } +} + +//mosquitto message handler +void on_message(struct mosquitto* mosq, void* userdata, const struct mosquitto_message* msg){ + + //null-terminate the payload + char payload[msg->payloadlen+1]; + memcpy(payload,msg->payload,msg->payloadlen); + payload[msg->payloadlen]=0; + + debug(1, "[MQTT]: received Message on topic %s: %s\n",msg->topic, payload); + + //All recognized commands + char* commands[] = { + "command", "beginff", "beginrew", "mutetoggle", "nextitem", "previtem", "pause", + "playpause", "play", "stop", "playresume", "shuffle_songs", "volumedown", "volumeup", + NULL}; + + int it=0; + + //send command if it's a valid one + while(commands[it++]!=NULL){ + if( msg->payloadlen>=strlen(commands[it]) && + strncmp(msg->payload, commands[it], strlen(commands[it]))==0 + ){ + debug(1, "[MQTT]: DACP Command: %s\n",commands[it]); + send_simple_dacp_command(commands[it]); + break; + } + } +} + +void on_disconnect(struct mosquitto* mosq, void* userdata, int rc){ + debug(1, "[MQTT]: disconnected"); +} + +void on_connect(struct mosquitto* mosq, void* userdata, int rc){ + debug(1, "[MQTT]: connected"); + + //subscribe if requested + if(config.mqtt_enable_remote){ + char remotetopic[strlen(config.mqtt_topic)+8]; + snprintf(remotetopic,strlen(config.mqtt_topic)+8,"%s/remote",config.mqtt_topic); + mosquitto_subscribe(mosq,NULL,remotetopic,0); + } +} + +//helper function to publish under a topic and automatically append the main topic +void mqtt_publish(char* topic, char* data, uint32_t length){ + char fulltopic[strlen(config.mqtt_topic)+strlen(topic)+3]; + snprintf(fulltopic, strlen(config.mqtt_topic)+strlen(topic)+2, "%s/%s", config.mqtt_topic, topic); + debug(1, "[MQTT]: publishing under %s",fulltopic); + mosquitto_publish(mosq, NULL, fulltopic, length, data, 0, 0); +} + +//handler for incoming metadata +void mqtt_process_metadata(uint32_t type, uint32_t code, char *data, uint32_t length){ + if(config.mqtt_publish_raw){ + uint32_t val; + char topic[] = "____/____"; + + val=htonl(type); + memcpy(topic,&val, 4); + val=htonl(code); + memcpy(topic+5,&val, 4); + mqtt_publish(topic, data, length); + } + if(config.mqtt_publish_parsed){ + if(type=='core'){ + switch (code) { + case 'asar': + mqtt_publish("artist", data, length); + break; + case 'asal': + mqtt_publish("album", data, length); + break; + case 'minm': + mqtt_publish("title", data, length); + break; + case 'asgn': + mqtt_publish("genre", data, length); + break; + case 'asfm': + mqtt_publish("format", data, length); + break; + } + }else if(type=='ssnc'){ + switch (code) { + case 'asal': + mqtt_publish("songalbum", data, length); + break; + case 'pvol': + mqtt_publish("volume", data, length); + break; + case 'clip': + mqtt_publish("client_ip", data, length); + break; + case 'pbeg': + mqtt_publish("play_start", data, length); + break; + case 'pend': + mqtt_publish("play_end", data, length); + break; + case 'pfls': + mqtt_publish("play_flush", data, length); + break; + case 'prsm': + mqtt_publish("play_resume", data, length); + break; + case 'PICT': + if(config.mqtt_publish_parsed){ + mqtt_publish("cover", data, length); + } + break; + } + } + } + + return; +} + + +int initialise_mqtt() { + debug(1, "Initialising MQTT"); + if(config.mqtt_hostname==NULL){ + debug(1, "[MQTT]: Not initialized, as the hostname is not set"); + return 0; + } + int keepalive = 60; + mosquitto_lib_init(); + if( !(mosq = mosquitto_new(config.service_name, true, NULL)) ){ + die("[MQTT]: FATAL: Could not create mosquitto object! %d\n", mosq); + } + + if( + config.mqtt_cafile != NULL || + config.mqtt_capath != NULL || + config.mqtt_certfile != NULL || + config.mqtt_keyfile != NULL + ){ + if(mosquitto_tls_set(mosq,config.mqtt_cafile, config.mqtt_capath, config.mqtt_certfile, config.mqtt_keyfile, NULL) != MOSQ_ERR_SUCCESS) { + die("[MQTT]: TLS Setup failed"); + } + } + + if( + config.mqtt_username != NULL || + config.mqtt_password != NULL + ){ + if(mosquitto_username_pw_set(mosq,config.mqtt_username,config.mqtt_password) != MOSQ_ERR_SUCCESS) { + die("[MQTT]: Username/Password set failed"); + } + } + mosquitto_log_callback_set(mosq, _cb_log); + + if(config.mqtt_enable_remote){ + mosquitto_message_callback_set(mosq, on_message); + } + + mosquitto_disconnect_callback_set(mosq, on_disconnect); + mosquitto_connect_callback_set(mosq, on_connect); + if(mosquitto_connect(mosq, config.mqtt_hostname, config.mqtt_port, keepalive)){ + inform("[MQTT]: Could not establish a mqtt connection"); + } + if(mosquitto_loop_start(mosq) != MOSQ_ERR_SUCCESS){ + inform("[MQTT]: Could start MQTT Main loop"); + } + + return 0; +} diff --git a/mqtt.h b/mqtt.h new file mode 100644 index 000000000..b5ec5ab05 --- /dev/null +++ b/mqtt.h @@ -0,0 +1,15 @@ +#ifndef MQTT_H +#define MQTT_H +#include +#include + + +int initialise_mqtt(); +void mqtt_process_metadata(uint32_t type, uint32_t code, char *data, uint32_t length); +void mqtt_publish(char* topic, char* data, uint32_t length); +void mqtt_setup(); +void on_connect(struct mosquitto* mosq, void* userdata, int rc); +void on_disconnect(struct mosquitto* mosq, void* userdata, int rc); +void on_message(struct mosquitto* mosq, void* userdata, const struct mosquitto_message* msg); +void _cb_log(struct mosquitto *mosq, void *userdata, int level, const char *str); +#endif /* #ifndef MQTT_H */ diff --git a/rtsp.c b/rtsp.c index 0267e7f5d..84c9ae202 100644 --- a/rtsp.c +++ b/rtsp.c @@ -67,6 +67,10 @@ #include "metadata_hub.h" #endif +#ifdef HAVE_MQTT +#include "mqtt.h" +#endif + #ifdef AF_INET6 #define INETx_ADDRSTRLEN INET6_ADDRSTRLEN #else @@ -1240,6 +1244,11 @@ void *metadata_thread_function(__attribute__((unused)) void *ignore) { metadata_process(pack.type, pack.code, pack.data, pack.length); #ifdef HAVE_METADATA_HUB metadata_hub_process_metadata(pack.type, pack.code, pack.data, pack.length); +#endif +#ifdef HAVE_MQTT + if(config.mqtt_enabled){ + mqtt_process_metadata(pack.type, pack.code, pack.data, pack.length); + } #endif } if (pack.carrier) diff --git a/scripts/shairport-sync.conf b/scripts/shairport-sync.conf index 2f7f1378d..4f307aa8c 100644 --- a/scripts/shairport-sync.conf +++ b/scripts/shairport-sync.conf @@ -144,6 +144,28 @@ metadata = // socket_msglength = 65000; // the maximum packet size for any UDP metadata. This will be clipped to be between 500 or 65000. The default is 500. }; +// How to enable the MQTT-metadata/remote-service +mqtt = +{ +// enabled = "no"; // set this to yes to enable the mqtt-metadata-service +// hostname = "iot.eclipse.org"; // Hostname of the MQTT Broker +// port = "1883"; +// username = NULL; //set this to a string with your username, to enable username authentication +// password = NULL; //set this to a string with your password, to enable username & password authentication +// capath = NULL; //set this to the folder with the CA-Certificates to be accepted for the server certificate. If not set, TLS is not used +// cafile = NULL; //this may be used as an (exclusive) alternative to capath with a single file for all ca-certificates +// certfile = NULL; //set this to a string to a user certificate to enable MQTT Client certificates. keyfile must also be set! +// keyfile = NULL; //private key for MQTT Client authentication +// topic = NULL; //MQTT topic where this instance of shairport-sync should publish. If not set, the general.name value is used. +// publish_raw = "no"; //whether to publish all available metadata under the codes given in the 'metadata' docs. +// publish_parsed = "no"; //whether to publish a small (but useful) subset of metadata under human-understandable topics +// Currently published topics:artist,album,title,genre,format,songalbum,volume,client_ip, +// Additionally, empty messages at the topics play_start,play_end,play_flush,play_resume are published +// publish_cover = "no"; //whether to publish the cover over mqtt in binary form. This may lead to a bit of load on the broker +// enable_remote; //whether to remote control via MQTT. RC is available under `topic`/remote. +// Available commands are "command", "beginff", "beginrew", "mutetoggle", "nextitem", "previtem", "pause", "playpause", "play", "stop", "playresume", "shuffle_songs", "volumedown", "volumeup" +} + // Diagnostic settings. These are for diagnostic and debugging only. Normally you sould leave them commented out diagnostics = { diff --git a/shairport.c b/shairport.c index 69109e8b9..192451d4a 100644 --- a/shairport.c +++ b/shairport.c @@ -72,6 +72,10 @@ #include "dbus-service.h" #endif +#ifdef HAVE_MQTT +#include "mqtt.h" +#endif + #ifdef HAVE_MPRIS #include "mpris-service.h" #endif @@ -91,6 +95,24 @@ #include #endif +static inline int config_set_lookup_bool(config_t* cfg, char* where, int* dst) { + const char *str = 0; + if (config_lookup_string(cfg, where, &str)) { + if (strcasecmp(str, "no") == 0){ + (*dst)=0; + return 1; + }else if (strcasecmp(str, "yes") == 0){ + (*dst)=1; + return 1; + }else{ + die("Invalid %s option choice \"%s\". It should be \"yes\" or \"no\"", where, str); + return 0; + } + }else{ + return 0; + } +} + static int shutting_down = 0; char configuration_file_path[4096 + 1]; char actual_configuration_file_path[4096 + 1]; @@ -379,26 +401,11 @@ int parse_options(int argc, char **argv) { int daemonisewithout = 0; int daemonisewith = 0; /* Get the Daemonize setting. */ - if (config_lookup_string(config.cfg, "sessioncontrol.daemonize_with_pid_file", &str)) { - if (strcasecmp(str, "no") == 0) - daemonisewith = 0; - else if (strcasecmp(str, "yes") == 0) - daemonisewith = 1; - else - die("Invalid daemonize_with_pid_file option choice \"%s\". It should be \"yes\" or " - "\"no\""); - } + config_set_lookup_bool(config.cfg, "sessioncontrol.daemonize_with_pid_file", &daemonisewith); /* Get the Just_Daemonize setting. */ - if (config_lookup_string(config.cfg, "sessioncontrol.daemonize_without_pid_file", &str)) { - if (strcasecmp(str, "no") == 0) - daemonisewithout = 0; - else if (strcasecmp(str, "yes") == 0) - daemonisewithout = 1; - else - die("Invalid daemonize_without_pid_file option choice \"%s\". It should be \"yes\" or " - "\"no\""); - } + config_set_lookup_bool(config.cfg, "sessioncontrol.daemonize_without_pid_file", &daemonisewithout); + if ((daemonisewith) && (daemonisewithout)) die("Select either daemonize_with_pid_file or daemonize_without_pid_file -- you have " "selected both!"); @@ -467,15 +474,9 @@ int parse_options(int argc, char **argv) { } /* Get the statistics setting. */ - if (config_lookup_string(config.cfg, "general.statistics", &str)) { + if (!config_set_lookup_bool(config.cfg, "general.statistics", &(config.statistics_requested))) { warn("The \"general\" \"statistics\" setting is deprecated. Please use the \"diagnostics\" " "\"statistics\" setting instead."); - if (strcasecmp(str, "no") == 0) - config.statistics_requested = 0; - else if (strcasecmp(str, "yes") == 0) - config.statistics_requested = 1; - else - die("Invalid statistics option choice \"%s\". It should be \"yes\" or \"no\""); } /* The old drift tolerance setting. */ @@ -978,6 +979,68 @@ int parse_options(int argc, char **argv) { free(i3); free(vs); +#ifdef CONFIG_MQTT + int tmpval=0; + config_set_lookup_bool(config.cfg, "mqtt.enabled", &config.mqtt_enabled); + if(config.mqtt_enabled && !config.metadata_enabled){ + die("You need to have metadata enabled in order to use mqtt"); + } + if (config_lookup_string(config.cfg, "mqtt.hostname", &str)) { + config.mqtt_hostname = (char *)str; + //TODO: Document that, if this is false, whole mqtt func is disabled + } + if (config_lookup_int(config.cfg, "mqtt.port", &tmpval)) { + config.mqtt_port = tmpval; + }else{ + //TODO: Is this the correct way to set a default value? + config.mqtt_port = 1883; + } + + if (config_lookup_string(config.cfg, "mqtt.username", &str)) { + config.mqtt_username = (char *)str; + } + if (config_lookup_string(config.cfg, "mqtt.password", &str)) { + config.mqtt_password = (char *)str; + } + int capath=0; + if (config_lookup_string(config.cfg, "mqtt.capath", &str)) { + config.mqtt_capath = (char *)str; + capath=1; + } + if (config_lookup_string(config.cfg, "mqtt.cafile", &str)) { + if(capath) + die("Supply either mqtt cafile or mqtt capath -- you have supplied both!"); + config.mqtt_cafile = (char *)str; + } + int certkeynum=0; + if (config_lookup_string(config.cfg, "mqtt.certfile", &str)) { + config.mqtt_certfile = (char *)str; + certkeynum++; + } + if (config_lookup_string(config.cfg, "mqtt.keyfile", &str)) { + config.mqtt_keyfile = (char *)str; + certkeynum++; + } + if( certkeynum!=0 && certkeynum!=2){ + die("If you want to use TLS Client Authentication, you have to specify " + "mqtt.certfile AND mqtt.keyfile.\nYou have supplied only one of them.\n" + "If you do not want to use TLS Client Authentication, leave both empty." + ); + } + if (config_lookup_string(config.cfg, "mqtt.topic", &str)) { + config.mqtt_topic = (char *)str; + }else{ + int topic_length=1+strlen(config.service_name)+1; + char* topic=malloc(topic_length+1); + snprintf(topic,topic_length,"/%s/",config.service_name); + config.mqtt_topic = topic; + } + config_set_lookup_bool(config.cfg, "mqtt.publish_raw", &config.mqtt_publish_raw); + config_set_lookup_bool(config.cfg, "mqtt.publish_parsed", &config.mqtt_publish_parsed); + config_set_lookup_bool(config.cfg, "mqtt.publish_cover", &config.mqtt_publish_cover); + config_set_lookup_bool(config.cfg, "mqtt.enable_remote", &config.mqtt_enable_remote); +#endif + // now, check and calculate the pid directory #ifdef USE_CUSTOM_PID_DIR char *use_this_pid_dir = PIDDIR; @@ -1508,6 +1571,12 @@ int main(int argc, char **argv) { #endif #endif +#ifdef HAVE_MQTT + if(config.mqtt_enabled){ + initialise_mqtt(); + } +#endif + // daemon_log(LOG_INFO, "Successful Startup"); rtsp_listen_loop();