Skip to content

Commit

Permalink
Merge pull request #243 from lf-lang/enclaves-tuning
Browse files Browse the repository at this point in the history
Enclaves tuning
  • Loading branch information
edwardalee authored Jun 26, 2023
2 parents 95106eb + 62cacda commit a17ffe0
Show file tree
Hide file tree
Showing 20 changed files with 192 additions and 31 deletions.
30 changes: 30 additions & 0 deletions core/federated/federate.c
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,8 @@ int send_timed_message(environment_t* env,
const char* next_destination_str,
size_t length,
unsigned char* message) {
assert(env != GLOBAL_ENVIRONMENT);

unsigned char header_buffer[1 + sizeof(uint16_t) + sizeof(uint16_t)
+ sizeof(int32_t) + sizeof(instant_t) + sizeof(microstep_t)];
// First byte identifies this as a timed message.
Expand Down Expand Up @@ -1408,6 +1410,7 @@ void mark_control_reaction_waiting(int portID, bool waiting) {
* @param portID the ID of the port to determine status for
*/
port_status_t get_current_port_status(environment_t* env, int portID) {
assert(env != GLOBAL_ENVIRONMENT);
// Check whether the status of the port is known at the current tag.
trigger_t* network_input_port_action = _lf_action_for_port(portID)->trigger;
if (network_input_port_action->status == present) {
Expand Down Expand Up @@ -1439,6 +1442,7 @@ port_status_t get_current_port_status(environment_t* env, int portID) {
* @param env The environment of the federate
*/
void enqueue_network_input_control_reactions(environment_t* env) {
assert(env != GLOBAL_ENVIRONMENT);
#ifdef FEDERATED_CENTRALIZED
if (!_fed.has_upstream) {
// This federate is not connected to any upstream federates via a
Expand Down Expand Up @@ -1467,6 +1471,7 @@ void enqueue_network_input_control_reactions(environment_t* env) {
* @param env The environment of the federate
*/
void enqueue_network_output_control_reactions(environment_t* env){
assert(env != GLOBAL_ENVIRONMENT);
#ifdef FEDERATED_CENTRALIZED
if (!_fed.has_downstream) {
// This federate is not connected to any downstream federates via a
Expand Down Expand Up @@ -1497,6 +1502,7 @@ void enqueue_network_output_control_reactions(environment_t* env){
* @param env The environment of the federate
*/
void enqueue_network_control_reactions(environment_t *env) {
assert(env != GLOBAL_ENVIRONMENT);
enqueue_network_output_control_reactions(env);
#ifdef FEDERATED_CENTRALIZED
// If the granted tag is not provisional, there is no
Expand Down Expand Up @@ -1525,6 +1531,8 @@ void enqueue_network_control_reactions(environment_t *env) {
void send_port_absent_to_federate(environment_t* env, interval_t additional_delay,
unsigned short port_ID,
unsigned short fed_ID) {
assert(env != GLOBAL_ENVIRONMENT);

// Construct the message
size_t message_length = 1 + sizeof(port_ID) + sizeof(fed_ID) + sizeof(instant_t) + sizeof(microstep_t);
unsigned char buffer[message_length];
Expand Down Expand Up @@ -1577,6 +1585,8 @@ void send_port_absent_to_federate(environment_t* env, interval_t additional_dela
* @param STAA The safe-to-assume-absent threshold for the port
*/
void wait_until_port_status_known(environment_t* env, int port_ID, interval_t STAA) {
assert(env != GLOBAL_ENVIRONMENT);

// Need to lock the mutex to prevent
// a race condition with the network
// receiver logic.
Expand Down Expand Up @@ -1683,6 +1693,8 @@ static trigger_handle_t schedule_message_received_from_network_already_locked(
trigger_t* trigger,
tag_t tag,
lf_token_t* token) {
assert(env != GLOBAL_ENVIRONMENT);

// Return value of the function
int return_value = 0;

Expand Down Expand Up @@ -1802,6 +1814,8 @@ void _lf_close_inbound_socket(int fed_id) {
* @param fed_id The sending federate ID or -1 if the centralized coordination.
*/
static void handle_port_absent_message(environment_t* env, int socket, int fed_id) {
assert(env != GLOBAL_ENVIRONMENT);

size_t bytes_to_read = sizeof(uint16_t) + sizeof(uint16_t) + sizeof(instant_t) + sizeof(microstep_t);
unsigned char buffer[bytes_to_read];
read_from_socket_errexit(socket, bytes_to_read, buffer,
Expand Down Expand Up @@ -1907,6 +1921,8 @@ void handle_message(int socket, int fed_id) {
* @param fed_id The sending federate ID or -1 if the centralized coordination.
*/
void handle_tagged_message(environment_t* env, int socket, int fed_id) {
assert(env != GLOBAL_ENVIRONMENT);

// FIXME: Need better error handling?
// Read the header which contains the timestamp.
size_t bytes_to_read = sizeof(uint16_t) + sizeof(uint16_t) + sizeof(int32_t)
Expand Down Expand Up @@ -2087,6 +2103,8 @@ void handle_tagged_message(environment_t* env, int socket, int fed_id) {
* it sets last_TAG_was_provisional to false.
*/
void handle_tag_advance_grant(environment_t *env) {
assert(env != GLOBAL_ENVIRONMENT);

size_t bytes_to_read = sizeof(instant_t) + sizeof(microstep_t);
unsigned char buffer[bytes_to_read];
read_from_socket_errexit(_fed.socket_TCP_RTI, bytes_to_read, buffer,
Expand Down Expand Up @@ -2167,6 +2185,8 @@ void _lf_logical_tag_complete(tag_t tag_to_send) {
* last known tag for input ports.
*/
void handle_provisional_tag_advance_grant(environment_t* env) {
assert(env != GLOBAL_ENVIRONMENT);

size_t bytes_to_read = sizeof(instant_t) + sizeof(microstep_t);
unsigned char buffer[bytes_to_read];
read_from_socket_errexit(_fed.socket_TCP_RTI, bytes_to_read, buffer,
Expand Down Expand Up @@ -2274,6 +2294,8 @@ void handle_provisional_tag_advance_grant(environment_t* env) {
* @param env The environment of the federate
*/
void _lf_fd_send_stop_request_to_rti(environment_t* env) {
assert(env != GLOBAL_ENVIRONMENT);

// Do not send a stop request twice.
if (_fed.sent_a_stop_request_to_rti == true) {
return;
Expand Down Expand Up @@ -2314,6 +2336,8 @@ void _lf_fd_send_stop_request_to_rti(environment_t* env) {
* @param env The environment of the federate
*/
void handle_stop_granted_message(environment_t* env) {
assert(env != GLOBAL_ENVIRONMENT);

size_t bytes_to_read = MSG_TYPE_STOP_GRANTED_LENGTH - 1;
unsigned char buffer[bytes_to_read];
read_from_socket_errexit(_fed.socket_TCP_RTI, bytes_to_read, buffer,
Expand Down Expand Up @@ -2361,6 +2385,8 @@ void handle_stop_granted_message(environment_t* env) {
* @param env The environment of the federate
*/
void handle_stop_request_message(environment_t* env) {
assert(env != GLOBAL_ENVIRONMENT);

size_t bytes_to_read = MSG_TYPE_STOP_REQUEST_LENGTH - 1;
unsigned char buffer[bytes_to_read];
read_from_socket_errexit(_fed.socket_TCP_RTI, bytes_to_read, buffer,
Expand Down Expand Up @@ -2430,6 +2456,8 @@ void handle_stop_request_message(environment_t* env) {
* @param env The environment of the federate
*/
void terminate_execution(environment_t* env) {
assert(env != GLOBAL_ENVIRONMENT);

// Check for all outgoing physical connections in
// _fed.sockets_for_outbound_p2p_connections and
// if the socket ID is not -1, the connection is still open.
Expand Down Expand Up @@ -2661,6 +2689,7 @@ void* listen_to_rti_TCP(void* args) {
* FIXME: Possibly should be renamed
*/
void synchronize_with_other_federates(environment_t* env) {
assert(env != GLOBAL_ENVIRONMENT);

LF_PRINT_DEBUG("Synchronizing with other federates.");

Expand Down Expand Up @@ -2795,6 +2824,7 @@ bool _lf_bounded_NET(tag_t* tag) {
* @param wait_for_reply If true, wait for a reply.
*/
tag_t _lf_send_next_event_tag(environment_t* env, tag_t tag, bool wait_for_reply) {
assert(env != GLOBAL_ENVIRONMENT);
while (true) {
if (!_fed.has_downstream && !_fed.has_upstream) {
// This federate is not connected (except possibly by physical links)
Expand Down
3 changes: 3 additions & 0 deletions core/modal_models/modes.c
Original file line number Diff line number Diff line change
Expand Up @@ -557,6 +557,7 @@ void _lf_terminate_modal_reactors(environment_t* env) {
_lf_unsused_suspended_events_head = NULL;
}
void _lf_initialize_modes(environment_t* env) {
assert(env != GLOBAL_ENVIRONMENT);
if (env->modes) {
_lf_initialize_mode_states(
env,
Expand All @@ -565,6 +566,7 @@ void _lf_initialize_modes(environment_t* env) {
}
}
void _lf_handle_mode_changes(environment_t* env) {
assert(env != GLOBAL_ENVIRONMENT);
if (env->modes) {
_lf_process_mode_changes(
env,
Expand All @@ -579,6 +581,7 @@ void _lf_handle_mode_changes(environment_t* env) {
}

void _lf_handle_mode_triggered_reactions(environment_t* env) {
assert(env != GLOBAL_ENVIRONMENT);
if (env->modes) {
_lf_handle_mode_startup_reset_reactions(
env, env->startup_reactions, env->startup_reactions_size,
Expand Down
15 changes: 12 additions & 3 deletions core/reactor.c
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
* @author{Soroush Bateni <soroush@utdallas.edu>}
* @author{Erling Jellum <erlingrj@berkeley.edu>}
*/
#include <assert.h>
#include <string.h>

#include "reactor.h"
Expand Down Expand Up @@ -115,6 +116,8 @@ void lf_print_snapshot(environment_t* env) {
* worker number does not make sense (e.g., the caller is not a worker thread).
*/
void _lf_trigger_reaction(environment_t* env, reaction_t* reaction, int worker_number) {
assert(env != GLOBAL_ENVIRONMENT);

#ifdef MODAL_REACTORS
// Check if reaction is disabled by mode inactivity
if (!_lf_mode_is_active(reaction->mode)) {
Expand All @@ -141,6 +144,8 @@ void _lf_trigger_reaction(environment_t* env, reaction_t* reaction, int worker_n
* should stop.
*/
int _lf_do_step(environment_t* env) {
assert(env != GLOBAL_ENVIRONMENT);

// Invoke reactions.
while(pqueue_size(env->reaction_q) > 0) {
// lf_print_snapshot();
Expand Down Expand Up @@ -227,6 +232,8 @@ int _lf_do_step(environment_t* env) {
// the keepalive command-line option has not been given.
// Otherwise, return 1.
int next(environment_t* env) {
assert(env != GLOBAL_ENVIRONMENT);

// Enter the critical section and do not leave until we have
// determined which tag to commit to and start invoking reactions for.
if (lf_critical_section_enter(env) != 0) {
Expand Down Expand Up @@ -304,6 +311,8 @@ int next(environment_t* env) {
* @param env Environment in which we are executing
*/
void _lf_request_stop(environment_t *env) {
assert(env != GLOBAL_ENVIRONMENT);

tag_t new_stop_tag;
new_stop_tag.time = env->current_tag.time;
new_stop_tag.microstep = env->current_tag.microstep + 1;
Expand Down Expand Up @@ -390,23 +399,23 @@ int lf_reactor_c_main(int argc, const char* argv[]) {

/**
* @brief Notify of new event by calling the unthreaded platform API
* @param env Environment in which we are execution
* @param env Environment in which we are executing.
*/
int lf_notify_of_event(environment_t* env) {
return _lf_unthreaded_notify_of_event();
}

/**
* @brief Enter critical section by disabling interrupts
* @param env Environment in which we are execution
* @param env Environment in which we are executing.
*/
int lf_critical_section_enter(environment_t* env) {
return lf_disable_interrupts_nested();
}

/**
* @brief Leave a critical section by enabling interrupts
* @param env Environment in which we are execution
* @param env Environment in which we are executing.
*/
int lf_critical_section_exit(environment_t* env) {
return lf_enable_interrupts_nested();
Expand Down
Loading

0 comments on commit a17ffe0

Please sign in to comment.