Skip to content

Commit

Permalink
Merge branch 'main' into pico-support
Browse files Browse the repository at this point in the history
  • Loading branch information
gundralaa committed Jul 19, 2023
2 parents bd704be + df0f4a5 commit ffc5b07
Show file tree
Hide file tree
Showing 20 changed files with 644 additions and 910 deletions.
1 change: 0 additions & 1 deletion core/federated/RTI/enclave.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ void initialize_enclave(enclave_t* e, uint16_t id) {
e->downstream = NULL;
e->num_downstream = 0;
e->mode = REALTIME;
e->requested_stop = false;

// Initialize the next event condition variable.
lf_cond_init(&e->next_event_condition, &rti_mutex);
Expand Down
16 changes: 13 additions & 3 deletions core/federated/RTI/enclave.h
Original file line number Diff line number Diff line change
@@ -1,3 +1,16 @@
/**
* @file
* @author Edward A. Lee (eal@berkeley.edu)
* @author Soroush Bateni (soroush@utdallas.edu)
* @author Erling Jellum (erling.r.jellum@ntnu.no)
* @author Chadlia Jerad (chadlia.jerad@ensi-uma.tn)
* @copyright (c) 2020-2023, The University of California at Berkeley
* License in [BSD 2-clause](https://github.com/lf-lang/reactor-c/blob/main/LICENSE.md)
* @brief Declarations for runtime infrastructure (RTI) for scheduling enclaves and distributed Lingua Franca programs.
* This file declares RTI features that are used by scheduling enclaves as well as federated
* LF programs.
*/

#ifndef ENCLAVE_H
#define ENCLAVE_H

Expand Down Expand Up @@ -46,9 +59,6 @@ typedef struct enclave_t {
int* downstream; // Array of downstream federate ids.
int num_downstream; // Size of the array of downstream federates.
execution_mode_t mode; // FAST or REALTIME.
bool requested_stop; // Indicates that the federate has requested stop or has replied
// to a request for stop from the RTI. Used to prevent double-counting
// a federate when handling lf_request_stop().
lf_cond_t next_event_condition; // Condition variable used by enclaves to notify an enclave
// that it's call to next_event_tag() should unblock.
} enclave_t;
Expand Down
168 changes: 86 additions & 82 deletions core/federated/RTI/rti_lib.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,23 +38,11 @@ lf_mutex_t rti_mutex;
lf_cond_t received_start_times;
lf_cond_t sent_start_time;

/**
* Enter a critical section where logical time and the event queue are guaranteed
* to not change unless they are changed within the critical section.
* this can be implemented by disabling interrupts.
* Users of this function must ensure that lf_init_critical_sections() is
* called first and that lf_critical_section_exit() is called later.
* @return 0 on success, platform-specific error number otherwise.
*/
extern int lf_critical_section_enter() {
extern int lf_critical_section_enter(environment_t* env) {
return lf_mutex_lock(&rti_mutex);
}

/**
* Exit the critical section entered with lf_lock_time().
* @return 0 on success, platform-specific error number otherwise.
*/
extern int lf_critical_section_exit() {
extern int lf_critical_section_exit(environment_t* env) {
return lf_mutex_unlock(&rti_mutex);
}

Expand Down Expand Up @@ -204,7 +192,6 @@ void notify_tag_advance_grant(enclave_t* e, tag_t tag) {
if (bytes_written < 0) {
e->state = NOT_CONNECTED;
// FIXME: We need better error handling, but don't stop other execution here.
// mark_federate_requesting_stop(fed);
}
} else {
e->last_granted = tag;
Expand Down Expand Up @@ -245,7 +232,6 @@ void notify_provisional_tag_advance_grant(enclave_t* e, tag_t tag) {
if (bytes_written < 0) {
e->state = NOT_CONNECTED;
// FIXME: We need better error handling, but don't stop other execution here.
// mark_federate_requesting_stop(fed);
}
} else {
e->last_provisionally_granted = tag;
Expand Down Expand Up @@ -308,7 +294,7 @@ void handle_port_absent_message(federate_t* sending_federate, unsigned char* buf
tag_t tag = extract_tag(&(buffer[1 + 2 * sizeof(uint16_t)]));

if (_f_rti->tracing_enabled) {
tracepoint_rti_from_federate(_f_rti->trace, receive_PORT_ABS, federate_id, &tag);
tracepoint_rti_from_federate(_f_rti->trace, receive_PORT_ABS, sending_federate->enclave.id, &tag);
}

// Need to acquire the mutex lock to ensure that the thread handling
Expand Down Expand Up @@ -560,7 +546,15 @@ void handle_next_event_tag(federate_t* fed) {
*/
bool _lf_rti_stop_granted_already_sent_to_federates = false;

void _lf_rti_broadcast_stop_time_to_federates_already_locked() {
/**
* Once the RTI has seen proposed tags from all connected federates,
* it will broadcast a MSG_TYPE_STOP_GRANTED carrying the _RTI.max_stop_tag.
* This function also checks the most recently received NET from
* each federate and resets that be no greater than the _RTI.max_stop_tag.
*
* This function assumes the caller holds the _RTI.rti_mutex lock.
*/
void _lf_rti_broadcast_stop_time_to_federates_locked() {
if (_lf_rti_stop_granted_already_sent_to_federates == true) {
return;
}
Expand Down Expand Up @@ -592,16 +586,16 @@ void _lf_rti_broadcast_stop_time_to_federates_already_locked() {
}

void mark_federate_requesting_stop(federate_t* fed) {
if (!fed->enclave.requested_stop) {
if (!fed->requested_stop) {
// Assume that the federate
// has requested stop
_f_rti->num_enclaves_handling_stop++;
fed->enclave.requested_stop = true;
fed->requested_stop = true;
}
if (_f_rti->num_enclaves_handling_stop == _f_rti->number_of_enclaves) {
// We now have information about the stop time of all
// federates.
_lf_rti_broadcast_stop_time_to_federates_already_locked();
_lf_rti_broadcast_stop_time_to_federates_locked();
}
}

Expand All @@ -619,7 +613,7 @@ void handle_stop_request_message(federate_t* fed) {

// Check whether we have already received a stop_tag
// from this federate
if (fed->enclave.requested_stop) {
if (fed->requested_stop) {
// Ignore this request
lf_mutex_unlock(&rti_mutex);
return;
Expand Down Expand Up @@ -657,10 +651,15 @@ void handle_stop_request_message(federate_t* fed) {
ENCODE_STOP_REQUEST(stop_request_buffer, _f_rti->max_stop_tag.time, _f_rti->max_stop_tag.microstep);

// Iterate over federates and send each the MSG_TYPE_STOP_REQUEST message
// if we do not have a stop_time already for them.
// if we do not have a stop_time already for them. Do not do this more than once.
if (_f_rti->stop_in_progress) {
lf_mutex_unlock(&rti_mutex);
return;
}
_f_rti->stop_in_progress = true;
for (int i = 0; i < _f_rti->number_of_enclaves; i++) {
federate_t *f = _f_rti->enclaves[i];
if (f->enclave.id != fed->enclave.id && f->enclave.requested_stop == false) {
if (f->enclave.id != fed->enclave.id && f->requested_stop == false) {
if (f->enclave.state == NOT_CONNECTED) {
mark_federate_requesting_stop(f);
continue;
Expand Down Expand Up @@ -788,8 +787,7 @@ void handle_timestamp(federate_t *my_fed) {
tag_t tag = {.time = timestamp, .microstep = 0};
tracepoint_rti_from_federate(_f_rti->trace, receive_TIMESTAMP, my_fed->enclave.id, &tag);
}
LF_PRINT_LOG("RTI received timestamp message: %ld.", timestamp);
LF_PRINT_LOG("RTI received timestamp message: " PRINTF_TIME ".", timestamp);
LF_PRINT_DEBUG("RTI received timestamp message with time: " PRINTF_TIME ".", timestamp);

lf_mutex_lock(&rti_mutex);
_f_rti->num_feds_proposed_start++;
Expand Down Expand Up @@ -924,7 +922,6 @@ void* clock_synchronization_thread(void* noargs) {
// FIXME: We need better error handling here, but clock sync failure
// should not stop execution.
lf_print_error("Clock sync failed with federate %d. Not connected.", fed_id);
// mark_federate_requesting_stop(&fed);
continue;
} else if (!fed->clock_synchronization_enabled) {
continue;
Expand Down Expand Up @@ -1002,8 +999,6 @@ void handle_federate_resign(federate_t *my_fed) {
}

my_fed->enclave.state = NOT_CONNECTED;
// FIXME: The following results in spurious error messages.
// mark_federate_requesting_stop(my_fed);

// Indicate that there will no further events from this federate.
my_fed->enclave.next_event = FOREVER_TAG;
Expand Down Expand Up @@ -1046,8 +1041,7 @@ void* federate_thread_TCP(void* fed) {
lf_print_warning("RTI: Socket to federate %d is closed. Exiting the thread.", my_fed->enclave.id);
my_fed->enclave.state = NOT_CONNECTED;
my_fed->socket = -1;
// FIXME: We need better error handling here, but this is probably not the right thing to do.
// mark_federate_requesting_stop(my_fed);
// FIXME: We need better error handling here, but do not stop execution here.
break;
}
LF_PRINT_DEBUG("RTI: Received message type %u from federate %d.", buffer[0], my_fed->enclave.id);
Expand Down Expand Up @@ -1372,57 +1366,67 @@ int receive_udp_message_and_set_up_clock_sync(int socket_id, uint16_t fed_id) {

#ifdef __RTI_AUTH__
bool authenticate_federate(int socket) {
// Buffer for message type and federation RTI nonce.
size_t message_length = 1 + NONCE_LENGTH;
unsigned char rti_hello_buffer[message_length];
rti_hello_buffer[0] = MSG_TYPE_RTI_NONCE;
unsigned char rti_nonce[NONCE_LENGTH];
RAND_bytes(rti_nonce, NONCE_LENGTH);
memcpy(rti_hello_buffer + 1, rti_nonce, NONCE_LENGTH);
// Send RTI hello with RTI's random nonce.
write_to_socket(socket, message_length, rti_hello_buffer);

// Check HMAC of received FED_RESPONSE message.
// Wait for MSG_TYPE_FED_NONCE from federate.
size_t fed_id_length = sizeof(uint16_t);
unsigned char buffer[1 + fed_id_length + NONCE_LENGTH];
read_from_socket_errexit(socket, 1 + fed_id_length + NONCE_LENGTH, buffer,
"Failed to read MSG_TYPE_FED_NONCE");
if (buffer[0] != MSG_TYPE_FED_NONCE) {
lf_print_error_and_exit(
"Received unexpected response %u from the FED (see net_common.h).",
buffer[0]);
}
unsigned int hmac_length = SHA256_HMAC_LENGTH;
size_t federation_id_length = strnlen(_f_rti->federation_id, 255);
size_t fed_id_length = sizeof(uint16_t);

unsigned char received[1 + NONCE_LENGTH + fed_id_length + hmac_length];
read_from_socket_errexit(socket, 1 + NONCE_LENGTH + fed_id_length + hmac_length, received, "Failed to read RTI response.");
// HMAC tag is created with MSG_TYPE, federate ID, received federate nonce.
unsigned char mac_buf[1 + fed_id_length + NONCE_LENGTH];
mac_buf[0] = MSG_TYPE_RTI_RESPONSE;
memcpy(&mac_buf[1], &buffer[1], fed_id_length);
memcpy(&mac_buf[1 + fed_id_length], &buffer[1 + fed_id_length], NONCE_LENGTH);
unsigned char hmac_tag[hmac_length];
unsigned char * ret = HMAC(EVP_sha256(), _f_rti->federation_id,
federation_id_length, mac_buf, 1 + fed_id_length + NONCE_LENGTH,
hmac_tag, &hmac_length);
if (ret == NULL) {
lf_print_error_and_exit("HMAC construction failed for MSG_TYPE_RTI_RESPONSE.");
}
// Make buffer for message type, RTI's nonce, and HMAC tag.
unsigned char sender[1 + NONCE_LENGTH + hmac_length];
sender[0] = MSG_TYPE_RTI_RESPONSE;
unsigned char rti_nonce[NONCE_LENGTH];
RAND_bytes(rti_nonce, NONCE_LENGTH);
memcpy(&sender[1], rti_nonce, NONCE_LENGTH);
memcpy(&sender[1 + NONCE_LENGTH], hmac_tag, hmac_length);
write_to_socket(socket, 1 + NONCE_LENGTH + hmac_length, sender);

// Wait for MSG_TYPE_FED_RESPONSE
unsigned char received[1 + hmac_length];
read_from_socket_errexit(socket, 1 + hmac_length, received,
"Failed to read federate response.");
if (received[0] != MSG_TYPE_FED_RESPONSE) {
lf_print_error("Received unexpected response %u from the FED (see net_common.h).",
received[0]);
lf_print_error_and_exit(
"Received unexpected response %u from the federate (see net_common.h).",
received[0]);
return false;
}

// Create tag to compare to received tag.
unsigned char buf_to_check[1 + fed_id_length + NONCE_LENGTH];
buf_to_check[0] = MSG_TYPE_FED_RESPONSE;
memcpy(&buf_to_check[1], &received[1 + NONCE_LENGTH], fed_id_length);
memcpy(&buf_to_check[1 + fed_id_length], rti_nonce, NONCE_LENGTH);
// HMAC tag is created with MSG_TYPE_FED_RESPONSE and RTI's nonce.
unsigned char mac_buf2[1 + NONCE_LENGTH];
mac_buf2[0] = MSG_TYPE_FED_RESPONSE;
memcpy(&mac_buf2[1], rti_nonce, NONCE_LENGTH);
unsigned char rti_tag[hmac_length];
HMAC(EVP_sha256(), _f_rti->federation_id, federation_id_length, buf_to_check, 1 + fed_id_length + NONCE_LENGTH,
rti_tag, &hmac_length);

ret = HMAC(EVP_sha256(), _f_rti->federation_id, federation_id_length,
mac_buf2, 1 + NONCE_LENGTH, rti_tag, &hmac_length);
if (ret == NULL) {
lf_print_error_and_exit("HMAC construction failed for MSG_TYPE_FED_RESPONSE.");
}
// Compare received tag and created tag.
if (memcmp(&received[1 + fed_id_length + NONCE_LENGTH], rti_tag, hmac_length) != 0) {
if (memcmp(&received[1], rti_tag, hmac_length) != 0) {
// Federation IDs do not match. Send back a HMAC_DOES_NOT_MATCH message.
lf_print_warning("HMAC authentication failed. Rejecting the federate.");
send_reject(socket, HMAC_DOES_NOT_MATCH);
return false;
}
else{
LF_PRINT_LOG("HMAC verified.");
// HMAC tag is created with MSG_TYPE and received federate nonce.
unsigned char mac_buf[1 + NONCE_LENGTH];
mac_buf[0] = MSG_TYPE_RTI_RESPONSE;
memcpy(&mac_buf[1], &received[1], NONCE_LENGTH);
// Buffer for message type and HMAC tag.
unsigned char sender[1 + hmac_length];
sender[0] = MSG_TYPE_RTI_RESPONSE;
HMAC(EVP_sha256(), _f_rti->federation_id, federation_id_length, mac_buf, 1 + NONCE_LENGTH,
&sender[1], &hmac_length);
write_to_socket(socket, 1 + hmac_length, sender);
} else {
LF_PRINT_LOG("Federate's HMAC verified.");
return true;
}
}
Expand All @@ -1449,7 +1453,7 @@ void connect_to_federates(int socket_descriptor) {
}
}

// Send RTI hello when RTI -a option is on.
// Wait for the first message from the federate when RTI -a option is on.
#ifdef __RTI_AUTH__
if (_f_rti->authentication_enabled) {
if (!authenticate_federate(socket_id)) {
Expand Down Expand Up @@ -1528,6 +1532,7 @@ void* respond_to_erroneous_connections(void* nothing) {

void initialize_federate(federate_t* fed, uint16_t id) {
initialize_enclave(&(fed->enclave), id);
fed->requested_stop = false;
fed->socket = -1; // No socket.
fed->clock_synchronization_enabled = true;
fed->in_transit_message_tags = initialize_in_transit_message_q();
Expand Down Expand Up @@ -1586,12 +1591,17 @@ void wait_for_federates(int socket_descriptor) {
if (shutdown(socket_descriptor, SHUT_RDWR)) {
LF_PRINT_LOG("On shut down TCP socket, received reply: %s", strerror(errno));
}
// NOTE: In all common TCP/IP stacks, there is a time period,
// typically between 30 and 120 seconds, called the TIME_WAIT period,
// before the port is released after this close. This is because
// the OS is preventing another program from accidentally receiving
// duplicated packets intended for this program.
close(socket_descriptor);

/************** FIXME: The following is probably not needed.
The above shutdown and close should do the job.
/* NOTE: Below is a song and dance that is apparently not needed.
The above shutdown and close appear to do the job.
// NOTE: Apparently, closing the socket will not necessarily
// Apparently, closing the socket will not necessarily
// cause the respond_to_erroneous_connections accept() call to return,
// so instead, we connect here so that it can check the _f_rti->all_federates_exited
// variable.
Expand Down Expand Up @@ -1620,13 +1630,6 @@ void wait_for_federates(int socket_descriptor) {
close(tmp_socket);
}
}
// NOTE: In all common TCP/IP stacks, there is a time period,
// typically between 30 and 120 seconds, called the TIME_WAIT period,
// before the port is released after this close. This is because
// the OS is preventing another program from accidentally receiving
// duplicated packets intended for this program.
close(socket_descriptor);
*/

if (_f_rti->socket_descriptor_UDP > 0) {
Expand Down Expand Up @@ -1827,4 +1830,5 @@ void initialize_RTI(){
_f_rti->clock_sync_exchanges_per_interval = 10,
_f_rti->authentication_enabled = false,
_f_rti->tracing_enabled = false;
_f_rti->stop_in_progress = false;
}
Loading

0 comments on commit ffc5b07

Please sign in to comment.