diff --git a/core/src/main/java/org/lflang/LinguaFranca.xtext b/core/src/main/java/org/lflang/LinguaFranca.xtext index 37c4495c8b..ba45b3bd0c 100644 --- a/core/src/main/java/org/lflang/LinguaFranca.xtext +++ b/core/src/main/java/org/lflang/LinguaFranca.xtext @@ -224,7 +224,7 @@ Watchdog: code=Code; STP: - 'STP' '(' value=Expression ')' code=Code; + ('STP' | 'STAA') '(' value=Expression ')' code=Code; Preamble: (visibility=Visibility)? 'preamble' code=Code; diff --git a/core/src/main/java/org/lflang/ModelInfo.java b/core/src/main/java/org/lflang/ModelInfo.java index ab78509f66..d6cbd33af8 100644 --- a/core/src/main/java/org/lflang/ModelInfo.java +++ b/core/src/main/java/org/lflang/ModelInfo.java @@ -46,7 +46,6 @@ import org.lflang.lf.Parameter; import org.lflang.lf.ParameterReference; import org.lflang.lf.Reactor; -import org.lflang.lf.STP; import org.lflang.target.Target; import org.lflang.util.FileUtil; @@ -78,9 +77,6 @@ public class ModelInfo { /** The set of deadlines that use a too-large constant to specify their time interval. */ public Set overflowingDeadlines; - /** The set of STP offsets that use a too-large constant to specify their time interval. */ - public Set overflowingSTP; - /** * The set of parameters used to specify a deadline while having been assigned a default value the * is too large for this purpose. These parameters are to be reported during validation. @@ -171,7 +167,6 @@ private void collectOverflowingNodes() { this.overflowingAssignments = new HashSet<>(); this.overflowingDeadlines = new HashSet<>(); this.overflowingParameters = new HashSet<>(); - this.overflowingSTP = new HashSet<>(); // Visit all deadlines in the model; detect possible overflow. for (var deadline : filter(toIterable(model.eAllContents()), Deadline.class)) { @@ -188,13 +183,6 @@ && detectOverflow( this.overflowingDeadlines.add(deadline); } } - // Visit all STP offsets in the model; detect possible overflow. - for (var stp : filter(toIterable(model.eAllContents()), STP.class)) { - // If the time value overflows, mark this deadline as overflowing. - if (isTooLarge(ASTUtils.getLiteralTimeValue(stp.getValue()))) { - this.overflowingSTP.add(stp); - } - } } /** diff --git a/core/src/main/java/org/lflang/federated/extensions/CExtension.java b/core/src/main/java/org/lflang/federated/extensions/CExtension.java index 054ff0ede0..025d20749a 100644 --- a/core/src/main/java/org/lflang/federated/extensions/CExtension.java +++ b/core/src/main/java/org/lflang/federated/extensions/CExtension.java @@ -684,9 +684,7 @@ private String generateCodeToInitializeFederate( "// Initialize the socket mutexes", "lf_mutex_init(&lf_outbound_socket_mutex);", "lf_mutex_init(&socket_mutex);", - "lf_cond_init(&lf_port_status_changed, &env->mutex);", - CExtensionUtils.surroundWithIfFederatedDecentralized( - "lf_cond_init(&lf_current_tag_changed, &env->mutex);"))); + "lf_cond_init(&lf_port_status_changed, &env->mutex);")); // Find the STA (A.K.A. the global STP offset) for this federate. if (federate.targetConfig.get(CoordinationProperty.INSTANCE) @@ -696,7 +694,8 @@ private String generateCodeToInitializeFederate( reactor.getParameters().stream() .filter( param -> - param.getName().equalsIgnoreCase("STP_offset") + (param.getName().equalsIgnoreCase("STP_offset") + || param.getName().equalsIgnoreCase("STA")) && (param.getType() == null || param.getType().isTime())) .findFirst(); @@ -708,7 +707,7 @@ private String generateCodeToInitializeFederate( "lf_set_stp_offset(" + CTypes.getInstance().getTargetTimeExpr(globalSTPTV) + ");"); else if (globalSTP instanceof CodeExprImpl) code.pr("lf_set_stp_offset(" + ((CodeExprImpl) globalSTP).getCode().getBody() + ");"); - else messageReporter.at(stpParam.get().eContainer()).error("Invalid STP offset"); + else messageReporter.at(stpParam.get().eContainer()).error("Invalid STA offset"); } } diff --git a/core/src/main/java/org/lflang/federated/extensions/CExtensionUtils.java b/core/src/main/java/org/lflang/federated/extensions/CExtensionUtils.java index 6e905392c9..01ffc4bc00 100644 --- a/core/src/main/java/org/lflang/federated/extensions/CExtensionUtils.java +++ b/core/src/main/java/org/lflang/federated/extensions/CExtensionUtils.java @@ -87,7 +87,7 @@ public static String initializeTriggersForNetworkActions( } /** - * Generate C code that holds a sorted list of STP structs by time. + * Generate C code that holds a sorted list of STAA structs by time. * *

For decentralized execution, on every logical timestep, a thread will iterate through each * staa struct, wait for the designated offset time, and set the associated port status to absent @@ -107,7 +107,7 @@ public static String stpStructs(FederateInstance federate) { for (int i = 0; i < federate.staaOffsets.size(); ++i) { // Find the corresponding ActionInstance. List networkActions = - federate.stpToNetworkActionMap.get(federate.staaOffsets.get(i)); + federate.staToNetworkActionMap.get(federate.staaOffsets.get(i)); code.pr("staa_lst[" + i + "] = (staa_t*) malloc(sizeof(staa_t));"); code.pr( diff --git a/core/src/main/java/org/lflang/federated/generator/FedASTUtils.java b/core/src/main/java/org/lflang/federated/generator/FedASTUtils.java index 5f74e5c9ec..94d8e6669d 100644 --- a/core/src/main/java/org/lflang/federated/generator/FedASTUtils.java +++ b/core/src/main/java/org/lflang/federated/generator/FedASTUtils.java @@ -290,30 +290,31 @@ private static void addNetworkReceiverReactor( connection.dstFederate.zeroDelayCycleNetworkMessageActions.add(networkAction); // Get the largest STAA for any reaction triggered by the destination port. - TimeValue maxSTP = findMaxSTP(connection, coordination); + TimeValue maxSTAA = findMaxSTAA(connection, coordination); // Adjust this down by the delay on the connection, but do not go below zero. - TimeValue adjusted = maxSTP; + TimeValue adjusted = maxSTAA; TimeValue delay = ASTUtils.getLiteralTimeValue(connection.getDefinition().getDelay()); if (delay != null) { - adjusted = maxSTP.subtract(delay); + adjusted = maxSTAA.subtract(delay); } - if (!connection.dstFederate.currentSTPOffsets.contains(adjusted.time)) { - connection.dstFederate.currentSTPOffsets.add(adjusted.time); + // Need to include even zero STAAs so that ports can be assumed absent right away. + // Consolodate all equal STAAs. + if (!connection.dstFederate.currentSTAOffsets.contains(adjusted.time)) { + connection.dstFederate.currentSTAOffsets.add(adjusted.time); connection.dstFederate.staaOffsets.add(adjusted); - connection.dstFederate.stpToNetworkActionMap.put(adjusted, new ArrayList<>()); + connection.dstFederate.staToNetworkActionMap.put(adjusted, new ArrayList<>()); } else { // TODO: Find more efficient way to reuse timevalues for (var offset : connection.dstFederate.staaOffsets) { - if (maxSTP.time == offset.time) { - maxSTP = offset; + if (maxSTAA.time == offset.time) { + maxSTAA = offset; break; } } } - - connection.dstFederate.stpToNetworkActionMap.get(adjusted).add(networkAction); + connection.dstFederate.staToNetworkActionMap.get(adjusted).add(networkAction); // Add the action definition to the parent reactor. receiver.getActions().add(networkAction); @@ -521,7 +522,7 @@ private static void followReactionUpstream( * @param coordination The coordination scheme. * @return The maximum STP as a TimeValue */ - private static TimeValue findMaxSTP( + private static TimeValue findMaxSTAA( FedConnectionInstance connection, CoordinationMode coordination) { Variable port = connection.getDestinationPortInstance().getDefinition(); FederateInstance instance = connection.dstFederate; diff --git a/core/src/main/java/org/lflang/federated/generator/FederateInstance.java b/core/src/main/java/org/lflang/federated/generator/FederateInstance.java index fdfa6c11bb..30cdcf68f7 100644 --- a/core/src/main/java/org/lflang/federated/generator/FederateInstance.java +++ b/core/src/main/java/org/lflang/federated/generator/FederateInstance.java @@ -269,11 +269,11 @@ public Instantiation getInstantiation() { /** Keep a unique list of enabled serializers */ public List staaOffsets = new ArrayList<>(); - /** The STP offsets that have been recorded in {@code stpOffsets thus far. */ - public Set currentSTPOffsets = new HashSet<>(); + /** The STA offsets that have been recorded thus far. */ + public Set currentSTAOffsets = new HashSet<>(); /** Keep a map of STP values to a list of network actions */ - public HashMap> stpToNetworkActionMap = new HashMap<>(); + public HashMap> staToNetworkActionMap = new HashMap<>(); /** Keep a map of network actions to their associated instantiations */ public HashMap networkActionToInstantiation = new HashMap<>(); diff --git a/core/src/main/java/org/lflang/validation/LFValidator.java b/core/src/main/java/org/lflang/validation/LFValidator.java index 0e245be2db..4c5f87fc6a 100644 --- a/core/src/main/java/org/lflang/validation/LFValidator.java +++ b/core/src/main/java/org/lflang/validation/LFValidator.java @@ -101,7 +101,6 @@ import org.lflang.lf.Reaction; import org.lflang.lf.Reactor; import org.lflang.lf.ReactorDecl; -import org.lflang.lf.STP; import org.lflang.lf.Serializer; import org.lflang.lf.StateVar; import org.lflang.lf.TargetDecl; @@ -1036,15 +1035,6 @@ public void checkState(StateVar stateVar) { } } - @Check(CheckType.FAST) - public void checkSTPOffset(STP stp) { - if (isCBasedTarget() && this.info.overflowingSTP.contains(stp)) { - error( - "STP offset exceeds the maximum of " + TimeValue.MAX_LONG_DEADLINE + " nanoseconds.", - Literals.STP__VALUE); - } - } - @Check(CheckType.FAST) public void checkTargetDecl(TargetDecl target) throws IOException { Optional targetOpt = Target.forName(target.getName()); diff --git a/core/src/main/resources/lib/c/reactor-c b/core/src/main/resources/lib/c/reactor-c index 6ef9154791..89b0e3617d 160000 --- a/core/src/main/resources/lib/c/reactor-c +++ b/core/src/main/resources/lib/c/reactor-c @@ -1 +1 @@ -Subproject commit 6ef9154791ee9c4806f54c77d02f4c9a29420209 +Subproject commit 89b0e3617db1d2730552a418fd35f1395afe114a diff --git a/core/src/test/java/org/lflang/tests/compiler/LinguaFrancaValidationTest.java b/core/src/test/java/org/lflang/tests/compiler/LinguaFrancaValidationTest.java index 8a51d4a027..f13c07760b 100644 --- a/core/src/test/java/org/lflang/tests/compiler/LinguaFrancaValidationTest.java +++ b/core/src/test/java/org/lflang/tests/compiler/LinguaFrancaValidationTest.java @@ -1904,34 +1904,21 @@ public void testCppMutableInput() throws Exception { + "In C++, any value can be made mutable by calling get_mutable_copy()."); } - @Test - public void testOverflowingSTP() throws Exception { - String testCase = - """ - target C; - main reactor { - reaction(startup) {==} STP(2147483648) {==} - } - """; - - // TODO: Uncomment and fix failing test. See issue #903 on Github. - // validator.assertError(parseWithoutError(testCase), LfPackage.eINSTANCE.getSTP(), null, - // "STP offset exceeds the maximum of " + TimeValue.MAX_LONG_DEADLINE + " nanoseconds."); - } - @Test public void testOverflowingDeadline() throws Exception { String testCase = """ target C; main reactor { - reaction(startup) {==} STP(2147483648) {==} + reaction(startup) {==} deadline (1 week) {==} } """; - // TODO: Uncomment and fix failing test. See issue #903 on Github. - // validator.assertError(parseWithoutError(testCase), LfPackage.eINSTANCE.getDeadline(), null, - // "Deadline exceeds the maximum of " + TimeValue.MAX_LONG_DEADLINE + " nanoseconds."); + validator.assertError( + parseWithoutError(testCase), + LfPackage.eINSTANCE.getDeadline(), + null, + "Deadline exceeds the maximum of " + TimeValue.MAX_LONG_DEADLINE + " nanoseconds."); } @Test diff --git a/test/C/src/federated/Dataflow.lf b/test/C/src/federated/Dataflow.lf new file mode 100644 index 0000000000..4445789cb0 --- /dev/null +++ b/test/C/src/federated/Dataflow.lf @@ -0,0 +1,64 @@ +target C { + coordination: decentralized // logging: debug +} + +reactor Client(STP_offset: time = 1 day) { + input server_message: int + output client_message: int + + reaction(startup) {= + lf_print("Client Startup!"); + =} + + reaction(server_message) -> client_message {= + int val = server_message->value + 1; + lf_sleep(MSEC(100)); + lf_print("client: %d", val); + if (val == 9) { + lf_print("client requesting stop"); + lf_request_stop(); + } + lf_set(client_message, val); + =} STP(0) {= + // Zero STAA because STA is large and gets added. + lf_print_error_and_exit("Client STP Violated!"); + =} +} + +reactor Server(STP_offset: time = 1 day) { + output server_message: int + input client_message1: int + input client_message2: int + + reaction(startup) -> server_message {= + lf_print("Server Startup!"); + lf_set(server_message, 0); + =} + + reaction(client_message1, client_message2) -> server_message {= + int val = client_message1->value; + if (val < client_message2->value) val = client_message2->value; + lf_sleep(MSEC(100)); + val += 1; + lf_print("server: %d", val); + if (val == 8) { + lf_print("server requesting stop"); + lf_set(server_message, val); + lf_request_stop(); + } + lf_set(server_message, val); + =} STP(0) {= + // Zero STAA because STA is large and gets added. + lf_print_error_and_exit("Server STP Violated!"); + =} +} + +federated reactor { + client1 = new Client() + client2 = new Client() + server = new Server() + server.server_message -> client1.server_message + client1.client_message -> server.client_message1 after 0 + server.server_message -> client2.server_message + client2.client_message -> server.client_message2 after 0 +} diff --git a/test/C/src/federated/DecentralizedLagging.lf b/test/C/src/federated/DecentralizedLagging.lf new file mode 100644 index 0000000000..29d9b790bd --- /dev/null +++ b/test/C/src/federated/DecentralizedLagging.lf @@ -0,0 +1,68 @@ +/** + * @brief A federated system with a decentralized coordinator that has large STA and STAA offsets. + * + * This test verifies that a large STA and STAA offset can be used when the data flow is predictable + * even if the the program is lagging behind physical time. + * + * @author Edward A. Lee + */ +target C { + coordination: decentralized, + timeout: 40 ns +} + +// Pi needs an STP_offset because its event queue is usually empty and otherwise it will advance to the stop time. +reactor Pi(STP_offset: time = 1 day) { + input trigger: bool + output out: int + + reaction(trigger) -> out {= + tag_t now = lf_tag(); + lf_print("***** at tag " PRINTF_TAG, now.time - lf_time_start(), now.microstep); + lf_set(out, 42); + =} STP(30 s) {= + tag_t now = lf_tag(); + lf_print_error_and_exit("STP violation at Pi at tag " PRINTF_TAG, now.time - lf_time_start(), now.microstep); + =} +} + +// Gather doesn't need an STP_offset because its event queue is never empty. +reactor Gather { + input[4] in: int + output next: bool + logical action a(10 ns) + state count: int = 0 + + reaction(startup, a) -> next {= + lf_set(next, true); + =} + + reaction(in) -> a {= + tag_t now = lf_tag(); + for (int i = 0; i < 4; i++) { + if (!in[i]->is_present) { + lf_print_error_and_exit("Missing input %d in Gather at tag " PRINTF_TAG, + i, now.time - lf_time_start(), now.microstep); + } + } + lf_print("%d: at tag " PRINTF_TAG, self->count, now.time - lf_time_start(), now.microstep); + self->count++; + lf_schedule(a, 0); + =} STP(1 day) {= + tag_t now = lf_tag(); + lf_print_error_and_exit("STP violation at Gather at tag " PRINTF_TAG, now.time - lf_time_start(), now.microstep); + =} + + reaction(shutdown) {= + if (self->count < 5) { + lf_print_error_and_exit("Gather received only %d inputs. Expected 5.", self->count); + } + =} +} + +federated reactor { + pi = new[4] Pi() + g = new Gather() + pi.out -> g.in + (g.next)+ -> pi.trigger +} diff --git a/test/C/src/federated/DecentralizedMicrosteps.lf b/test/C/src/federated/DecentralizedMicrosteps.lf new file mode 100644 index 0000000000..09413e27c7 --- /dev/null +++ b/test/C/src/federated/DecentralizedMicrosteps.lf @@ -0,0 +1,71 @@ +/** + * @brief A federated system with a decentralized coordinator that has large STA and STAA offsets. + * + * This test verifies that a large STA and STAA offset can be used when the data flow is predictable + * even if the the program is lagging behind physical time. This version uses microsteps and never + * advances past the start time. It also uses lf_request_stop() to stop the execution. + * + * @author Edward A. Lee + */ +target C { + coordination: decentralized +} + +// Pi needs an STP_offset because it's event queue is usually empty and otherwise it will advance to the stop time. +reactor Pi(STP_offset: time = 30 s) { + input trigger: bool + output out: int + + reaction(trigger) -> out {= + tag_t now = lf_tag(); + lf_print("***** at tag " PRINTF_TAG, now.time - lf_time_start(), now.microstep); + lf_set(out, 42); + =} STP(30 s) {= + tag_t now = lf_tag(); + lf_print_error_and_exit("STP violation at Pi at tag " PRINTF_TAG, now.time - lf_time_start(), now.microstep); + =} +} + +// Gather doesn't need an STP_offset because its event queue is never empty. +reactor Gather { + input[4] in: int + output next: bool + logical action a + state count: int = 0 + + reaction(startup, a) -> next {= + lf_set(next, true); + =} + + reaction(in) -> a {= + tag_t now = lf_tag(); + for (int i = 0; i < 4; i++) { + if (!in[i]->is_present) { + lf_print_error_and_exit("Missing input %d in Gather at tag " PRINTF_TAG, + i, now.time - lf_time_start(), now.microstep); + } + } + lf_print("%d: at tag " PRINTF_TAG, self->count, now.time - lf_time_start(), now.microstep); + self->count++; + if (self->count >= 4) { + lf_request_stop(); + } + lf_schedule(a, 0); + =} STP(30 s) {= + tag_t now = lf_tag(); + lf_print_error_and_exit("STP violation at Gather at tag " PRINTF_TAG, now.time - lf_time_start(), now.microstep); + =} + + reaction(shutdown) {= + if (self->count < 5) { + lf_print_error_and_exit("Gather received only %d inputs. Expected at least 5.", self->count); + } + =} +} + +federated reactor { + pi = new[4] Pi() + g = new Gather() + pi.out -> g.in + (g.next)+ -> pi.trigger +} diff --git a/test/C/src/federated/DecentralizedP2PUnbalancedTimeoutPhysical.lf b/test/C/src/federated/DecentralizedP2PUnbalancedTimeoutPhysical.lf index 4ab321eeef..8bc7e9648f 100644 --- a/test/C/src/federated/DecentralizedP2PUnbalancedTimeoutPhysical.lf +++ b/test/C/src/federated/DecentralizedP2PUnbalancedTimeoutPhysical.lf @@ -27,7 +27,9 @@ reactor Clock(offset: time = 0, period: time = 1 sec) { =} } -reactor Destination { +// This needs an STA offset because otherwise it may advance to the stop time while the +// physical connection is in the process of assigning a timestamp. +reactor Destination(STA: time = 1 ms) { input x: int state s: int = 1 diff --git a/test/Python/src/federated/Dataflow.lf b/test/Python/src/federated/Dataflow.lf index 596b18f021..4d33235af2 100644 --- a/test/Python/src/federated/Dataflow.lf +++ b/test/Python/src/federated/Dataflow.lf @@ -22,8 +22,8 @@ reactor Client(STP_offset = {= FOREVER =}) { if val==49: print("client done") request_stop() - if val<49: - client_message.set(val) + # Need to unconditionally produce output or downstream could lock up waiting for it. + client_message.set(val) =} STP(10 s) {= print("Client STP Violated!") exit(1) @@ -49,8 +49,8 @@ reactor Server(STP_offset = {= FOREVER =}) { print("server done") server_message.set(val) request_stop() - if val<48: - server_message.set(val) + # Need to unconditionally produce output or downstream could lock up waiting for it. + server_message.set(val) =} STP(10 s) {= print("Server STP Violated!") exit(1)