Skip to content

Commit

Permalink
Add a next downstream tag
Browse files Browse the repository at this point in the history
  • Loading branch information
byeonggiljun committed Jan 23, 2024
1 parent ccc45bf commit aa090a1
Show file tree
Hide file tree
Showing 9 changed files with 171 additions and 8 deletions.
8 changes: 7 additions & 1 deletion core/environment.c
Original file line number Diff line number Diff line change
Expand Up @@ -106,14 +106,17 @@ static void environment_init_modes(environment_t* env, int num_modes, int num_st
* @brief Initialize the federation-specific parts of the environment struct.
*/
static void environment_init_federated(environment_t* env, int num_is_present_fields) {
#ifdef FEDERATED
env->ndt_q = pqueue_tag_init(10);
#endif
#ifdef FEDERATED_DECENTRALIZED
env->_lf_intended_tag_fields = (tag_t**) calloc(num_is_present_fields, sizeof(tag_t*));
LF_ASSERT(env->_lf_intended_tag_fields, "Out of memory");
env->_lf_intended_tag_fields_size = num_is_present_fields;
#endif
}

void environment_init_tags( environment_t *env, instant_t start_time, interval_t duration) {
void environment_init_tags(environment_t *env, instant_t start_time, interval_t duration) {
env->current_tag = (tag_t){.time = start_time, .microstep = 0u};

tag_t stop_tag = FOREVER_TAG_INITIALIZER;
Expand Down Expand Up @@ -149,6 +152,9 @@ static void environment_free_modes(environment_t* env) {
}

static void environment_free_federated(environment_t* env) {
#ifdef FEDERATED
free(env->ndt_q);
#endif
#ifdef FEDERATED_DECENTRALIZED
free(env->_lf_intended_tag_fields);
#endif
Expand Down
40 changes: 40 additions & 0 deletions core/federated/RTI/rti_remote.c
Original file line number Diff line number Diff line change
Expand Up @@ -561,9 +561,49 @@ void handle_next_event_tag(federate_info_t *fed) {
update_federate_next_event_tag_locked(
fed->enclave.id,
intended_tag);
// If fed cannot get the grant of the intended tag, send NDTs to its upstream federates.
if (lf_tag_compare(fed->enclave.last_granted, intended_tag) < 0) {
send_upstream_next_downstream_tag(fed, intended_tag);
}
LF_MUTEX_UNLOCK(rti_mutex);
}

void send_upstream_next_downstream_tag(federate_info_t* fed, tag_t next_event_tag) {
// The RTI receives next_event_tag from fed.
// It has to send NDT messages to the upstream federates of fed
// if the LTC message from an upstream federate is ealrier than the next_event_tag.
size_t message_length = 1 + sizeof(int64_t) + sizeof(uint32_t);
unsigned char buffer[message_length];
buffer[0] = MSG_TYPE_NEXT_DOWNSTREAM_TAG;
encode_int64(next_event_tag.time, &(buffer[1]));
encode_int32((int32_t)next_event_tag.microstep, &(buffer[1 + sizeof(int64_t)]));

// FIXME: Send NDT to transitive upstreams either. Also, the RTI has to check the sparsity
// of a federate and determine whether it sends NDT to it or not.
for (int i = 0; i < fed->enclave.num_upstream; i++) {
int upstream_id = fed->enclave.upstream[i];
// scheduling_node_t* upstream_node = rti_remote->base.scheduling_nodes[upstream_id];
federate_info_t* upstream_federate = GET_FED_INFO(upstream_id);

if (is_in_cycle(&(upstream_federate->enclave))) {
LF_PRINT_DEBUG("Do not send the NDT to federate %d", i);
continue;
}

if (lf_tag_compare(upstream_federate->enclave.completed, next_event_tag) < 0 &&
lf_tag_compare(upstream_federate->enclave.next_event, next_event_tag) <= 0) {
// Send next downstream tag to upstream federates that do not complete at next_event_tag
LF_PRINT_LOG("RTI sending the next downstream event message (NDT) " PRINTF_TAG "to federate %u.",
next_event_tag.time - start_time, next_event_tag.microstep, upstream_id);
if (rti_remote->base.tracing_enabled) {
tracepoint_rti_to_federate(rti_remote->base.trace, send_NDT, upstream_id, &next_event_tag);
}
write_to_socket_fail_on_error(&upstream_federate->socket, message_length, buffer, &rti_mutex,
"RTI failed to send MSG_TYPE_NEXT_DOWNSTREAM_TAG message to federate %d.", upstream_id);
}
}
}

/////////////////// STOP functions ////////////////////

/**
Expand Down
9 changes: 9 additions & 0 deletions core/federated/RTI/rti_remote.h
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,15 @@ void handle_latest_tag_complete(federate_info_t* fed);
*/
void handle_next_event_tag(federate_info_t* fed);

/**
* Send a next dowsntream tag (NDT) message. This will be called by handle_next_event_Tag
* @see MSG_TYPE_NEXT_EVENT_TAG in rti.h.
*
* @param fed The source federate of next event tag.
* @param next_event_tag The next event tag from the downstream federate.
*/
void send_upstream_next_downstream_tag(federate_info_t* fed, tag_t next_event_tag);

/////////////////// STOP functions ////////////////////

/**
Expand Down
90 changes: 83 additions & 7 deletions core/federated/federate.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "federate.h"
#include "net_common.h"
#include "net_util.h"
#include "pqueue_tag.h"
#include "reactor.h"
#include "reactor_common.h"
#include "reactor_threaded.h"
Expand Down Expand Up @@ -1513,6 +1514,39 @@ static void handle_rti_failed_message(void) {
exit(1);
}

/**
* Handle a next downstream tag (NDT) from the RTI.
*/
static void handle_next_dowsntream_tag() {
size_t bytes_to_read = sizeof(instant_t) + sizeof(microstep_t);
unsigned char buffer[bytes_to_read];
read_from_socket_fail_on_error(&_fed.socket_TCP_RTI, bytes_to_read, buffer, NULL,
"Failed to read next downstream tag from RTI.");
tag_t NDT = extract_tag(buffer);

LF_PRINT_LOG("Received from RTI a MSG_TYPE_NEXT_DOWNSTREAM_TAG message with elapsed tag " PRINTF_TAG ".",
NDT.time - start_time, NDT.microstep);
// Trace the event when tracing is enabled
tracepoint_federate_from_rti(_fed.trace, receive_NDT, _lf_my_fed_id, &NDT);

environment_t* env;
_lf_get_environments(&env);

if (lf_tag_compare(env->current_tag, NDT) <= 0) {
// The current tag is less than or equal to the NDT. Insert NDT to ndt_q if this tag is not in the queue.
LF_PRINT_LOG("env->current_tag <= NDT.");
pqueue_tag_insert_if_no_match(env->ndt_q, NDT);
}
if (lf_tag_compare(env->current_tag, NDT) > 0) {
// The current tag is greater than the NDT. Send the LTC with the NDT and
// push the current tag to ndt_q to notify the appropriate NET message.
LF_PRINT_LOG("env->current_tag > NDT.");
send_tag(MSG_TYPE_LATEST_TAG_COMPLETE, NDT);
pqueue_tag_insert_if_no_match(env->ndt_q, env->current_tag);
}
LF_PRINT_LOG("peek = " PRINTF_TAG ".", pqueue_tag_peek_tag(env->ndt_q));
}

/**
* Thread that listens for TCP inputs from the RTI.
* When messages arrive, this calls the appropriate handler.
Expand Down Expand Up @@ -1586,6 +1620,8 @@ static void* listen_to_rti_TCP(void* args) {
case MSG_TYPE_FAILED:
handle_rti_failed_message();
break;
case MSG_TYPE_NEXT_DOWNSTREAM_TAG:
handle_next_dowsntream_tag();
case MSG_TYPE_CLOCK_SYNC_T1:
case MSG_TYPE_CLOCK_SYNC_T4:
lf_print_error("Federate %d received unexpected clock sync message from RTI on TCP socket.",
Expand Down Expand Up @@ -2282,11 +2318,28 @@ void lf_latest_tag_complete(tag_t tag_to_send) {
if (compare_with_last_tag >= 0) {
return;
}
LF_PRINT_LOG("Sending Latest Tag Complete (LTC) " PRINTF_TAG " to the RTI.",
environment_t *env;
_lf_get_environments(&env);
bool need_to_send_LTC = true;
// Check the ndt queue's size to prevent to compare a tag with NULL.
if (pqueue_tag_size(env->ndt_q) != 0 ) {
tag_t earliest_ndt = pqueue_tag_peek(env->ndt_q)->tag;
if (lf_tag_compare(tag_to_send, earliest_ndt) < 0) {
// No events exist in any downstream federates
LF_PRINT_DEBUG("The intended tag " PRINTF_TAG " is less than the earliest NDT " PRINTF_TAG "."
"Skip sending the logical tag complete.",
tag_to_send.time - start_time, tag_to_send.microstep,
earliest_ndt.time - start_time, earliest_ndt.microstep);
need_to_send_LTC = false;
}
}
if (need_to_send_LTC) {
LF_PRINT_LOG("Sending Logical Tag Complete (LTC) " PRINTF_TAG " to the RTI.",
tag_to_send.time - start_time,
tag_to_send.microstep);
send_tag(MSG_TYPE_LATEST_TAG_COMPLETE, tag_to_send);
_fed.last_sent_LTC = tag_to_send;
send_tag(MSG_TYPE_LATEST_TAG_COMPLETE, tag_to_send);
_fed.last_sent_LTC = tag_to_send;
}
}

parse_rti_code_t lf_parse_rti_addr(const char* rti_addr) {
Expand Down Expand Up @@ -2439,10 +2492,29 @@ tag_t lf_send_next_event_tag(environment_t* env, tag_t tag, bool wait_for_reply)
// This if statement does not fall through but rather returns.
// NET is not bounded by physical time or has no downstream federates.
// Normal case.
send_tag(MSG_TYPE_NEXT_EVENT_TAG, tag);
_fed.last_sent_NET = tag;
LF_PRINT_LOG("Sent next event tag (NET) " PRINTF_TAG " to RTI.",
tag.time - start_time, tag.microstep);

// If there is no downstream events that require the NET of the current tag,
// do not send the NET.
bool need_to_send_NET = true;
if (pqueue_tag_size(env->ndt_q) != 0 ) {
// FIXME: If the RTI changes the use of NDTs dynamically, merely checking the size
// is not enough to know whether this federate using the NDT optimization or not.
tag_t earliest_ndt = pqueue_tag_peek(env->ndt_q)->tag;
if (lf_tag_compare(tag, earliest_ndt) < 0) {
// No events exist in any downstream federates
LF_PRINT_DEBUG("The intended tag " PRINTF_TAG " is less than the earliest NDT " PRINTF_TAG "."
"Skip sending the next event tag.",
tag.time - start_time, tag.microstep,
earliest_ndt.time - start_time, earliest_ndt.microstep);
need_to_send_NET = false;
}
}
if (need_to_send_NET) {
send_tag(MSG_TYPE_NEXT_EVENT_TAG, tag);
_fed.last_sent_NET = tag;
LF_PRINT_LOG("Sent next event tag (NET) " PRINTF_TAG " to RTI.",
tag.time - start_time, tag.microstep);
}

if (!wait_for_reply) {
LF_PRINT_LOG("Not waiting for reply to NET.");
Expand Down Expand Up @@ -2704,6 +2776,10 @@ int lf_send_tagged_message(environment_t* env,
tracepoint_federate_to_rti(_fed.trace, send_TAGGED_MSG, _lf_my_fed_id, &current_message_intended_tag);
}

// Insert the intended tag into the ndt_q to send LTC to the RTI quickly.
LF_PRINT_DEBUG("Insert the intended tag to the ndt queue to send LTC and NET quickly.");
pqueue_tag_insert_if_no_match(env->ndt_q, current_message_intended_tag);

int result = write_to_socket_close_on_error(socket, header_length, header_buffer);
if (result == 0) {
// Header sent successfully. Send the body.
Expand Down
10 changes: 10 additions & 0 deletions core/reactor_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#endif
#include "port.h"
#include "pqueue.h"
#include "pqueue_tag.h"
#include "reactor.h"
#include "reactor_common.h"
#include "tag.h"
Expand Down Expand Up @@ -316,6 +317,15 @@ void _lf_start_time_step(environment_t *env) {
}
#endif // FEDERATED_DECENTRALIZED

while (pqueue_tag_size(env->ndt_q) != 0
&& lf_tag_compare(pqueue_tag_peek(env->ndt_q)->tag, env->current_tag) < 0) {
// Remove elements of ndt_q with tag less than the current tag.
tag_t tag_to_remove = pqueue_tag_pop_tag(env->ndt_q);
LF_PRINT_DEBUG("Remove the tag " PRINTF_TAG " from the ndt_q is less than the current tag " PRINTF_TAG ". Remove it.",
tag_to_remove.time - start_time, tag_to_remove.microstep,
env->current_tag.time - start_time, env->current_tag.microstep);
}

// Reset absent fields on network ports because
// their status is unknown
lf_reset_status_fields_on_input_port_triggers();
Expand Down
2 changes: 2 additions & 0 deletions include/core/environment.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@

#include "lf_types.h"
#include "platform.h"
#include "pqueue_tag.h"
#include "trace.h"

// Forward declarations so that a pointers can appear in the environment struct.
Expand Down Expand Up @@ -106,6 +107,7 @@ typedef struct environment_t {
#if defined(FEDERATED)
tag_t** _lf_intended_tag_fields;
int _lf_intended_tag_fields_size;
pqueue_tag_t* ndt_q;
#endif // FEDERATED
#ifdef LF_ENCLAVES // TODO: Consider dropping #ifdef
enclave_info_t *enclave_info;
Expand Down
13 changes: 13 additions & 0 deletions include/core/federated/network/net_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -665,6 +665,19 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#define MSG_TYPE_FAILED 25

/**
* Byte identifying a next downstream event tag (NDET) message sent from a downstream
* federate via the RTI in centralized coordination.
* The next eight bytes will be the timestamp.
* The next four bytes will be the microstep.
* This message from the RTI tells the federate the tag of the earliest event on the
* source federate's event queue. In other words, this federate only has to send LTC
* and NULL messages greater than or equal to this tag. If this federate has no its
* upstream federates, this federate also can skip sending NET messages that are
* greater than or equal to this tag.
*/
#define MSG_TYPE_NEXT_DOWNSTREAM_TAG 26

/////////////////////////////////////////////
//// Rejection codes

Expand Down
4 changes: 4 additions & 0 deletions include/core/trace.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ typedef enum
send_P2P_MSG,
send_ADR_AD,
send_ADR_QR,
send_NDT,
// Receiving messages
receive_ACK,
receive_FAILED,
Expand All @@ -121,6 +122,7 @@ typedef enum
receive_P2P_MSG,
receive_ADR_AD,
receive_ADR_QR,
receive_NDT,
receive_UNIDENTIFIED,
NUM_EVENT_TYPES
} trace_event_t;
Expand Down Expand Up @@ -164,6 +166,7 @@ static const char *trace_event_names[] = {
"Sending P2P_MSG",
"Sending ADR_AD",
"Sending ADR_QR",
"Sending NDT",
// Receiving messages
"Receiving ACK",
"Receiving FAILED",
Expand All @@ -186,6 +189,7 @@ static const char *trace_event_names[] = {
"Receiving P2P_MSG",
"Receiving ADR_AD",
"Receiving ADR_QR",
"Receiving NDT",
"Receiving UNIDENTIFIED",
};

Expand Down
3 changes: 3 additions & 0 deletions util/tracing/visualization/fedsd.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
.NET { stroke: #118ab2; fill: #118ab2} \
.PTAG { stroke: #06d6a0; fill: #06d6a0} \
.TAG { stroke: #08a578; fill: #08a578} \
.NDT { stroke: purple; fill: purple} \
.TIMESTAMP { stroke: grey; fill: grey } \
.FED_ID {stroke: #80DD99; fill: #80DD99 } \
.ADV {stroke-linecap="round" ; stroke: "red" ; fill: "red"} \
Expand Down Expand Up @@ -61,6 +62,7 @@
"Sending P2P_MSG": "P2P_MSG",
"Sending ADR_AD": "ADR_AD",
"Sending ADR_QR": "ADR_QR",
"Sending NDT": "NDT",
"Receiving ACK": "ACK",
"Receiving FAILED": "FAILED",
"Receiving TIMESTAMP": "TIMESTAMP",
Expand All @@ -82,6 +84,7 @@
"Receiving P2P_MSG": "P2P_MSG",
"Receiving ADR_AD": "ADR_AD",
"Receiving ADR_QR": "ADR_QR",
"Receiving NDT": "NDT",
"Receiving UNIDENTIFIED": "UNIDENTIFIED",
"Scheduler advancing time ends": "AdvLT"
}
Expand Down

0 comments on commit aa090a1

Please sign in to comment.