Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft: Introduce Next Downstream Tag message (NDT) to optimize the communication in centralized federated execution #337

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion core/environment.c
Original file line number Diff line number Diff line change
Expand Up @@ -106,14 +106,17 @@ static void environment_init_modes(environment_t* env, int num_modes, int num_st
* @brief Initialize the federation-specific parts of the environment struct.
*/
static void environment_init_federated(environment_t* env, int num_is_present_fields) {
#ifdef FEDERATED
env->ndt_q = pqueue_tag_init(10);
#endif
#ifdef FEDERATED_DECENTRALIZED
env->_lf_intended_tag_fields = (tag_t**) calloc(num_is_present_fields, sizeof(tag_t*));
LF_ASSERT(env->_lf_intended_tag_fields, "Out of memory");
env->_lf_intended_tag_fields_size = num_is_present_fields;
#endif
}

void environment_init_tags( environment_t *env, instant_t start_time, interval_t duration) {
void environment_init_tags(environment_t *env, instant_t start_time, interval_t duration) {
env->current_tag = (tag_t){.time = start_time, .microstep = 0u};

tag_t stop_tag = FOREVER_TAG_INITIALIZER;
Expand Down Expand Up @@ -149,6 +152,9 @@ static void environment_free_modes(environment_t* env) {
}

static void environment_free_federated(environment_t* env) {
#ifdef FEDERATED
free(env->ndt_q);
#endif
#ifdef FEDERATED_DECENTRALIZED
free(env->_lf_intended_tag_fields);
#endif
Expand Down
40 changes: 40 additions & 0 deletions core/federated/RTI/rti_remote.c
Original file line number Diff line number Diff line change
Expand Up @@ -561,9 +561,49 @@ void handle_next_event_tag(federate_info_t *fed) {
update_federate_next_event_tag_locked(
fed->enclave.id,
intended_tag);
// If fed cannot get the grant of the intended tag, send NDTs to its upstream federates.
if (lf_tag_compare(fed->enclave.last_granted, intended_tag) < 0) {
send_upstream_next_downstream_tag(fed, intended_tag);
}
LF_MUTEX_UNLOCK(rti_mutex);
}

void send_upstream_next_downstream_tag(federate_info_t* fed, tag_t next_event_tag) {
// The RTI receives next_event_tag from fed.
// It has to send NDT messages to the upstream federates of fed
// if the LTC message from an upstream federate is ealrier than the next_event_tag.
size_t message_length = 1 + sizeof(int64_t) + sizeof(uint32_t);
unsigned char buffer[message_length];
buffer[0] = MSG_TYPE_NEXT_DOWNSTREAM_TAG;
encode_int64(next_event_tag.time, &(buffer[1]));
encode_int32((int32_t)next_event_tag.microstep, &(buffer[1 + sizeof(int64_t)]));

// FIXME: Send NDT to transitive upstreams either. Also, the RTI has to check the sparsity
// of a federate and determine whether it sends NDT to it or not.
for (int i = 0; i < fed->enclave.num_upstream; i++) {
int upstream_id = fed->enclave.upstream[i];
// scheduling_node_t* upstream_node = rti_remote->base.scheduling_nodes[upstream_id];
federate_info_t* upstream_federate = GET_FED_INFO(upstream_id);

if (is_in_cycle(&(upstream_federate->enclave))) {
LF_PRINT_DEBUG("Do not send the NDT to federate %d", i);
continue;
}

if (lf_tag_compare(upstream_federate->enclave.completed, next_event_tag) < 0 &&
lf_tag_compare(upstream_federate->enclave.next_event, next_event_tag) <= 0) {
// Send next downstream tag to upstream federates that do not complete at next_event_tag
LF_PRINT_LOG("RTI sending the next downstream event message (NDT) " PRINTF_TAG "to federate %u.",
next_event_tag.time - start_time, next_event_tag.microstep, upstream_id);
if (rti_remote->base.tracing_enabled) {
tracepoint_rti_to_federate(rti_remote->base.trace, send_NDT, upstream_id, &next_event_tag);
}
write_to_socket_fail_on_error(&upstream_federate->socket, message_length, buffer, &rti_mutex,
"RTI failed to send MSG_TYPE_NEXT_DOWNSTREAM_TAG message to federate %d.", upstream_id);
}
}
}

/////////////////// STOP functions ////////////////////

/**
Expand Down
9 changes: 9 additions & 0 deletions core/federated/RTI/rti_remote.h
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,15 @@ void handle_latest_tag_complete(federate_info_t* fed);
*/
void handle_next_event_tag(federate_info_t* fed);

/**
* Send a next dowsntream tag (NDT) message. This will be called by handle_next_event_Tag
* @see MSG_TYPE_NEXT_EVENT_TAG in rti.h.
*
* @param fed The source federate of next event tag.
* @param next_event_tag The next event tag from the downstream federate.
*/
void send_upstream_next_downstream_tag(federate_info_t* fed, tag_t next_event_tag);

/////////////////// STOP functions ////////////////////

/**
Expand Down
92 changes: 85 additions & 7 deletions core/federated/federate.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "federate.h"
#include "net_common.h"
#include "net_util.h"
#include "pqueue_tag.h"
#include "reactor.h"
#include "reactor_common.h"
#include "reactor_threaded.h"
Expand Down Expand Up @@ -1513,6 +1514,36 @@ static void handle_rti_failed_message(void) {
exit(1);
}

/**
* Handle a next downstream tag (NDT) from the RTI.
*/
static void handle_next_dowsntream_tag() {
size_t bytes_to_read = sizeof(instant_t) + sizeof(microstep_t);
unsigned char buffer[bytes_to_read];
read_from_socket_fail_on_error(&_fed.socket_TCP_RTI, bytes_to_read, buffer, NULL,
"Failed to read next downstream tag from RTI.");
tag_t NDT = extract_tag(buffer);

LF_PRINT_LOG("Received from RTI a MSG_TYPE_NEXT_DOWNSTREAM_TAG message with elapsed tag " PRINTF_TAG ".",
NDT.time - start_time, NDT.microstep);
// Trace the event when tracing is enabled
tracepoint_federate_from_rti(_fed.trace, receive_NDT, _lf_my_fed_id, &NDT);

environment_t* env;
_lf_get_environments(&env);

if (lf_tag_compare(env->current_tag, NDT) <= 0) {
// The current tag is less than or equal to the NDT. Insert NDT to ndt_q if this tag is not in the queue.
pqueue_tag_insert_if_no_match(env->ndt_q, NDT);
}
if (lf_tag_compare(env->current_tag, NDT) > 0) {
// The current tag is greater than the NDT. Send the LTC with the NDT and
// push the current tag to ndt_q to notify the appropriate NET message.
send_tag(MSG_TYPE_LATEST_TAG_COMPLETE, NDT);
pqueue_tag_insert_if_no_match(env->ndt_q, env->current_tag);
}
}

/**
* Thread that listens for TCP inputs from the RTI.
* When messages arrive, this calls the appropriate handler.
Expand Down Expand Up @@ -1586,6 +1617,9 @@ static void* listen_to_rti_TCP(void* args) {
case MSG_TYPE_FAILED:
handle_rti_failed_message();
break;
case MSG_TYPE_NEXT_DOWNSTREAM_TAG:
handle_next_dowsntream_tag();
break;
case MSG_TYPE_CLOCK_SYNC_T1:
case MSG_TYPE_CLOCK_SYNC_T4:
lf_print_error("Federate %d received unexpected clock sync message from RTI on TCP socket.",
Expand Down Expand Up @@ -2282,11 +2316,28 @@ void lf_latest_tag_complete(tag_t tag_to_send) {
if (compare_with_last_tag >= 0) {
return;
}
LF_PRINT_LOG("Sending Latest Tag Complete (LTC) " PRINTF_TAG " to the RTI.",
environment_t *env;
_lf_get_environments(&env);
bool need_to_send_LTC = true;
// Check the ndt queue's size to prevent to compare a tag with NULL.
if (pqueue_tag_size(env->ndt_q) != 0 ) {
tag_t earliest_ndt = pqueue_tag_peek(env->ndt_q)->tag;
if (lf_tag_compare(tag_to_send, earliest_ndt) < 0) {
// No events exist in any downstream federates
LF_PRINT_DEBUG("The intended tag " PRINTF_TAG " is less than the earliest NDT " PRINTF_TAG "."
"Skip sending the logical tag complete.",
tag_to_send.time - start_time, tag_to_send.microstep,
earliest_ndt.time - start_time, earliest_ndt.microstep);
need_to_send_LTC = false;
}
}
if (need_to_send_LTC) {
LF_PRINT_LOG("Sending Logical Tag Complete (LTC) " PRINTF_TAG " to the RTI.",
tag_to_send.time - start_time,
tag_to_send.microstep);
send_tag(MSG_TYPE_LATEST_TAG_COMPLETE, tag_to_send);
_fed.last_sent_LTC = tag_to_send;
send_tag(MSG_TYPE_LATEST_TAG_COMPLETE, tag_to_send);
_fed.last_sent_LTC = tag_to_send;
}
}

parse_rti_code_t lf_parse_rti_addr(const char* rti_addr) {
Expand Down Expand Up @@ -2439,10 +2490,29 @@ tag_t lf_send_next_event_tag(environment_t* env, tag_t tag, bool wait_for_reply)
// This if statement does not fall through but rather returns.
// NET is not bounded by physical time or has no downstream federates.
// Normal case.
send_tag(MSG_TYPE_NEXT_EVENT_TAG, tag);
_fed.last_sent_NET = tag;
LF_PRINT_LOG("Sent next event tag (NET) " PRINTF_TAG " to RTI.",
tag.time - start_time, tag.microstep);

// If there is no downstream events that require the NET of the current tag,
// do not send the NET.
bool need_to_send_NET = true;
if (pqueue_tag_size(env->ndt_q) != 0 ) {
// FIXME: If the RTI changes the use of NDTs dynamically, merely checking the size
// is not enough to know whether this federate using the NDT optimization or not.
tag_t earliest_ndt = pqueue_tag_peek(env->ndt_q)->tag;
if (lf_tag_compare(tag, earliest_ndt) < 0) {
// No events exist in any downstream federates
LF_PRINT_DEBUG("The intended tag " PRINTF_TAG " is less than the earliest NDT " PRINTF_TAG "."
"Skip sending the next event tag.",
tag.time - start_time, tag.microstep,
earliest_ndt.time - start_time, earliest_ndt.microstep);
need_to_send_NET = false;
}
}
if (need_to_send_NET) {
send_tag(MSG_TYPE_NEXT_EVENT_TAG, tag);
_fed.last_sent_NET = tag;
LF_PRINT_LOG("Sent next event tag (NET) " PRINTF_TAG " to RTI.",
tag.time - start_time, tag.microstep);
}

if (!wait_for_reply) {
LF_PRINT_LOG("Not waiting for reply to NET.");
Expand Down Expand Up @@ -2704,6 +2774,14 @@ int lf_send_tagged_message(environment_t* env,
tracepoint_federate_to_rti(_fed.trace, send_TAGGED_MSG, _lf_my_fed_id, &current_message_intended_tag);
}

// Insert the intended tag into the ndt_q to send LTC to the RTI quickly.
if (pqueue_tag_size(env->ndt_q) != 0) {
// Check the size of the queue to know if NDT is used or not.
// FIXME: This mechanism won't be work when we dynamically control NDTs.
LF_PRINT_DEBUG("Insert the intended tag to the ndt queue to send LTC and NET quickly.");
pqueue_tag_insert_if_no_match(env->ndt_q, current_message_intended_tag);
}

int result = write_to_socket_close_on_error(socket, header_length, header_buffer);
if (result == 0) {
// Header sent successfully. Send the body.
Expand Down
10 changes: 10 additions & 0 deletions core/reactor_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#endif
#include "port.h"
#include "pqueue.h"
#include "pqueue_tag.h"
#include "reactor.h"
#include "reactor_common.h"
#include "tag.h"
Expand Down Expand Up @@ -316,6 +317,15 @@ void _lf_start_time_step(environment_t *env) {
}
#endif // FEDERATED_DECENTRALIZED

while (pqueue_tag_size(env->ndt_q) != 0
&& lf_tag_compare(pqueue_tag_peek(env->ndt_q)->tag, env->current_tag) < 0) {
// Remove elements of ndt_q with tag less than the current tag.
tag_t tag_to_remove = pqueue_tag_pop_tag(env->ndt_q);
LF_PRINT_DEBUG("Remove the tag " PRINTF_TAG " from the ndt_q is less than the current tag " PRINTF_TAG ". Remove it.",
tag_to_remove.time - start_time, tag_to_remove.microstep,
env->current_tag.time - start_time, env->current_tag.microstep);
}

// Reset absent fields on network ports because
// their status is unknown
lf_reset_status_fields_on_input_port_triggers();
Expand Down
2 changes: 2 additions & 0 deletions include/core/environment.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@

#include "lf_types.h"
#include "platform.h"
#include "pqueue_tag.h"
#include "trace.h"

// Forward declarations so that a pointers can appear in the environment struct.
Expand Down Expand Up @@ -106,6 +107,7 @@ typedef struct environment_t {
#if defined(FEDERATED)
tag_t** _lf_intended_tag_fields;
int _lf_intended_tag_fields_size;
pqueue_tag_t* ndt_q;
#endif // FEDERATED
#ifdef LF_ENCLAVES // TODO: Consider dropping #ifdef
enclave_info_t *enclave_info;
Expand Down
13 changes: 13 additions & 0 deletions include/core/federated/network/net_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -665,6 +665,19 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#define MSG_TYPE_FAILED 25

/**
* Byte identifying a next downstream event tag (NDET) message sent from a downstream
* federate via the RTI in centralized coordination.
* The next eight bytes will be the timestamp.
* The next four bytes will be the microstep.
* This message from the RTI tells the federate the tag of the earliest event on the
* source federate's event queue. In other words, this federate only has to send LTC
* and NULL messages greater than or equal to this tag. If this federate has no its
* upstream federates, this federate also can skip sending NET messages that are
* greater than or equal to this tag.
*/
#define MSG_TYPE_NEXT_DOWNSTREAM_TAG 26

/////////////////////////////////////////////
//// Rejection codes

Expand Down
4 changes: 4 additions & 0 deletions include/core/trace.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ typedef enum
send_P2P_MSG,
send_ADR_AD,
send_ADR_QR,
send_NDT,
// Receiving messages
receive_ACK,
receive_FAILED,
Expand All @@ -121,6 +122,7 @@ typedef enum
receive_P2P_MSG,
receive_ADR_AD,
receive_ADR_QR,
receive_NDT,
receive_UNIDENTIFIED,
NUM_EVENT_TYPES
} trace_event_t;
Expand Down Expand Up @@ -164,6 +166,7 @@ static const char *trace_event_names[] = {
"Sending P2P_MSG",
"Sending ADR_AD",
"Sending ADR_QR",
"Sending NDT",
// Receiving messages
"Receiving ACK",
"Receiving FAILED",
Expand All @@ -186,6 +189,7 @@ static const char *trace_event_names[] = {
"Receiving P2P_MSG",
"Receiving ADR_AD",
"Receiving ADR_QR",
"Receiving NDT",
"Receiving UNIDENTIFIED",
};

Expand Down
3 changes: 3 additions & 0 deletions util/tracing/visualization/fedsd.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
.NET { stroke: #118ab2; fill: #118ab2} \
.PTAG { stroke: #06d6a0; fill: #06d6a0} \
.TAG { stroke: #08a578; fill: #08a578} \
.NDT { stroke: purple; fill: purple} \
.TIMESTAMP { stroke: grey; fill: grey } \
.FED_ID {stroke: #80DD99; fill: #80DD99 } \
.ADV {stroke-linecap="round" ; stroke: "red" ; fill: "red"} \
Expand Down Expand Up @@ -61,6 +62,7 @@
"Sending P2P_MSG": "P2P_MSG",
"Sending ADR_AD": "ADR_AD",
"Sending ADR_QR": "ADR_QR",
"Sending NDT": "NDT",
"Receiving ACK": "ACK",
"Receiving FAILED": "FAILED",
"Receiving TIMESTAMP": "TIMESTAMP",
Expand All @@ -82,6 +84,7 @@
"Receiving P2P_MSG": "P2P_MSG",
"Receiving ADR_AD": "ADR_AD",
"Receiving ADR_QR": "ADR_QR",
"Receiving NDT": "NDT",
"Receiving UNIDENTIFIED": "UNIDENTIFIED",
"Scheduler advancing time ends": "AdvLT"
}
Expand Down
Loading