Skip to content

Commit

Permalink
Initial MQTT Version
Browse files Browse the repository at this point in the history
  • Loading branch information
tillz committed Jun 19, 2018
1 parent de18096 commit 0269494
Show file tree
Hide file tree
Showing 8 changed files with 388 additions and 26 deletions.
4 changes: 4 additions & 0 deletions Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ if USE_METADATA_HUB
shairport_sync_SOURCES += metadata_hub.c
endif

if USE_MQTT
shairport_sync_SOURCES += mqtt.c
endif

if USE_DACP_CLIENT
shairport_sync_SOURCES += dacp.c tinyhttp/chunk.c tinyhttp/header.c tinyhttp/http.c
endif
Expand Down
16 changes: 16 additions & 0 deletions common.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,22 @@ typedef struct {
int metadata_sockport;
size_t metadata_sockmsglength;
int get_coverart;
#endif
#ifdef CONFIG_MQTT
int mqtt_enabled;
char *mqtt_hostname;
int mqtt_port;
char *mqtt_username;
char *mqtt_password;
char *mqtt_capath;
char *mqtt_cafile;
char *mqtt_certfile;
char *mqtt_keyfile;
char *mqtt_topic;
int mqtt_publish_raw;
int mqtt_publish_parsed;
int mqtt_publish_cover;
int mqtt_enable_remote;
#endif
uint8_t hw_addr[6];
int port;
Expand Down
20 changes: 19 additions & 1 deletion configure.ac
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,25 @@ AM_CONDITIONAL([USE_MPRIS_CLIENT], [test "x$HAS_MPRIS_CLIENT" = "x1"])

#AM_CONDITIONAL([USE_DBUS_CORE_AND_DACP], [test "x$HAS_MPRIS" = "x1" -o "x$HAS_DBUS" = "x1"])

if test "x$HAS_MPRIS" = "x1" -o "x$HAS_DBUS" = "x1" ; then
# Look for mqtt flag
AC_ARG_WITH(mqtt-interface, [ --with-mqtt-interface = include support for the native Shairport Sync MQTT interface], [
AC_MSG_RESULT(>>Including MQTT support)
HAS_MQTT=1
AC_DEFINE([HAVE_MQTT], 1, [Needed by the compiler.])
AC_DEFINE([CONFIG_MQTT], 1, [Needed by the compiler.])
AC_CHECK_LIB([mosquitto], [mosquitto_lib_init], , AC_MSG_ERROR(MQTT support requires the mosquitto library!))
AC_CHECK_HEADERS([mosquitto.h],,[AC_MSG_ERROR([<mosquitto.h> header missing])])
AC_DEFINE([HAVE_MOSQUITTO],[1],[Define to 1 if you have the mosquitto library])
AC_MSG_RESULT(>>Enabling metadata as it is required by mqtt)
AC_DEFINE([CONFIG_METADATA], 1, [Needed by the compiler.])
AC_DEFINE([USE_METADATA], 1, [Needed by the compiler.])
PKG_CHECK_MODULES([GIO_UNIX], [gio-unix-2.0 >= 2.30.0],[CFLAGS="${GIO_UNIX_CFLAGS} ${CFLAGS}" LIBS="${GIO_UNIX_LIBS} ${LIBS}"],[AC_MSG_ERROR(dbus messaging support for mpris requires the glib 2.0 library -- libglib2.0-dev suggested!)])
],)
AM_CONDITIONAL([USE_MQTT], [test "x$HAS_MQTT" = "x1"])


if test "x$HAS_MPRIS" = "x1" -o "x$HAS_DBUS" = "x1" -o "x$HAS_MQTT"; then
AC_MSG_RESULT(>>Including the metadata hub)
HAS_METADATA_HUB=1
AC_DEFINE([HAVE_METADATA_HUB], 1, [Needed by the compiler.])
Expand Down
209 changes: 209 additions & 0 deletions mqtt.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include "config.h"

#include "common.h"
#include "player.h"
#include "rtsp.h"

#include "rtp.h"

#include "dacp.h"
#include <mosquitto.h>
#include "mqtt.h"

//this holds the mosquitto client
struct mosquitto *mosq = NULL;
char *topic = NULL;

//mosquitto logging
void _cb_log(struct mosquitto *mosq, void *userdata, int level, const char *str){
switch(level){
case MOSQ_LOG_DEBUG:
debug(1, str);
break;
case MOSQ_LOG_INFO:
debug(2, str);
break;
case MOSQ_LOG_NOTICE:
debug(3, str);
break;
case MOSQ_LOG_WARNING:
inform(str);
break;
case MOSQ_LOG_ERR: {
die("MQTT: Error: %s\n", str);
}
}
}

//mosquitto message handler
void on_message(struct mosquitto* mosq, void* userdata, const struct mosquitto_message* msg){

//null-terminate the payload
char payload[msg->payloadlen+1];
memcpy(payload,msg->payload,msg->payloadlen);
payload[msg->payloadlen]=0;

debug(1, "[MQTT]: received Message on topic %s: %s\n",msg->topic, payload);

//All recognized commands
char* commands[] = {
"command", "beginff", "beginrew", "mutetoggle", "nextitem", "previtem", "pause",
"playpause", "play", "stop", "playresume", "shuffle_songs", "volumedown", "volumeup",
NULL};

int it=0;

//send command if it's a valid one
while(commands[it++]!=NULL){
if( msg->payloadlen>=strlen(commands[it]) &&
strncmp(msg->payload, commands[it], strlen(commands[it]))==0
){
debug(1, "[MQTT]: DACP Command: %s\n",commands[it]);
send_simple_dacp_command(commands[it]);
break;
}
}
}

void on_disconnect(struct mosquitto* mosq, void* userdata, int rc){
debug(1, "[MQTT]: disconnected");
}

void on_connect(struct mosquitto* mosq, void* userdata, int rc){
debug(1, "[MQTT]: connected");

//subscribe if requested
if(config.mqtt_enable_remote){
char remotetopic[strlen(config.mqtt_topic)+8];
snprintf(remotetopic,strlen(config.mqtt_topic)+8,"%s/remote",config.mqtt_topic);
mosquitto_subscribe(mosq,NULL,remotetopic,0);
}
}

//helper function to publish under a topic and automatically append the main topic
void mqtt_publish(char* topic, char* data, uint32_t length){
char fulltopic[strlen(config.mqtt_topic)+strlen(topic)+3];
snprintf(fulltopic, strlen(config.mqtt_topic)+strlen(topic)+2, "%s/%s", config.mqtt_topic, topic);
debug(1, "[MQTT]: publishing under %s",fulltopic);
mosquitto_publish(mosq, NULL, fulltopic, length, data, 0, 0);
}

//handler for incoming metadata
void mqtt_process_metadata(uint32_t type, uint32_t code, char *data, uint32_t length){
if(config.mqtt_publish_raw){
uint32_t val;
char topic[] = "____/____";

val=htonl(type);
memcpy(topic,&val, 4);
val=htonl(code);
memcpy(topic+5,&val, 4);
mqtt_publish(topic, data, length);
}
if(config.mqtt_publish_parsed){
if(type=='core'){
switch (code) {
case 'asar':
mqtt_publish("artist", data, length);
break;
case 'asal':
mqtt_publish("album", data, length);
break;
case 'minm':
mqtt_publish("title", data, length);
break;
case 'asgn':
mqtt_publish("genre", data, length);
break;
case 'asfm':
mqtt_publish("format", data, length);
break;
}
}else if(type=='ssnc'){
switch (code) {
case 'asal':
mqtt_publish("songalbum", data, length);
break;
case 'pvol':
mqtt_publish("volume", data, length);
break;
case 'clip':
mqtt_publish("client_ip", data, length);
break;
case 'pbeg':
mqtt_publish("play_start", data, length);
break;
case 'pend':
mqtt_publish("play_end", data, length);
break;
case 'pfls':
mqtt_publish("play_flush", data, length);
break;
case 'prsm':
mqtt_publish("play_resume", data, length);
break;
case 'PICT':
if(config.mqtt_publish_parsed){
mqtt_publish("cover", data, length);
}
break;
}
}
}

return;
}


int initialise_mqtt() {
debug(1, "Initialising MQTT");
if(config.mqtt_hostname==NULL){
debug(1, "[MQTT]: Not initialized, as the hostname is not set");
return 0;
}
int keepalive = 60;
mosquitto_lib_init();
if( !(mosq = mosquitto_new(config.service_name, true, NULL)) ){
die("[MQTT]: FATAL: Could not create mosquitto object! %d\n", mosq);
}

if(
config.mqtt_cafile != NULL ||
config.mqtt_capath != NULL ||
config.mqtt_certfile != NULL ||
config.mqtt_keyfile != NULL
){
if(mosquitto_tls_set(mosq,config.mqtt_cafile, config.mqtt_capath, config.mqtt_certfile, config.mqtt_keyfile, NULL) != MOSQ_ERR_SUCCESS) {
die("[MQTT]: TLS Setup failed");
}
}

if(
config.mqtt_username != NULL ||
config.mqtt_password != NULL
){
if(mosquitto_username_pw_set(mosq,config.mqtt_username,config.mqtt_password) != MOSQ_ERR_SUCCESS) {
die("[MQTT]: Username/Password set failed");
}
}
mosquitto_log_callback_set(mosq, _cb_log);

if(config.mqtt_enable_remote){
mosquitto_message_callback_set(mosq, on_message);
}

mosquitto_disconnect_callback_set(mosq, on_disconnect);
mosquitto_connect_callback_set(mosq, on_connect);
if(mosquitto_connect(mosq, config.mqtt_hostname, config.mqtt_port, keepalive)){
inform("[MQTT]: Could not establish a mqtt connection");
}
if(mosquitto_loop_start(mosq) != MOSQ_ERR_SUCCESS){
inform("[MQTT]: Could start MQTT Main loop");
}

return 0;
}
15 changes: 15 additions & 0 deletions mqtt.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
#ifndef MQTT_H
#define MQTT_H
#include <stdint.h>
#include <mosquitto.h>


int initialise_mqtt();
void mqtt_process_metadata(uint32_t type, uint32_t code, char *data, uint32_t length);
void mqtt_publish(char* topic, char* data, uint32_t length);
void mqtt_setup();
void on_connect(struct mosquitto* mosq, void* userdata, int rc);
void on_disconnect(struct mosquitto* mosq, void* userdata, int rc);
void on_message(struct mosquitto* mosq, void* userdata, const struct mosquitto_message* msg);
void _cb_log(struct mosquitto *mosq, void *userdata, int level, const char *str);
#endif /* #ifndef MQTT_H */
9 changes: 9 additions & 0 deletions rtsp.c
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@
#include "metadata_hub.h"
#endif

#ifdef HAVE_MQTT
#include "mqtt.h"
#endif

#ifdef AF_INET6
#define INETx_ADDRSTRLEN INET6_ADDRSTRLEN
#else
Expand Down Expand Up @@ -1240,6 +1244,11 @@ void *metadata_thread_function(__attribute__((unused)) void *ignore) {
metadata_process(pack.type, pack.code, pack.data, pack.length);
#ifdef HAVE_METADATA_HUB
metadata_hub_process_metadata(pack.type, pack.code, pack.data, pack.length);
#endif
#ifdef HAVE_MQTT
if(config.mqtt_enabled){
mqtt_process_metadata(pack.type, pack.code, pack.data, pack.length);
}
#endif
}
if (pack.carrier)
Expand Down
22 changes: 22 additions & 0 deletions scripts/shairport-sync.conf
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,28 @@ metadata =
// socket_msglength = 65000; // the maximum packet size for any UDP metadata. This will be clipped to be between 500 or 65000. The default is 500.
};

