From 6afbb9dd0e953be65fd26d479ef6b0ccafc2d507 Mon Sep 17 00:00:00 2001 From: Richard Holme Date: Fri, 20 Aug 2021 17:51:41 +0100 Subject: [PATCH] MQTT Bug Fixes for subscriptions and subscribe-topic Bug Fixes: * MQTT Subscriptions don't work at startup (GH Issue #23) * MQTT subscribe-topic in CONNACK is not retrieved correctly * Unnecessary log message for object deletion subscription containing wildcard and partial path --- src/core/data_model.c | 4 +- src/core/device_mqtt.c | 86 ++++++++++++++++++------ src/core/device_subscription.c | 1 + src/core/dm_exec.c | 2 - src/core/dm_inst_vector.c | 22 +++++- src/core/dm_inst_vector.h | 1 + src/core/handle_get_supported_dm.c | 4 +- src/core/handle_get_supported_protocol.c | 2 +- src/core/mqtt.c | 77 ++++++++++++++------- src/core/path_resolver.c | 84 ++++++++++++++++++++--- 10 files changed, 218 insertions(+), 65 deletions(-) mode change 100755 => 100644 src/core/data_model.c mode change 100755 => 100644 src/core/device_subscription.c mode change 100755 => 100644 src/core/dm_inst_vector.c mode change 100755 => 100644 src/core/dm_inst_vector.h mode change 100755 => 100644 src/core/handle_get_supported_dm.c mode change 100755 => 100644 src/core/handle_get_supported_protocol.c mode change 100755 => 100644 src/core/mqtt.c diff --git a/src/core/data_model.c b/src/core/data_model.c old mode 100755 new mode 100644 index c883c93..b81de0c --- a/src/core/data_model.c +++ b/src/core/data_model.c @@ -2902,7 +2902,7 @@ int DM_PRIV_CalcHashFromPath(char *path, dm_instances_t *inst, dm_hash_t *p_hash ** \param path - full data model path of the parameter or object to return the node of ** This may be an instantiated or schema path or may use wildcards instead of instance numbers (if so, inst and is_qualified_instance must be NULL). ** \param inst - pointer to instances structure, filled in from parsing the path -** NOTE: This parameter may be NULL if instances are not required +** NOTE: This parameter may be NULL if instances are not required, but only if is_qualified_instance is NULL too ** ** \param is_qualified_instance - Pointer to boolean in which to return whether the instances ** structure contains all instance numbers for the parameter/object being addressed @@ -4053,7 +4053,6 @@ int AddChildParamsDefaultValues(char *path, int path_len, dm_node_t *node, dm_in ** NOTE: This function is recursive ** ** \param path - path of the object instance to delete children from. This code will modify the buffer pointed to by this path -** NOTE: path is only actually needed for debug and error reporting purposes ** \param path_len - length of path (position to append child node names) ** \param node - Node to delete children of ** \param inst - pointer to instance structure locating the parent node @@ -4148,7 +4147,6 @@ int DeleteChildParams(char *path, int path_len, dm_node_t *node, dm_instances_t ** NOTE: This function is recursive ** ** \param path - path of the object to delete children from. This code will modify the buffer pointed to by this path -** NOTE: path is only actually needed for debug and error reporting purposes ** \param path_len - length of path (position to append child node names) ** \param node - Node to delete children of ** \param inst - pointer to instance structure locating the parent node diff --git a/src/core/device_mqtt.c b/src/core/device_mqtt.c index 912bd15..f9c47ab 100644 --- a/src/core/device_mqtt.c +++ b/src/core/device_mqtt.c @@ -155,7 +155,7 @@ client_t *FindUnusedMqttClient(void); client_t *FindDevMqttClientByInstance(int instance); void DestroyMQTTClient(client_t *client); int ProcessMqttClientAdded(int instance); -int ProcessMqttSubscriptionAdded(int instance, int sub_instance); +int ProcessMqttSubscriptionAdded(int instance, int sub_instance, mqtt_subscription_t **mqtt_sub); int DEVICE_MQTT_StartAllClients(void); int EnableMQTTClient(mqtt_conn_params_t *mp, mqtt_subscription_t subscriptions[MAX_MQTT_SUBSCRIPTIONS]); void ScheduleMqttReconnect(mqtt_conn_params_t *mp); @@ -605,10 +605,14 @@ int ProcessMqttClientAdded(int instance) { int err; char path[MAX_DM_PATH]; - + int_vector_t iv; client_t *mqttclient; - mqttclient = FindUnusedMqttClient(); + int i; + int mqtt_subs_inst; + // Initialise to defaults + INT_VECTOR_Init(&iv); + mqttclient = FindUnusedMqttClient(); if (mqttclient == NULL) { return USP_ERR_RESOURCES_EXCEEDED; @@ -915,10 +919,38 @@ int ProcessMqttClientAdded(int instance) goto exit; } + // Exit if unable to get the object instance numbers present in the mqtt client subscription table + USP_SNPRINTF(path, sizeof(path), "%s.%d.Subscription.", device_mqtt_client_root, instance); + err = DATA_MODEL_GetInstances(path, &iv); + if (err != USP_ERR_OK) + { + goto exit; + } + + // Add all MQTT subscriptions + for (i=0; i < iv.num_entries; i++) + { + mqtt_subs_inst = iv.vector[i]; + err = ProcessMqttSubscriptionAdded(instance, mqtt_subs_inst, NULL); + if (err != USP_ERR_OK) + { + // Exit if unable to delete a MQTT subscription with bad parameters from the DB + USP_SNPRINTF(path, sizeof(path), "%s.%d.Subscription.%d", device_mqtt_client_root, instance, mqtt_subs_inst); + USP_LOG_Warning("%s: Deleting %s as it contained invalid parameters.", __FUNCTION__, path); + err = DATA_MODEL_DeleteInstance(path, 0); + if (err != USP_ERR_OK) + { + goto exit; + } + } + } + // If the code gets here, then we successfully retrieved all data about the MQTT client err = USP_ERR_OK; exit: + INT_VECTOR_Destroy(&iv); + if (err != USP_ERR_OK) { DestroyMQTTClient(mqttclient); @@ -961,64 +993,71 @@ int EnableMQTTClient(mqtt_conn_params_t *mp, mqtt_subscription_t subscriptions[M ** ** ProcessMqttSubscriptionAdded * -** Reads the parameters for the specified Mqtt Client subscriotion from the -** database and processes them +** Reads the parameters for the specified MQTT client subscription from the +** database and caches them in the local mqtt_client_params[] +** NOTE: Does not propagate the change to the underlying MTP (this must be performed by the caller by calling MQTT_AddSubscription) ** ** \param instance - instance number of the MQTT client ** \param sub_instance - instance number of the MQTT Subscription +** \param mqtt_sub - pointer to variable in which to return a pointer to the MQTT subscription entry allocated by this function +** NOTE: This parameter may be NULL, if the caller is not interested in this value ** ** \return USP_ERR_OK if successful ** **************************************************************************/ -int ProcessMqttSubscriptionAdded(int instance, int sub_instance) +int ProcessMqttSubscriptionAdded(int instance, int sub_instance, mqtt_subscription_t **mqtt_sub) { int err = USP_ERR_OK; char path[MAX_DM_PATH]; client_t *client = NULL; + mqtt_subscription_t *sub; - // Refactor. + // Default return parameters + if (mqtt_sub != NULL) + { + *mqtt_sub = NULL; + } + + // Exit if unable to allocate a subscription entry for this MQTT client client = FindDevMqttClientByInstance(instance); USP_ASSERT(client != NULL); - - mqtt_subscription_t* sub = FindUnusedSubscriptionInMqttClient(client); + sub = FindUnusedSubscriptionInMqttClient(client); if (sub == NULL) { USP_LOG_Error("%s: Failed to find empty subscription.", __FUNCTION__); return USP_ERR_RESOURCES_EXCEEDED; } - sub->instance = sub_instance; + // Exit if unable to retrieve the Enable parameter for this MQTT subscription USP_SNPRINTF(path, sizeof(path), "%s.%d.Subscription.%d.Enable", device_mqtt_client_root, instance, sub_instance); err = DM_ACCESS_GetBool(path, &sub->enabled); if (err != USP_ERR_OK) { - USP_ERR_SetMessage("%s: Client Subscription %d enable failed", __FUNCTION__, sub_instance); return err; } + // Exit if unable to retrieve the Topic parameter for this MQTT subscription USP_SNPRINTF(path, sizeof(path), "%s.%d.Subscription.%d.Topic", device_mqtt_client_root, instance, sub_instance); USP_SAFE_FREE(sub->topic); err = DM_ACCESS_GetString(path, &sub->topic); if (err != USP_ERR_OK) { - USP_ERR_SetMessage("%s: Client Subscription %d Topic failed", __FUNCTION__, sub_instance); return err; } + // Exit if unable to retrieve the QoS parameter for this MQTT subscription USP_SNPRINTF(path, sizeof(path), "%s.%d.Subscription.%d.QoS", device_mqtt_client_root, instance, sub_instance); err = DM_ACCESS_GetUnsigned(path, &sub->qos); if (err != USP_ERR_OK) { - USP_ERR_SetMessage("%s: Client Subscription %d QoS failed", __FUNCTION__, sub_instance); return err; } - err = MQTT_AddSubscription(instance, sub); - if (err != USP_ERR_OK) + // Since the subscription was retrieved successfully, return the subscription entry + if (mqtt_sub != NULL) { - USP_ERR_SetMessage("%s: client subscribe failed\n", __FUNCTION__); - return err; + *mqtt_sub = sub; } return err; @@ -2781,7 +2820,6 @@ int ValidateAdd_MqttClientSubscriptions(dm_req_t *req) return USP_ERR_OK; } - /*********************************************************************//** ** ** Notify_MqttClientSubcriptionsAdded @@ -2798,22 +2836,30 @@ int Notify_MqttClientSubcriptionsAdded(dm_req_t *req) { int err; client_t *mqttclient; + mqtt_subscription_t *sub; mqttclient = FindDevMqttClientByInstance(inst1); USP_ASSERT(mqttclient != NULL); // As we had just successfully added it // Exit if failed to copy from DB into mqtt client array - err = ProcessMqttSubscriptionAdded(inst1, inst2); + err = ProcessMqttSubscriptionAdded(inst1, inst2, &sub); if (err != USP_ERR_OK) { USP_ERR_SetMessage(" %s: Process MQTT client added failed\n", __FUNCTION__); return err; } + // Exit if unable to propagate the subscription to the MQTT MTP + err = MQTT_AddSubscription(inst1, sub); + if (err != USP_ERR_OK) + { + USP_ERR_SetMessage("%s: client subscribe failed\n", __FUNCTION__); + return err; + } + return USP_ERR_OK; } - /*********************************************************************//** ** ** Notify_MqttClientSubscriptionsDeleted diff --git a/src/core/device_subscription.c b/src/core/device_subscription.c old mode 100755 new mode 100644 index a9008f6..f8fa40c --- a/src/core/device_subscription.c +++ b/src/core/device_subscription.c @@ -2358,6 +2358,7 @@ void RefreshInstancesForObjLifetimeSubscriptions(void) // Simply resolving the path expressions for ObjectCreation/Deletion subscriptions will result in the refresh_instances callback being called if necessary // (because the path resolver checks whether the instance being resolved exists, or gets the instances to resolve a wildcard etc) + // And when instances are refreshed, the code automatically determines if any have been added or deleted for (i=0; i < subscriptions.num_entries; i++) { sub = &subscriptions.vector[i]; diff --git a/src/core/dm_exec.c b/src/core/dm_exec.c index dc7694d..15df491 100644 --- a/src/core/dm_exec.c +++ b/src/core/dm_exec.c @@ -895,8 +895,6 @@ void *DM_EXEC_Main(void *args) return NULL; break; - break; - case 0: // No controllers with any activity, but we still may need to process a timeout, so fall-through default: diff --git a/src/core/dm_inst_vector.c b/src/core/dm_inst_vector.c old mode 100755 new mode 100644 index 2b3362d..ccb3d17 --- a/src/core/dm_inst_vector.c +++ b/src/core/dm_inst_vector.c @@ -243,7 +243,7 @@ int DM_INST_VECTOR_IsExist(dm_instances_t *match, bool *exists) dm_instances_vector_t *div; int err; - // Exit if the object is a single instance object - these always exist + // Exit if the object is a single instance object or an unqualified multi-instance object - these always exist if (match->order == 0) { *exists = true; @@ -731,6 +731,24 @@ int DM_INST_VECTOR_RefreshInstance(char *path) return USP_ERR_OK; } +/*********************************************************************//** +** +** DM_INST_VECTOR_RefreshTopLevelObjectInstances +** +** Refreshes the instances for the specified top level object and all children +** NOTE: This function may be called recursively, since it is called from the path resolver and it may call +** the path resolver itself in order to resolve object lifetime event subscriptions +** +** \param node - pointer to node of top level object to refresh +** +** \return USP_ERR_OK if successful +** +**************************************************************************/ +int DM_INST_VECTOR_RefreshTopLevelObjectInstances(dm_node_t *node) +{ + return RefreshInstVector(node, true); +} + /*********************************************************************//** ** ** AddObjectInstanceIfPermitted @@ -885,7 +903,7 @@ int RefreshInstVector(dm_node_t *top_node, bool notify_subscriptions) } // NOTE: We need to call DEVICE_SUBSCRIPTION_ResolveObjectDeletionPaths() before DEVICE_SUBSCRIPTION_NotifyObjectLifeEvent() - // But as that is an unnecessary (and costly) operation if no instances are deleted, we only do if if there were any instances that were deleted + // But as that is an unnecessary (and costly) operation if no instances are deleted, we only do it if there were any instances that were deleted if (deleted_instances.num_entries > 0) { // NOTE: DEVICE_SUBSCRIPTION_ResolveObjectDeletionPaths() must be called before any objects are actually deleted from the data model diff --git a/src/core/dm_inst_vector.h b/src/core/dm_inst_vector.h old mode 100755 new mode 100644 index 9894936..b3ef1f1 --- a/src/core/dm_inst_vector.h +++ b/src/core/dm_inst_vector.h @@ -58,5 +58,6 @@ int DM_INST_VECTOR_GetAllInstancePaths_Qualified(dm_instances_t *inst, str_vecto void DM_INST_VECTOR_RefreshBaselineInstances(dm_node_t *parent); void DM_INST_VECTOR_Dump(dm_instances_vector_t *div); int DM_INST_VECTOR_RefreshInstance(char *path); +int DM_INST_VECTOR_RefreshTopLevelObjectInstances(dm_node_t *node); #endif diff --git a/src/core/handle_get_supported_dm.c b/src/core/handle_get_supported_dm.c old mode 100755 new mode 100644 index 387d7de..d723f9b --- a/src/core/handle_get_supported_dm.c +++ b/src/core/handle_get_supported_dm.c @@ -1,7 +1,7 @@ /* * - * Copyright (C) 2019-2020, Broadband Forum - * Copyright (C) 2016-2020 CommScope, Inc + * Copyright (C) 2019-2021, Broadband Forum + * Copyright (C) 2016-2021 CommScope, Inc * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions diff --git a/src/core/handle_get_supported_protocol.c b/src/core/handle_get_supported_protocol.c old mode 100755 new mode 100644 index bbda980..dd174c3 --- a/src/core/handle_get_supported_protocol.c +++ b/src/core/handle_get_supported_protocol.c @@ -1,6 +1,6 @@ /* * - * Copyright (C) 2019, Broadband Forum + * Copyright (C) 2019-2020, Broadband Forum * Copyright (C) 2016-2019 CommScope, Inc * * Redistribution and use in source and binary forms, with or without diff --git a/src/core/mqtt.c b/src/core/mqtt.c old mode 100755 new mode 100644 index 6e57904..63129e1 --- a/src/core/mqtt.c +++ b/src/core/mqtt.c @@ -1193,40 +1193,66 @@ void ConnectV5Callback(struct mosquitto *mosq, void *userdata, int result, int f USP_LOG_Error("%s: No cert chain, so cannot get controller trust", __FUNCTION__); } - // Pick up client id, as per R-MQTT.9 - // Done as arrays on the stack here so we don't have _even_ more to free from the heap - char client_id[512] = { 0 }; - char *client_id_ptr = client_id; - char response_info[512] = { 0 }; - char *response_info_ptr = response_info; - char subscribe_topic[512] = { 0 }; - char *subscribe_topic_ptr = subscribe_topic; - - mosquitto_property_read_string(props, ASSIGNED_CLIENT_IDENTIFIER, - &client_id_ptr, false /* skip first */); - - mosquitto_property_read_string(props, RESPONSE_INFORMATION, - &response_info_ptr, false); - - char* name = "subscribe-topic"; - if (mosquitto_property_read_string_pair(props, USER_PROPERTY, - &name, &subscribe_topic_ptr, false) != MOSQ_ERR_SUCCESS) + char *client_id_ptr = NULL; + char *response_info_ptr = NULL; + char *subscribe_topic_ptr = NULL; + + if (mosquitto_property_read_string(props, ASSIGNED_CLIENT_IDENTIFIER, + &client_id_ptr, false /* skip first */) != NULL) { - USP_LOG_Error("Couldn't find subscribe-topic in user properties"); + USP_LOG_Debug("Received client_id: \"%s\"", client_id_ptr); + USP_SAFE_FREE(client->conn_params.client_id); + client->conn_params.client_id = USP_STRDUP(client_id_ptr); + free(client_id_ptr); } - USP_LOG_Debug("Received subcribe-topic: \"%s\"", subscribe_topic); - - if (strlen(response_info_ptr) > 0) + if (mosquitto_property_read_string(props, RESPONSE_INFORMATION, + &response_info_ptr, false) != NULL) { // Then replace the response_topic in subscription with this USP_SAFE_FREE(client->response_subscription.topic); + USP_LOG_Debug("Received response_info: \"%s\"", response_info_ptr); client->response_subscription.topic = USP_STRDUP(response_info_ptr); + free(response_info_ptr); + } + else + { + // if no response information, check if it's in the subscribe-topic user prop + char* userPropName; + if (mosquitto_property_read_string_pair(props, USER_PROPERTY, + &userPropName, &subscribe_topic_ptr, false) != NULL) + { + // we only want subscribe-topic user property + if (strcmp("subscribe-topic", userPropName) == 0) + { + USP_LOG_Debug("Received subcribe-topic: \"%s\"", subscribe_topic_ptr); + USP_SAFE_FREE(client->response_subscription.topic); + client->response_subscription.topic = USP_STRDUP(subscribe_topic_ptr); + free(subscribe_topic_ptr); + free(userPropName); + } + else + { + // it wasn't in the 1st one, try the next one, set skip 1st to true + free(subscribe_topic_ptr); + free(userPropName); + if (mosquitto_property_read_string_pair(props, USER_PROPERTY, + &userPropName, &subscribe_topic_ptr, true) != NULL) + { + // we only want subscribe-topic user property + if (strcmp("subscribe-topic", userPropName) == 0) + { + USP_LOG_Debug("Received subcribe-topic: \"%s\"", subscribe_topic_ptr); + USP_SAFE_FREE(client->response_subscription.topic); + client->response_subscription.topic = USP_STRDUP(subscribe_topic_ptr); + } + free(subscribe_topic_ptr); + free(userPropName); + } + } + } } - - USP_SAFE_FREE(client->conn_params.client_id); - client->conn_params.client_id = USP_STRDUP(client_id_ptr); USP_LOG_Debug("Received client id \"%s\"", client->conn_params.client_id); ResetRetryCount(client); @@ -1552,6 +1578,7 @@ void DisconnectCallback(struct mosquitto *mosq, void *userdata, int rc) { if (client->state != kMqttState_ErrorRetrying) { + USP_LOG_Debug("DisconnectCallback rc is %d\n", rc); HandleMqttError(client, kMqttFailure_OtherError, "Force disconnected from broker"); } } diff --git a/src/core/path_resolver.c b/src/core/path_resolver.c index 8141470..aa6979f 100644 --- a/src/core/path_resolver.c +++ b/src/core/path_resolver.c @@ -111,6 +111,8 @@ int ResolveIntermediateReferences(str_vector_t *params, resolver_state_t *state, bool GroupReferencedParameters(str_vector_t *params, resolver_state_t *state, int_vector_t *perm, group_get_vector_t *ggv, int *err); void InitSearchParam(search_param_t *sp); void DestroySearchParam(search_param_t *sp); +void RefreshInstances_LifecycleSubscriptionEndingInPartialPath(char *path); + /*********************************************************************//** ** ** PATH_RESOLVER_ResolveDevicePath @@ -268,6 +270,7 @@ int ExpandPath(char *resolved, char *unresolved, resolver_state_t *state) int len; int err; char c; + bool check_refresh_instances = false; // Exit if path is too long len = strlen(resolved); @@ -343,16 +346,9 @@ int ExpandPath(char *resolved, char *unresolved, resolver_state_t *state) case kResolveOp_SubsAdd: case kResolveOp_SubsDel: - { - // Partial path for add/delete object subscriptions must ensure that object instances are refreshed - // Do this by getting the instances for this object (all sub objects are also refreshed in the process) - int_vector_t iv; - - resolved[len-1] = '\0'; - INT_VECTOR_Init(&iv); - DATA_MODEL_GetInstances(resolved, &iv); // Intentionally ignoring any errors - INT_VECTOR_Destroy(&iv); - } + // Remove any trailing '.' The partial path may potentially call a refresh instances vendor hook to be called + resolved[len-1] = '\0'; + check_refresh_instances = true; break; case kResolveOp_Add: @@ -381,9 +377,77 @@ int ExpandPath(char *resolved, char *unresolved, resolver_state_t *state) return err; } + // Partial path for add/delete object subscriptions must ensure that object instances are refreshed + // Do this by getting the instances for this object (all sub objects are also refreshed in the process) + if (check_refresh_instances) + { + RefreshInstances_LifecycleSubscriptionEndingInPartialPath(resolved); + } + return USP_ERR_OK; } +/*********************************************************************//** +** +** RefreshInstances_LifecycleSubscriptionEndingInPartialPath +** +** Refreshes the instance numbers of a top level object referenced by an object lifetime subscription +** +** The code in RefreshInstancesForObjLifetimeSubscriptions() periodically +** refreshes all instances which have object lifetime subscriptions on them +** in order to determine whether the subscription should fire. +** This function is called if the ReferenceList of the subscription is a partial path. +** It ensures that the refresh instances vendor hook is called, if it wouldn't have been +** already during path resolution. The only time it wouldn't have been called is if the +** path resolver resolves to a partial path of a top level multi-instance object +** +** \param path - path of the object to potentially refresh +** +** \return None +** +**************************************************************************/ +void RefreshInstances_LifecycleSubscriptionEndingInPartialPath(char *path) +{ + dm_node_t *node; + bool is_qualified_instance; + dm_object_info_t *info; + dm_instances_t inst; + + // Exit if unable to find node representing this object. NOTE: This should never occur, as caller should have ensured path exists in schema + node = DM_PRIV_GetNodeFromPath(path, &inst, &is_qualified_instance); + if (node == NULL) + { + return; + } + + // Exit if this is not a top level multi-instance object with a refresh instances vendor hook + // NOTE: If path is to a child object whose parent has a refresh instances vendor hook, + // then the vendor hook will already have been called as part of resolving the path, so no need to refresh here + // NOTE: The path resolver disallows object lifecycle subscriptions on partial paths that are not multi-instance objects + // so this code does not have to cope with calling the refresh instances vendor hook for a child object of the given path. + info = &node->registered.object_info; + if ((node->type != kDMNodeType_Object_MultiInstance) || (node->order != 1) || (info->refresh_instances_cb == NULL)) + { + return; + } + + // Exit if this object is already a fully qualified instance + // NOTE: This may be the case if the subscription ReferenceList terminated in wildcard or instance number before the partial path dot character + // If so, the refresh instances vendor hook would already have been called + if (is_qualified_instance) + { + return; + } + + // NOTE: This function may be called recursively if it is time to call the refresh instances vendor hook + // The first time DM_INST_VECTOR_RefreshTopLevelObjectInstances() is called, if it calls the refresh instances vendor hook, + // then afterwards it will determine if any of the instances caused the subscription to fire. + // It does this by calling the path resolver, which will end up in this function again. + // The second time that DM_INST_VECTOR_RefreshTopLevelObjectInstances() is called, the instances cache + // will not need refreshing, hence DM_INST_VECTOR_RefreshTopLevelObjectInstances() will return the second time it is called. + DM_INST_VECTOR_RefreshTopLevelObjectInstances(node); +} + /*********************************************************************//** ** ** ExpandWildcard