Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
c9ea807
fix potential deadlock between actionDescriptionBuilderLock and actio…
DivineThreepwood Dec 11, 2022
196bdf8
make sure reschedule is not triggered once the unit controller is shu…
DivineThreepwood Dec 11, 2022
defea93
Make sure required state observer is shutdown in case the scene state…
DivineThreepwood Dec 11, 2022
3726dd7
fix logic issue in equalServiceStates check.
DivineThreepwood Dec 11, 2022
5db7a66
Implememnt validateInitialAction service method for Actions.
DivineThreepwood Dec 11, 2022
4022d5e
Make sure remote action cleanup task does not cleanup the state of ot…
DivineThreepwood Dec 11, 2022
942e460
make sure remote action list is always locked once accessed.
DivineThreepwood Dec 11, 2022
66e9eb6
Code readability improvements.
DivineThreepwood Dec 11, 2022
8937134
clearly define start and end of an bco test to enable the seperation …
DivineThreepwood Dec 11, 2022
0f84d7e
improve test logging.
DivineThreepwood Dec 11, 2022
5bbfc42
Merge remote-tracking branch 'origin/dev' into bugfix/#69_fix_action_…
DivineThreepwood Dec 11, 2022
43240e4
remove debug logging
DivineThreepwood Dec 11, 2022
36f2407
cleanup code
DivineThreepwood Dec 11, 2022
a25bbb6
log cleanup
DivineThreepwood Dec 11, 2022
f2cf075
Update module/dal/control/src/main/java/org/openbase/bco/dal/control/…
DivineThreepwood Dec 13, 2022
0f373f8
Apply suggestions from code review
DivineThreepwood Dec 13, 2022
08e70b1
switch unit test debug log from warn to debug
DivineThreepwood Dec 13, 2022
b515e7e
improve toString method of RemoteAction.
DivineThreepwood Dec 13, 2022
d5e13a5
update jul to get sl4j lambda support.
DivineThreepwood Dec 13, 2022
151593e
improve locking note in action impl and introduce lambda logging for …
DivineThreepwood Dec 13, 2022
d249c68
Merge branch 'bugfix/#69_fix_action_pipeline_deadlocks_and_other_pipe…
DivineThreepwood Dec 13, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ public class ActionImpl implements SchedulableAction {

protected final AbstractUnitController<?, ?> unit;
private final SyncObject executionStateChangeSync = new SyncObject("ExecutionStateChangeSync");

// IMPORTANT LOCKING NOTE: There is no need to acquire the actionDescriptionBuilderLock in every case we need the action task exclusively.
// However, in case you acquire the action task lock and need to access the actionDescriptionBuilder within its synchronize scope,
// the actionDescriptionBuilderLock has to be acquired first before locking the action task.
// Otherwise, there is a high risk of deadlocks.
private final SyncObject actionTaskLock = new SyncObject("ActionTaskLock");
private final BundledReentrantReadWriteLock actionDescriptionBuilderLock;
private ActionDescription.Builder actionDescriptionBuilder;
Expand Down Expand Up @@ -742,10 +747,15 @@ private void cancelActionTask() {
}

private void updateActionStateIfNotTerminating(final ActionState.State state) throws InterruptedException {
synchronized (actionTaskLock) {
if (isTerminating()) {
throw new InterruptedException();
actionDescriptionBuilderLock.lockReadInterruptibly();
try {
synchronized (actionTaskLock) {
if (isTerminating()) {
throw new InterruptedException();
}
}
} finally {
actionDescriptionBuilderLock.unlockRead();
}
updateActionState(state);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1720,7 +1720,9 @@ public void cancelAllActions() throws InterruptedException, CouldNotPerformExcep
}

// final reschedule for cleanup
reschedule();
if(!isShutdownInProgress()) {
reschedule();
}
} finally {
builderSetup.unlockWrite(NotificationStrategy.AFTER_LAST_RELEASE);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.openbase.bco.dal.lib.layer.unit.UnitRemote;
import org.openbase.bco.dal.lib.layer.unit.scene.SceneController;
import org.openbase.bco.dal.lib.state.States.Activation;
import org.openbase.bco.dal.remote.action.Actions;
import org.openbase.bco.dal.remote.action.RemoteAction;
import org.openbase.bco.dal.remote.action.RemoteActionPool;
import org.openbase.bco.dal.remote.layer.unit.ButtonRemote;
Expand Down Expand Up @@ -256,6 +257,8 @@ private void stop() {

public class ActivationStateOperationServiceImpl implements ActivationStateOperationService {

private RequiredActionObserver actionObserver = null;

/**
* Sets the activation state of the scene
*
Expand All @@ -268,6 +271,11 @@ public synchronized Future<ActionDescription> setActivationState(final Activatio
final ActionDescription.Builder responsibleActionBuilder = activationState.getResponsibleAction().toBuilder();


// shutdown all existing action observer to not let old observation interfere with new activations.
if(actionObserver != null) {
actionObserver.shutdown();
}

// mark scene action as not replaceable since scene takes care of managing this actions.
responsibleActionBuilder.setReplaceable(false);

Expand Down Expand Up @@ -305,10 +313,10 @@ public synchronized Future<ActionDescription> setActivationState(final Activatio
requiredActionImpactList.addAll(requiredAction.getActionImpact(true));

} catch (org.openbase.jul.exception.TimeoutException ex) {
// if the timeout is exhausted than just continue since we want to keep on trying as long as the scene is active.
// if the timeout is exhausted then just continue since we want to keep on trying as long as the scene is active.
continue;
} catch (CancellationException | CouldNotPerformException ex) {
ExceptionPrinter.printHistory("Optional " + requiredAction + " of " + this + " could not executed!", ex, logger, LogLevel.WARN);
ExceptionPrinter.printHistory("Required " + requiredAction + " of " + getLabel("?") + " could not be executed!", ex, logger, LogLevel.DEBUG);
return FutureProcessor.canceledFuture(ActionDescription.class, new RejectedException("Required action " + requiredAction + " could not be executed", ex));
}
}
Expand All @@ -321,14 +329,13 @@ public synchronized Future<ActionDescription> setActivationState(final Activatio
// if the timeout is exhausted than just continue since we want to keep on trying as long as the scene is active.
continue;
} catch (CancellationException | CouldNotPerformException ex) {
ExceptionPrinter.printHistory("Optional " + optionalAction + " of " + this + " could not executed!", ex, logger, LogLevel.WARN);
ExceptionPrinter.printHistory("Optional " + optionalAction + " of " + getLabel("?") + " could not be executed!", ex, logger, LogLevel.TRACE);
}
}

// register an observer which will deactivate the scene if one required action is now longer running
// observer will cleanup itself after the action is no longer valid so no need to care fore it.
try {
new RequiredActionObserver(requiredActionImpactList, getActionById(responsibleActionBuilder.getActionId(), "SceneController"));
actionObserver = new RequiredActionObserver(requiredActionImpactList, getActionById(responsibleActionBuilder.getActionId(), "SceneController"));
} catch (NotAvailableException ex) {
new FatalImplementationErrorException("Action is not available even when just created!", this, ex);
}
Expand Down Expand Up @@ -370,7 +377,7 @@ public ServiceType getServiceType() {
}
}

public static class RequiredActionObserver implements Observer<ServiceStateProvider<Message>, Message>, Shutdownable {
class RequiredActionObserver implements Observer<ServiceStateProvider<Message>, Message>, Shutdownable {

private final Logger LOGGER = LoggerFactory.getLogger(RequiredActionObserver.class);

Expand All @@ -379,6 +386,8 @@ public static class RequiredActionObserver implements Observer<ServiceStateProvi
private final HashMap<UnitRemote<?>, RequiredServiceDescription> unitAndRequiredServiceStateMap;
private final Action responsibleAction;

private boolean destroy = false;

private RequiredActionObserver(final List<ActionReference> requiredActionImpact, final Action responsibleAction) {
this.responsibleAction = responsibleAction;
this.unitAndRequiredServiceStateMap = new HashMap<>();
Expand Down Expand Up @@ -427,6 +436,7 @@ private RequiredActionObserver(final List<ActionReference> requiredActionImpact,

private void verifyAllStates() {
try {
logger.trace(() -> "verify "+unitAndRequiredServiceStateMap.entrySet().size()+ " states of "+ getLabel("?"));
for (Entry<UnitRemote<? extends Message>, RequiredServiceDescription> unitActionReferenceEntry : unitAndRequiredServiceStateMap.entrySet()) {
try {
// skip unit in case its offline, since then the verification is automatically
Expand All @@ -446,12 +456,25 @@ private void verifyAllStates() {

private void verifyState(final ServiceProvider<? extends Message> unit, final Message serviceState) throws VerificationFailedException {

// skip verification on destroyed required action observer!
if(destroy) {
return;
}

if (!responsibleAction.isValid()) {
throw new VerificationFailedException("Action not valid anymore!");
throw new VerificationFailedException("The activation of " + getLabel("?") + " is not valid anymore.");
}

// skip in case no service state was delivered
if(serviceState.toString().isBlank()) {
return;
}

if (!Services.equalServiceStates(unitAndRequiredServiceStateMap.get(unit).getServiceState(), serviceState)) {
throw new VerificationFailedException("State of " + unit + "not meet!");
logger.trace(() -> unitAndRequiredServiceStateMap.get(unit).getServiceState() + " is not equals " + serviceState.toString().substring(0, 20) + " and will cancel: " + SceneControllerImpl.this.getLabel("?"));
if(Actions.validateInitialAction(serviceState)) {
throw new VerificationFailedException("State of " + unit + "not meet!");
}
}
}

Expand All @@ -477,6 +500,7 @@ public void update(ServiceStateProvider<Message> serviceStateProvider, Message s

@Override
public void shutdown() {
destroy = true;
// deregister observation
for (Entry<UnitRemote<?>, RequiredServiceDescription> unitActionReferenceEntry : unitAndRequiredServiceStateMap.entrySet()) {
unitActionReferenceEntry.getKey().removeServiceStateObserver(ServiceTempus.CURRENT, unitActionReferenceEntry.getValue().getServiceType(), this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1259,7 +1259,7 @@ public static boolean equalServiceStates(final Message serviceState1, final Mess
}

// fail if only one is empty
if (serviceState1.equals(serviceState1.getDefaultInstanceForType()) || serviceState2.equals(serviceState2.getDefaultInstanceForType())) {
if (serviceState1.equals(serviceState1.getDefaultInstanceForType()) ^ serviceState2.equals(serviceState2.getDefaultInstanceForType())) {
return false;
}

Expand Down Expand Up @@ -1373,5 +1373,3 @@ public static String getServiceStateClassName(final Message serviceState) {
return serviceState.getClass().getName();
}
}


Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
* #L%
*/

import com.google.protobuf.Message;
import org.openbase.bco.dal.lib.action.ActionDescriptionProcessor;
import org.openbase.bco.dal.lib.layer.service.ServiceStateProcessor;
import org.openbase.jul.exception.CouldNotPerformException;
import org.openbase.type.domotic.action.ActionDescriptionType.ActionDescription;
import org.openbase.type.domotic.state.ActionStateType.ActionState;
Expand Down Expand Up @@ -55,4 +58,27 @@ public static RemoteAction waitForActionState(final Future<ActionDescription> ac
remoteAction.waitForActionState(actionState, timeout, timeUnit);
return remoteAction;
}

public static boolean validateInitialAction(Message serviceState) {
try {
return validateInitialAction(ServiceStateProcessor.getResponsibleAction(serviceState));
} catch (CouldNotPerformException e) {
// skip validation in error case.
}
return false;
}

public static boolean validateInitialAction(ActionDescription actionDescription) {
try {
final RemoteAction initialActionOfIncomingServiceState =
new RemoteAction(ActionDescriptionProcessor.getInitialActionReference(actionDescription));
initialActionOfIncomingServiceState.waitForRegistration();
return initialActionOfIncomingServiceState.isValid();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (CouldNotPerformException e) {
// skip if validation failed.
}
return false;
}
}
Loading