// How to enable the MQTT-metadata/remote-service
mqtt =
{
// enabled = "no"; // set this to yes to enable the mqtt-metadata-service
// hostname = "iot.eclipse.org"; // Hostname of the MQTT Broker
// port = "1883";
// username = NULL; //set this to a string with your username, to enable username authentication
// password = NULL; //set this to a string with your password, to enable username & password authentication
// capath = NULL; //set this to the folder with the CA-Certificates to be accepted for the server certificate. If not set, TLS is not used
// cafile = NULL; //this may be used as an (exclusive) alternative to capath with a single file for all ca-certificates
// certfile = NULL; //set this to a string to a user certificate to enable MQTT Client certificates. keyfile must also be set!
// keyfile = NULL; //private key for MQTT Client authentication
// topic = NULL; //MQTT topic where this instance of shairport-sync should publish. If not set, the general.name value is used.
// publish_raw = "no"; //whether to publish all available metadata under the codes given in the 'metadata' docs.
// publish_parsed = "no"; //whether to publish a small (but useful) subset of metadata under human-understandable topics
// Currently published topics:artist,album,title,genre,format,songalbum,volume,client_ip,
// Additionally, empty messages at the topics play_start,play_end,play_flush,play_resume are published
// publish_cover = "no"; //whether to publish the cover over mqtt in binary form. This may lead to a bit of load on the broker
// enable_remote; //whether to remote control via MQTT. RC is available under `topic`/remote.
// Available commands are "command", "beginff", "beginrew", "mutetoggle", "nextitem", "previtem", "pause", "playpause", "play", "stop", "playresume", "shuffle_songs", "volumedown", "volumeup"
}

// Diagnostic settings. These are for diagnostic and debugging only. Normally you sould leave them commented out
diagnostics =
{
Expand Down
Loading

0 comments on commit 0269494

Please sign in to comment.