Skip to content

Commit

Permalink
Merge branch 'main' into rti-DNET
Browse files Browse the repository at this point in the history
  • Loading branch information
byeonggiljun authored Aug 30, 2024
2 parents 0fab3b6 + d419c9f commit 2bfd833
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 38 deletions.
103 changes: 65 additions & 38 deletions core/federated/federate.c
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,7 @@ static void update_last_known_status_on_input_port(environment_t* env, tag_t tag
// federate that is far ahead of other upstream federates in logical time.
lf_update_max_level(_fed.last_TAG, _fed.is_last_TAG_provisional);
lf_cond_broadcast(&lf_port_status_changed);
lf_cond_broadcast(&env->event_q_changed);
} else {
// Message arrivals should be monotonic, so this should not occur.
lf_print_warning("Attempt to update the last known status tag "
Expand All @@ -292,6 +293,34 @@ static void update_last_known_status_on_input_port(environment_t* env, tag_t tag
}
}

/**
* @brief Mark all the input ports connected to the given federate as known to be absent until FOREVER.
*
* This does nothing if the federate is not using decentralized coordination.
* This function acquires the mutex on the top-level environment.
* @param fed_id The ID of the federate.
*/
static void mark_inputs_known_absent(int fed_id) {
#ifdef FEDERATED_DECENTRALIZED
// Note that when transient federates are supported, this will need to be updated because the
// federate could rejoin.
environment_t* env;
_lf_get_environments(&env);
LF_MUTEX_LOCK(&env->mutex);

for (size_t i = 0; i < _lf_action_table_size; i++) {
lf_action_base_t* action = _lf_action_table[i];
if (action->source_id == fed_id) {
update_last_known_status_on_input_port(env, FOREVER_TAG, i);
}
}
LF_MUTEX_UNLOCK(&env->mutex);
#else
// Do nothing, except suppress unused parameter error.
(void)fed_id;
#endif // FEDERATED_DECENTRALIZED
}

/**
* Set the status of network port with id portID.
*
Expand Down Expand Up @@ -735,46 +764,46 @@ static void* listen_to_federates(void* _args) {
bool socket_closed = false;
// Read one byte to get the message type.
LF_PRINT_DEBUG("Waiting for a P2P message on socket %d.", *socket_id);
bool bad_message = false;
if (read_from_socket_close_on_error(socket_id, 1, buffer)) {
// Socket has been closed.
lf_print("Socket from federate %d is closed.", fed_id);
// Stop listening to this federate.
socket_closed = true;
break;
}
LF_PRINT_DEBUG("Received a P2P message on socket %d of type %d.", *socket_id, buffer[0]);
bool bad_message = false;
switch (buffer[0]) {
case MSG_TYPE_P2P_MESSAGE:
LF_PRINT_LOG("Received untimed message from federate %d.", fed_id);
if (handle_message(socket_id, fed_id)) {
// Failed to complete the reading of a message on a physical connection.
lf_print_warning("Failed to complete reading of message on physical connection.");
socket_closed = true;
}
break;
case MSG_TYPE_P2P_TAGGED_MESSAGE:
LF_PRINT_LOG("Received tagged message from federate %d.", fed_id);
if (handle_tagged_message(socket_id, fed_id)) {
// P2P tagged messages are only used in decentralized coordination, and
// it is not a fatal error if the socket is closed before the whole message is read.
// But this thread should exit.
lf_print_warning("Failed to complete reading of tagged message.");
socket_closed = true;
}
break;
case MSG_TYPE_PORT_ABSENT:
LF_PRINT_LOG("Received port absent message from federate %d.", fed_id);
if (handle_port_absent_message(socket_id, fed_id)) {
// P2P tagged messages are only used in decentralized coordination, and
// it is not a fatal error if the socket is closed before the whole message is read.
// But this thread should exit.
lf_print_warning("Failed to complete reading of tagged message.");
socket_closed = true;
} else {
LF_PRINT_DEBUG("Received a P2P message on socket %d of type %d.", *socket_id, buffer[0]);
switch (buffer[0]) {
case MSG_TYPE_P2P_MESSAGE:
LF_PRINT_LOG("Received untimed message from federate %d.", fed_id);
if (handle_message(socket_id, fed_id)) {
// Failed to complete the reading of a message on a physical connection.
lf_print_warning("Failed to complete reading of message on physical connection.");
socket_closed = true;
}
break;
case MSG_TYPE_P2P_TAGGED_MESSAGE:
LF_PRINT_LOG("Received tagged message from federate %d.", fed_id);
if (handle_tagged_message(socket_id, fed_id)) {
// P2P tagged messages are only used in decentralized coordination, and
// it is not a fatal error if the socket is closed before the whole message is read.
// But this thread should exit.
lf_print_warning("Failed to complete reading of tagged message.");
socket_closed = true;
}
break;
case MSG_TYPE_PORT_ABSENT:
LF_PRINT_LOG("Received port absent message from federate %d.", fed_id);
if (handle_port_absent_message(socket_id, fed_id)) {
// P2P tagged messages are only used in decentralized coordination, and
// it is not a fatal error if the socket is closed before the whole message is read.
// But this thread should exit.
lf_print_warning("Failed to complete reading of tagged message.");
socket_closed = true;
}
break;
default:
bad_message = true;
}
break;
default:
bad_message = true;
}
if (bad_message) {
lf_print_error("Received erroneous message type: %d. Closing the socket.", buffer[0]);
Expand All @@ -783,12 +812,10 @@ static void* listen_to_federates(void* _args) {
break; // while loop
}
if (socket_closed) {
// NOTE: For decentralized execution, once this socket is closed, we could
// For decentralized execution, once this socket is closed, we
// update last known tags of all ports connected to the specified federate to FOREVER_TAG,
// which would eliminate the need to wait for STAA to assume an input is absent.
// However, at this time, we don't know which ports correspond to which upstream federates.
// The code generator would have to encode this information. Once that is done,
// we could call update_last_known_status_on_input_port with FOREVER_TAG.
mark_inputs_known_absent(fed_id);

break; // while loop
}
Expand Down
1 change: 1 addition & 0 deletions include/core/lf_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,7 @@ typedef struct {
trigger_t* trigger; // THIS HAS TO MATCH lf_action_internal_t
self_base_t* parent;
bool has_value;
int source_id; // Used only for federated network input actions.
} lf_action_base_t;

/**
Expand Down
1 change: 1 addition & 0 deletions python/include/python_action.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ typedef struct {
lf_action_internal_t _base;
self_base_t* parent;
bool has_value;
int source_id;
PyObject* value;
FEDERATED_GENERIC_EXTENSION
} generic_action_instance_struct;
Expand Down

0 comments on commit 2bfd833

Please sign in to comment.