Skip to content

Commit

Permalink
MQTT Bug Fixes for subscriptions and subscribe-topic
Browse files Browse the repository at this point in the history
Bug Fixes:
* MQTT Subscriptions don't work at startup (GH Issue #23)
* MQTT subscribe-topic in CONNACK is not retrieved correctly
* Unnecessary log message for object deletion subscription containing
wildcard and partial path
  • Loading branch information
Richard Holme committed Aug 20, 2021
1 parent ad63934 commit 6afbb9d
Show file tree
Hide file tree
Showing 10 changed files with 218 additions and 65 deletions.
4 changes: 1 addition & 3 deletions src/core/data_model.c
100755 → 100644
Original file line number Diff line number Diff line change
Expand Up @@ -2902,7 +2902,7 @@ int DM_PRIV_CalcHashFromPath(char *path, dm_instances_t *inst, dm_hash_t *p_hash
** \param path - full data model path of the parameter or object to return the node of
** This may be an instantiated or schema path or may use wildcards instead of instance numbers (if so, inst and is_qualified_instance must be NULL).
** \param inst - pointer to instances structure, filled in from parsing the path
** NOTE: This parameter may be NULL if instances are not required
** NOTE: This parameter may be NULL if instances are not required, but only if is_qualified_instance is NULL too
**
** \param is_qualified_instance - Pointer to boolean in which to return whether the instances
** structure contains all instance numbers for the parameter/object being addressed
Expand Down Expand Up @@ -4053,7 +4053,6 @@ int AddChildParamsDefaultValues(char *path, int path_len, dm_node_t *node, dm_in
** NOTE: This function is recursive
**
** \param path - path of the object instance to delete children from. This code will modify the buffer pointed to by this path
** NOTE: path is only actually needed for debug and error reporting purposes
** \param path_len - length of path (position to append child node names)
** \param node - Node to delete children of
** \param inst - pointer to instance structure locating the parent node
Expand Down Expand Up @@ -4148,7 +4147,6 @@ int DeleteChildParams(char *path, int path_len, dm_node_t *node, dm_instances_t
** NOTE: This function is recursive
**
** \param path - path of the object to delete children from. This code will modify the buffer pointed to by this path
** NOTE: path is only actually needed for debug and error reporting purposes
** \param path_len - length of path (position to append child node names)
** \param node - Node to delete children of
** \param inst - pointer to instance structure locating the parent node
Expand Down
86 changes: 66 additions & 20 deletions src/core/device_mqtt.c
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ client_t *FindUnusedMqttClient(void);
client_t *FindDevMqttClientByInstance(int instance);
void DestroyMQTTClient(client_t *client);
int ProcessMqttClientAdded(int instance);
int ProcessMqttSubscriptionAdded(int instance, int sub_instance);
int ProcessMqttSubscriptionAdded(int instance, int sub_instance, mqtt_subscription_t **mqtt_sub);
int DEVICE_MQTT_StartAllClients(void);
int EnableMQTTClient(mqtt_conn_params_t *mp, mqtt_subscription_t subscriptions[MAX_MQTT_SUBSCRIPTIONS]);
void ScheduleMqttReconnect(mqtt_conn_params_t *mp);
Expand Down Expand Up @@ -605,10 +605,14 @@ int ProcessMqttClientAdded(int instance)
{
int err;
char path[MAX_DM_PATH];

int_vector_t iv;
client_t *mqttclient;
mqttclient = FindUnusedMqttClient();
int i;
int mqtt_subs_inst;

// Initialise to defaults
INT_VECTOR_Init(&iv);
mqttclient = FindUnusedMqttClient();
if (mqttclient == NULL)
{
return USP_ERR_RESOURCES_EXCEEDED;
Expand Down Expand Up @@ -915,10 +919,38 @@ int ProcessMqttClientAdded(int instance)
goto exit;
}

// Exit if unable to get the object instance numbers present in the mqtt client subscription table
USP_SNPRINTF(path, sizeof(path), "%s.%d.Subscription.", device_mqtt_client_root, instance);
err = DATA_MODEL_GetInstances(path, &iv);
if (err != USP_ERR_OK)
{
goto exit;
}

// Add all MQTT subscriptions
for (i=0; i < iv.num_entries; i++)
{
mqtt_subs_inst = iv.vector[i];
err = ProcessMqttSubscriptionAdded(instance, mqtt_subs_inst, NULL);
if (err != USP_ERR_OK)
{
// Exit if unable to delete a MQTT subscription with bad parameters from the DB
USP_SNPRINTF(path, sizeof(path), "%s.%d.Subscription.%d", device_mqtt_client_root, instance, mqtt_subs_inst);
USP_LOG_Warning("%s: Deleting %s as it contained invalid parameters.", __FUNCTION__, path);
err = DATA_MODEL_DeleteInstance(path, 0);
if (err != USP_ERR_OK)
{
goto exit;
}
}
}

// If the code gets here, then we successfully retrieved all data about the MQTT client
err = USP_ERR_OK;

exit:
INT_VECTOR_Destroy(&iv);

if (err != USP_ERR_OK)
{
DestroyMQTTClient(mqttclient);
Expand Down Expand Up @@ -961,64 +993,71 @@ int EnableMQTTClient(mqtt_conn_params_t *mp, mqtt_subscription_t subscriptions[M
**
** ProcessMqttSubscriptionAdded
*
** Reads the parameters for the specified Mqtt Client subscriotion from the
** database and processes them
** Reads the parameters for the specified MQTT client subscription from the
** database and caches them in the local mqtt_client_params[]
** NOTE: Does not propagate the change to the underlying MTP (this must be performed by the caller by calling MQTT_AddSubscription)
**
** \param instance - instance number of the MQTT client
** \param sub_instance - instance number of the MQTT Subscription
** \param mqtt_sub - pointer to variable in which to return a pointer to the MQTT subscription entry allocated by this function
** NOTE: This parameter may be NULL, if the caller is not interested in this value
**
** \return USP_ERR_OK if successful
**
**************************************************************************/
int ProcessMqttSubscriptionAdded(int instance, int sub_instance)
int ProcessMqttSubscriptionAdded(int instance, int sub_instance, mqtt_subscription_t **mqtt_sub)
{
int err = USP_ERR_OK;
char path[MAX_DM_PATH];
client_t *client = NULL;
mqtt_subscription_t *sub;

// Refactor.
// Default return parameters
if (mqtt_sub != NULL)
{
*mqtt_sub = NULL;
}

// Exit if unable to allocate a subscription entry for this MQTT client
client = FindDevMqttClientByInstance(instance);
USP_ASSERT(client != NULL);

mqtt_subscription_t* sub = FindUnusedSubscriptionInMqttClient(client);
sub = FindUnusedSubscriptionInMqttClient(client);
if (sub == NULL)
{
USP_LOG_Error("%s: Failed to find empty subscription.", __FUNCTION__);
return USP_ERR_RESOURCES_EXCEEDED;
}

sub->instance = sub_instance;

// Exit if unable to retrieve the Enable parameter for this MQTT subscription
USP_SNPRINTF(path, sizeof(path), "%s.%d.Subscription.%d.Enable", device_mqtt_client_root, instance, sub_instance);
err = DM_ACCESS_GetBool(path, &sub->enabled);
if (err != USP_ERR_OK)
{
USP_ERR_SetMessage("%s: Client Subscription %d enable failed", __FUNCTION__, sub_instance);
return err;
}

// Exit if unable to retrieve the Topic parameter for this MQTT subscription
USP_SNPRINTF(path, sizeof(path), "%s.%d.Subscription.%d.Topic", device_mqtt_client_root, instance, sub_instance);
USP_SAFE_FREE(sub->topic);
err = DM_ACCESS_GetString(path, &sub->topic);
if (err != USP_ERR_OK)
{
USP_ERR_SetMessage("%s: Client Subscription %d Topic failed", __FUNCTION__, sub_instance);
return err;
}

// Exit if unable to retrieve the QoS parameter for this MQTT subscription
USP_SNPRINTF(path, sizeof(path), "%s.%d.Subscription.%d.QoS", device_mqtt_client_root, instance, sub_instance);
err = DM_ACCESS_GetUnsigned(path, &sub->qos);
if (err != USP_ERR_OK)
{
USP_ERR_SetMessage("%s: Client Subscription %d QoS failed", __FUNCTION__, sub_instance);
return err;
}

err = MQTT_AddSubscription(instance, sub);
if (err != USP_ERR_OK)
// Since the subscription was retrieved successfully, return the subscription entry
if (mqtt_sub != NULL)
{
USP_ERR_SetMessage("%s: client subscribe failed\n", __FUNCTION__);
return err;
*mqtt_sub = sub;
}

return err;
Expand Down Expand Up @@ -2781,7 +2820,6 @@ int ValidateAdd_MqttClientSubscriptions(dm_req_t *req)
return USP_ERR_OK;
}


/*********************************************************************//**
**
** Notify_MqttClientSubcriptionsAdded
Expand All @@ -2798,22 +2836,30 @@ int Notify_MqttClientSubcriptionsAdded(dm_req_t *req)
{
int err;
client_t *mqttclient;
mqtt_subscription_t *sub;

mqttclient = FindDevMqttClientByInstance(inst1);
USP_ASSERT(mqttclient != NULL); // As we had just successfully added it

// Exit if failed to copy from DB into mqtt client array
err = ProcessMqttSubscriptionAdded(inst1, inst2);
err = ProcessMqttSubscriptionAdded(inst1, inst2, &sub);
if (err != USP_ERR_OK)
{
USP_ERR_SetMessage(" %s: Process MQTT client added failed\n", __FUNCTION__);
return err;
}

// Exit if unable to propagate the subscription to the MQTT MTP
err = MQTT_AddSubscription(inst1, sub);
if (err != USP_ERR_OK)
{
USP_ERR_SetMessage("%s: client subscribe failed\n", __FUNCTION__);
return err;
}

return USP_ERR_OK;
}


/*********************************************************************//**
**
** Notify_MqttClientSubscriptionsDeleted
Expand Down
1 change: 1 addition & 0 deletions src/core/device_subscription.c
100755 → 100644
Original file line number Diff line number Diff line change
Expand Up @@ -2358,6 +2358,7 @@ void RefreshInstancesForObjLifetimeSubscriptions(void)

// Simply resolving the path expressions for ObjectCreation/Deletion subscriptions will result in the refresh_instances callback being called if necessary
// (because the path resolver checks whether the instance being resolved exists, or gets the instances to resolve a wildcard etc)
// And when instances are refreshed, the code automatically determines if any have been added or deleted
for (i=0; i < subscriptions.num_entries; i++)
{
sub = &subscriptions.vector[i];
Expand Down
2 changes: 0 additions & 2 deletions src/core/dm_exec.c
Original file line number Diff line number Diff line change
Expand Up @@ -895,8 +895,6 @@ void *DM_EXEC_Main(void *args)
return NULL;
break;

break;

case 0:
// No controllers with any activity, but we still may need to process a timeout, so fall-through
default:
Expand Down
22 changes: 20 additions & 2 deletions src/core/dm_inst_vector.c
100755 → 100644
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ int DM_INST_VECTOR_IsExist(dm_instances_t *match, bool *exists)
dm_instances_vector_t *div;
int err;

// Exit if the object is a single instance object - these always exist
// Exit if the object is a single instance object or an unqualified multi-instance object - these always exist
if (match->order == 0)
{
*exists = true;
Expand Down Expand Up @@ -731,6 +731,24 @@ int DM_INST_VECTOR_RefreshInstance(char *path)
return USP_ERR_OK;
}

/*********************************************************************//**
**
** DM_INST_VECTOR_RefreshTopLevelObjectInstances
**
** Refreshes the instances for the specified top level object and all children
** NOTE: This function may be called recursively, since it is called from the path resolver and it may call
** the path resolver itself in order to resolve object lifetime event subscriptions
**
** \param node - pointer to node of top level object to refresh
**
** \return USP_ERR_OK if successful
**
**************************************************************************/
int DM_INST_VECTOR_RefreshTopLevelObjectInstances(dm_node_t *node)
{
return RefreshInstVector(node, true);
}

/*********************************************************************//**
**
** AddObjectInstanceIfPermitted
Expand Down Expand Up @@ -885,7 +903,7 @@ int RefreshInstVector(dm_node_t *top_node, bool notify_subscriptions)
}

// NOTE: We need to call DEVICE_SUBSCRIPTION_ResolveObjectDeletionPaths() before DEVICE_SUBSCRIPTION_NotifyObjectLifeEvent()
// But as that is an unnecessary (and costly) operation if no instances are deleted, we only do if if there were any instances that were deleted
// But as that is an unnecessary (and costly) operation if no instances are deleted, we only do it if there were any instances that were deleted
if (deleted_instances.num_entries > 0)
{
// NOTE: DEVICE_SUBSCRIPTION_ResolveObjectDeletionPaths() must be called before any objects are actually deleted from the data model
Expand Down
1 change: 1 addition & 0 deletions src/core/dm_inst_vector.h
100755 → 100644
Original file line number Diff line number Diff line change
Expand Up @@ -58,5 +58,6 @@ int DM_INST_VECTOR_GetAllInstancePaths_Qualified(dm_instances_t *inst, str_vecto
void DM_INST_VECTOR_RefreshBaselineInstances(dm_node_t *parent);
void DM_INST_VECTOR_Dump(dm_instances_vector_t *div);
int DM_INST_VECTOR_RefreshInstance(char *path);
int DM_INST_VECTOR_RefreshTopLevelObjectInstances(dm_node_t *node);

#endif
4 changes: 2 additions & 2 deletions src/core/handle_get_supported_dm.c
100755 → 100644
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
*
* Copyright (C) 2019-2020, Broadband Forum
* Copyright (C) 2016-2020 CommScope, Inc
* Copyright (C) 2019-2021, Broadband Forum
* Copyright (C) 2016-2021 CommScope, Inc
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
Expand Down
2 changes: 1 addition & 1 deletion src/core/handle_get_supported_protocol.c
100755 → 100644
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
*
* Copyright (C) 2019, Broadband Forum
* Copyright (C) 2019-2020, Broadband Forum
* Copyright (C) 2016-2019 CommScope, Inc
*
* Redistribution and use in source and binary forms, with or without
Expand Down
Loading

0 comments on commit 6afbb9d

Please sign in to comment.