diff --git a/core/federated/federate.c b/core/federated/federate.c index 3d3ea2c71..e2b2e5e72 100644 --- a/core/federated/federate.c +++ b/core/federated/federate.c @@ -284,6 +284,7 @@ static void update_last_known_status_on_input_port(environment_t* env, tag_t tag // federate that is far ahead of other upstream federates in logical time. lf_update_max_level(_fed.last_TAG, _fed.is_last_TAG_provisional); lf_cond_broadcast(&lf_port_status_changed); + lf_cond_broadcast(&env->event_q_changed); } else { // Message arrivals should be monotonic, so this should not occur. lf_print_warning("Attempt to update the last known status tag " @@ -292,6 +293,34 @@ static void update_last_known_status_on_input_port(environment_t* env, tag_t tag } } +/** + * @brief Mark all the input ports connected to the given federate as known to be absent until FOREVER. + * + * This does nothing if the federate is not using decentralized coordination. + * This function acquires the mutex on the top-level environment. + * @param fed_id The ID of the federate. + */ +static void mark_inputs_known_absent(int fed_id) { +#ifdef FEDERATED_DECENTRALIZED + // Note that when transient federates are supported, this will need to be updated because the + // federate could rejoin. + environment_t* env; + _lf_get_environments(&env); + LF_MUTEX_LOCK(&env->mutex); + + for (size_t i = 0; i < _lf_action_table_size; i++) { + lf_action_base_t* action = _lf_action_table[i]; + if (action->source_id == fed_id) { + update_last_known_status_on_input_port(env, FOREVER_TAG, i); + } + } + LF_MUTEX_UNLOCK(&env->mutex); +#else + // Do nothing, except suppress unused parameter error. + (void)fed_id; +#endif // FEDERATED_DECENTRALIZED +} + /** * Set the status of network port with id portID. * @@ -735,46 +764,46 @@ static void* listen_to_federates(void* _args) { bool socket_closed = false; // Read one byte to get the message type. LF_PRINT_DEBUG("Waiting for a P2P message on socket %d.", *socket_id); + bool bad_message = false; if (read_from_socket_close_on_error(socket_id, 1, buffer)) { // Socket has been closed. lf_print("Socket from federate %d is closed.", fed_id); // Stop listening to this federate. socket_closed = true; - break; - } - LF_PRINT_DEBUG("Received a P2P message on socket %d of type %d.", *socket_id, buffer[0]); - bool bad_message = false; - switch (buffer[0]) { - case MSG_TYPE_P2P_MESSAGE: - LF_PRINT_LOG("Received untimed message from federate %d.", fed_id); - if (handle_message(socket_id, fed_id)) { - // Failed to complete the reading of a message on a physical connection. - lf_print_warning("Failed to complete reading of message on physical connection."); - socket_closed = true; - } - break; - case MSG_TYPE_P2P_TAGGED_MESSAGE: - LF_PRINT_LOG("Received tagged message from federate %d.", fed_id); - if (handle_tagged_message(socket_id, fed_id)) { - // P2P tagged messages are only used in decentralized coordination, and - // it is not a fatal error if the socket is closed before the whole message is read. - // But this thread should exit. - lf_print_warning("Failed to complete reading of tagged message."); - socket_closed = true; - } - break; - case MSG_TYPE_PORT_ABSENT: - LF_PRINT_LOG("Received port absent message from federate %d.", fed_id); - if (handle_port_absent_message(socket_id, fed_id)) { - // P2P tagged messages are only used in decentralized coordination, and - // it is not a fatal error if the socket is closed before the whole message is read. - // But this thread should exit. - lf_print_warning("Failed to complete reading of tagged message."); - socket_closed = true; + } else { + LF_PRINT_DEBUG("Received a P2P message on socket %d of type %d.", *socket_id, buffer[0]); + switch (buffer[0]) { + case MSG_TYPE_P2P_MESSAGE: + LF_PRINT_LOG("Received untimed message from federate %d.", fed_id); + if (handle_message(socket_id, fed_id)) { + // Failed to complete the reading of a message on a physical connection. + lf_print_warning("Failed to complete reading of message on physical connection."); + socket_closed = true; + } + break; + case MSG_TYPE_P2P_TAGGED_MESSAGE: + LF_PRINT_LOG("Received tagged message from federate %d.", fed_id); + if (handle_tagged_message(socket_id, fed_id)) { + // P2P tagged messages are only used in decentralized coordination, and + // it is not a fatal error if the socket is closed before the whole message is read. + // But this thread should exit. + lf_print_warning("Failed to complete reading of tagged message."); + socket_closed = true; + } + break; + case MSG_TYPE_PORT_ABSENT: + LF_PRINT_LOG("Received port absent message from federate %d.", fed_id); + if (handle_port_absent_message(socket_id, fed_id)) { + // P2P tagged messages are only used in decentralized coordination, and + // it is not a fatal error if the socket is closed before the whole message is read. + // But this thread should exit. + lf_print_warning("Failed to complete reading of tagged message."); + socket_closed = true; + } + break; + default: + bad_message = true; } - break; - default: - bad_message = true; } if (bad_message) { lf_print_error("Received erroneous message type: %d. Closing the socket.", buffer[0]); @@ -783,12 +812,10 @@ static void* listen_to_federates(void* _args) { break; // while loop } if (socket_closed) { - // NOTE: For decentralized execution, once this socket is closed, we could + // For decentralized execution, once this socket is closed, we // update last known tags of all ports connected to the specified federate to FOREVER_TAG, // which would eliminate the need to wait for STAA to assume an input is absent. - // However, at this time, we don't know which ports correspond to which upstream federates. - // The code generator would have to encode this information. Once that is done, - // we could call update_last_known_status_on_input_port with FOREVER_TAG. + mark_inputs_known_absent(fed_id); break; // while loop } diff --git a/include/core/lf_types.h b/include/core/lf_types.h index a3a103041..b6d754ca2 100644 --- a/include/core/lf_types.h +++ b/include/core/lf_types.h @@ -298,6 +298,7 @@ typedef struct { trigger_t* trigger; // THIS HAS TO MATCH lf_action_internal_t self_base_t* parent; bool has_value; + int source_id; // Used only for federated network input actions. } lf_action_base_t; /** diff --git a/python/include/python_action.h b/python/include/python_action.h index 880bfe149..f0682f9b5 100644 --- a/python/include/python_action.h +++ b/python/include/python_action.h @@ -61,6 +61,7 @@ typedef struct { lf_action_internal_t _base; self_base_t* parent; bool has_value; + int source_id; PyObject* value; FEDERATED_GENERIC_EXTENSION } generic_action_instance_struct;