From f5e5dd551d853778b78429a537fb70e187765563 Mon Sep 17 00:00:00 2001 From: AsamK Date: Thu, 30 Dec 2021 22:44:38 +0100 Subject: [PATCH] Extract ReceiveHelper --- .../org/asamk/signal/manager/ManagerImpl.java | 254 ++------------- .../asamk/signal/manager/helper/Context.java | 5 + .../signal/manager/helper/ReceiveHelper.java | 288 ++++++++++++++++++ 3 files changed, 315 insertions(+), 232 deletions(-) create mode 100644 lib/src/main/java/org/asamk/signal/manager/helper/ReceiveHelper.java diff --git a/lib/src/main/java/org/asamk/signal/manager/ManagerImpl.java b/lib/src/main/java/org/asamk/signal/manager/ManagerImpl.java index 1af99ff155..1f731ee897 100644 --- a/lib/src/main/java/org/asamk/signal/manager/ManagerImpl.java +++ b/lib/src/main/java/org/asamk/signal/manager/ManagerImpl.java @@ -16,7 +16,6 @@ */ package org.asamk.signal.manager; -import org.asamk.signal.manager.actions.HandleAction; import org.asamk.signal.manager.api.Configuration; import org.asamk.signal.manager.api.Device; import org.asamk.signal.manager.api.Group; @@ -43,7 +42,6 @@ import org.asamk.signal.manager.storage.SignalAccount; import org.asamk.signal.manager.storage.groups.GroupInfo; import org.asamk.signal.manager.storage.identities.IdentityInfo; -import org.asamk.signal.manager.storage.messageCache.CachedMessage; import org.asamk.signal.manager.storage.recipients.Contact; import org.asamk.signal.manager.storage.recipients.Profile; import org.asamk.signal.manager.storage.recipients.RecipientAddress; @@ -59,7 +57,6 @@ import org.whispersystems.libsignal.util.guava.Optional; import org.whispersystems.signalservice.api.SignalSessionLock; import org.whispersystems.signalservice.api.messages.SignalServiceDataMessage; -import org.whispersystems.signalservice.api.messages.SignalServiceEnvelope; import org.whispersystems.signalservice.api.messages.SignalServiceReceiptMessage; import org.whispersystems.signalservice.api.messages.SignalServiceTypingMessage; import org.whispersystems.signalservice.api.push.ACI; @@ -67,8 +64,6 @@ import org.whispersystems.signalservice.api.util.DeviceNameUtil; import org.whispersystems.signalservice.api.util.InvalidNumberException; import org.whispersystems.signalservice.api.util.PhoneNumberFormatter; -import org.whispersystems.signalservice.api.websocket.WebSocketConnectionState; -import org.whispersystems.signalservice.api.websocket.WebSocketUnavailableException; import org.whispersystems.signalservice.internal.util.DynamicCredentialsProvider; import org.whispersystems.signalservice.internal.util.Hex; import org.whispersystems.signalservice.internal.util.Util; @@ -81,7 +76,6 @@ import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.ArrayList; -import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -91,14 +85,10 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; import java.util.stream.Stream; -import io.reactivex.rxjava3.core.Observable; -import io.reactivex.rxjava3.schedulers.Schedulers; - import static org.asamk.signal.manager.config.ServiceConfig.capabilities; public class ManagerImpl implements Manager { @@ -113,15 +103,11 @@ public class ManagerImpl implements Manager { private final Context context; - private boolean hasCaughtUpWithOldMessages = false; - private boolean ignoreAttachments = false; - private Thread receiveThread; private final Set weakHandlers = new HashSet<>(); private final Set messageHandlers = new HashSet<>(); private final List closedListeners = new ArrayList<>(); private boolean isReceivingSynchronous; - private boolean needsToRetryFailedMessages = false; ManagerImpl( SignalAccount account, @@ -155,6 +141,18 @@ public Lock acquire() { final var stickerPackStore = new StickerPackStore(pathConfig.stickerPacksPath()); this.context = new Context(account, dependencies, avatarStore, attachmentStore, stickerPackStore); + this.context.getReceiveHelper().setAuthenticationFailureListener(() -> { + try { + close(); + } catch (IOException e) { + logger.warn("Failed to close account after authentication failure", e); + } + }); + this.context.getReceiveHelper().setCaughtUpWithOldMessagesListener(() -> { + synchronized (this) { + this.notifyAll(); + } + }); } @Override @@ -257,7 +255,7 @@ public Configuration getConfiguration() { @Override public void updateConfiguration( Configuration configuration - ) throws IOException, NotMasterDeviceException { + ) throws NotMasterDeviceException { if (!account.isMasterDevice()) { throw new NotMasterDeviceException(); } @@ -762,7 +760,7 @@ public void setContactBlocked( @Override public void setGroupBlocked( final GroupId groupId, final boolean blocked - ) throws GroupNotFoundException, IOException, NotMasterDeviceException { + ) throws GroupNotFoundException, NotMasterDeviceException { if (!account.isMasterDevice()) { throw new NotMasterDeviceException(); } @@ -832,54 +830,6 @@ void retrieveRemoteStorage() throws IOException { } } - private void retryFailedReceivedMessages(ReceiveMessageHandler handler) { - Set queuedActions = new HashSet<>(); - for (var cachedMessage : account.getMessageCache().getCachedMessages()) { - var actions = retryFailedReceivedMessage(handler, cachedMessage); - if (actions != null) { - queuedActions.addAll(actions); - } - } - handleQueuedActions(queuedActions); - } - - private List retryFailedReceivedMessage( - final ReceiveMessageHandler handler, final CachedMessage cachedMessage - ) { - var envelope = cachedMessage.loadEnvelope(); - if (envelope == null) { - cachedMessage.delete(); - return null; - } - - final var result = context.getIncomingMessageHandler() - .handleRetryEnvelope(envelope, ignoreAttachments, handler); - final var actions = result.first(); - final var exception = result.second(); - - if (exception instanceof UntrustedIdentityException) { - if (System.currentTimeMillis() - envelope.getServerDeliveredTimestamp() > 1000L * 60 * 60 * 24 * 30) { - // Envelope is more than a month old, cleaning up. - cachedMessage.delete(); - return null; - } - if (!envelope.hasSourceUuid()) { - final var identifier = ((UntrustedIdentityException) exception).getSender(); - final var recipientId = account.getRecipientStore().resolveRecipient(identifier); - try { - account.getMessageCache().replaceSender(cachedMessage, recipientId); - } catch (IOException ioException) { - logger.warn("Failed to move cached message to recipient folder: {}", ioException.getMessage()); - } - } - return null; - } - - // If successful and for all other errors that are not recoverable, delete the cached message - cachedMessage.delete(); - return actions; - } - @Override public void addReceiveHandler(final ReceiveMessageHandler handler, final boolean isWeakListener) { if (isReceivingSynchronous) { @@ -903,7 +853,7 @@ private void startReceiveThreadIfRequired() { logger.debug("Starting receiving messages"); while (!Thread.interrupted()) { try { - receiveMessagesInternal(Duration.ofMinutes(1), false, (envelope, e) -> { + context.getReceiveHelper().receiveMessages(Duration.ofMinutes(1), false, (envelope, e) -> { synchronized (messageHandlers) { Stream.concat(messageHandlers.stream(), weakHandlers.stream()).forEach(h -> { try { @@ -920,7 +870,6 @@ private void startReceiveThreadIfRequired() { } } logger.debug("Finished receiving messages"); - hasCaughtUpWithOldMessages = false; synchronized (messageHandlers) { receiveThread = null; @@ -988,180 +937,21 @@ private void receiveMessages( isReceivingSynchronous = true; receiveThread = Thread.currentThread(); try { - receiveMessagesInternal(timeout, returnOnTimeout, handler); + context.getReceiveHelper().receiveMessages(timeout, returnOnTimeout, handler); } finally { receiveThread = null; - hasCaughtUpWithOldMessages = false; isReceivingSynchronous = false; } } - private void receiveMessagesInternal( - Duration timeout, boolean returnOnTimeout, ReceiveMessageHandler handler - ) throws IOException { - needsToRetryFailedMessages = true; - - // Use a Map here because java Set doesn't have a get method ... - Map queuedActions = new HashMap<>(); - - final var signalWebSocket = dependencies.getSignalWebSocket(); - final var webSocketStateDisposable = Observable.merge(signalWebSocket.getUnidentifiedWebSocketState(), - signalWebSocket.getWebSocketState()) - .subscribeOn(Schedulers.computation()) - .observeOn(Schedulers.computation()) - .distinctUntilChanged() - .subscribe(this::onWebSocketStateChange); - signalWebSocket.connect(); - - hasCaughtUpWithOldMessages = false; - var backOffCounter = 0; - final var MAX_BACKOFF_COUNTER = 9; - - while (!Thread.interrupted()) { - if (needsToRetryFailedMessages) { - retryFailedReceivedMessages(handler); - needsToRetryFailedMessages = false; - } - SignalServiceEnvelope envelope; - final CachedMessage[] cachedMessage = {null}; - final var nowMillis = System.currentTimeMillis(); - if (nowMillis - account.getLastReceiveTimestamp() > 60000) { - account.setLastReceiveTimestamp(nowMillis); - } - logger.debug("Checking for new message from server"); - try { - var result = signalWebSocket.readOrEmpty(timeout.toMillis(), envelope1 -> { - final var recipientId = envelope1.hasSourceUuid() ? account.getRecipientStore() - .resolveRecipient(envelope1.getSourceAddress()) : null; - // store message on disk, before acknowledging receipt to the server - cachedMessage[0] = account.getMessageCache().cacheMessage(envelope1, recipientId); - }); - backOffCounter = 0; - - if (result.isPresent()) { - envelope = result.get(); - logger.debug("New message received from server"); - } else { - logger.debug("Received indicator that server queue is empty"); - handleQueuedActions(queuedActions.keySet()); - queuedActions.clear(); - - hasCaughtUpWithOldMessages = true; - synchronized (this) { - this.notifyAll(); - } - - // Continue to wait another timeout for new messages - continue; - } - } catch (AssertionError e) { - if (e.getCause() instanceof InterruptedException) { - Thread.currentThread().interrupt(); - break; - } else { - throw e; - } - } catch (IOException e) { - logger.debug("Pipe unexpectedly unavailable: {}", e.getMessage()); - if (e instanceof WebSocketUnavailableException || "Connection closed!".equals(e.getMessage())) { - final var sleepMilliseconds = 100 * (long) Math.pow(2, backOffCounter); - backOffCounter = Math.min(backOffCounter + 1, MAX_BACKOFF_COUNTER); - logger.warn("Connection closed unexpectedly, reconnecting in {} ms", sleepMilliseconds); - try { - Thread.sleep(sleepMilliseconds); - } catch (InterruptedException interruptedException) { - return; - } - hasCaughtUpWithOldMessages = false; - signalWebSocket.connect(); - continue; - } - throw e; - } catch (TimeoutException e) { - backOffCounter = 0; - if (returnOnTimeout) return; - continue; - } - - final var result = context.getIncomingMessageHandler().handleEnvelope(envelope, ignoreAttachments, handler); - for (final var h : result.first()) { - final var existingAction = queuedActions.get(h); - if (existingAction == null) { - queuedActions.put(h, h); - } else { - existingAction.mergeOther(h); - } - } - final var exception = result.second(); - - if (hasCaughtUpWithOldMessages) { - handleQueuedActions(queuedActions.keySet()); - queuedActions.clear(); - } - if (cachedMessage[0] != null) { - if (exception instanceof UntrustedIdentityException) { - logger.debug("Keeping message with untrusted identity in message cache"); - final var address = ((UntrustedIdentityException) exception).getSender(); - final var recipientId = account.getRecipientStore().resolveRecipient(address); - if (!envelope.hasSourceUuid()) { - try { - cachedMessage[0] = account.getMessageCache().replaceSender(cachedMessage[0], recipientId); - } catch (IOException ioException) { - logger.warn("Failed to move cached message to recipient folder: {}", - ioException.getMessage()); - } - } - } else { - cachedMessage[0].delete(); - } - } - } - handleQueuedActions(queuedActions.keySet()); - queuedActions.clear(); - dependencies.getSignalWebSocket().disconnect(); - webSocketStateDisposable.dispose(); - } - - private void onWebSocketStateChange(final WebSocketConnectionState s) { - if (s.equals(WebSocketConnectionState.AUTHENTICATION_FAILED)) { - account.setRegistered(false); - try { - close(); - } catch (IOException e) { - e.printStackTrace(); - } - } - } - @Override public void setIgnoreAttachments(final boolean ignoreAttachments) { - this.ignoreAttachments = ignoreAttachments; + context.getReceiveHelper().setIgnoreAttachments(ignoreAttachments); } @Override public boolean hasCaughtUpWithOldMessages() { - return hasCaughtUpWithOldMessages; - } - - private void handleQueuedActions(final Collection queuedActions) { - logger.debug("Handling message actions"); - var interrupted = false; - for (var action : queuedActions) { - logger.debug("Executing action {}", action.getClass().getSimpleName()); - try { - action.execute(context); - } catch (Throwable e) { - if ((e instanceof AssertionError || e instanceof RuntimeException) - && e.getCause() instanceof InterruptedException) { - interrupted = true; - continue; - } - logger.warn("Message action failed.", e); - } - } - if (interrupted) { - Thread.currentThread().interrupt(); - } + return context.getReceiveHelper().hasCaughtUpWithOldMessages(); } @Override @@ -1268,7 +1058,7 @@ public boolean trustIdentityVerified( } final var updated = context.getIdentityHelper().trustIdentityVerified(recipientId, fingerprint); if (updated && this.isReceiving()) { - needsToRetryFailedMessages = true; + context.getReceiveHelper().setNeedsToRetryFailedMessages(true); } return updated; } @@ -1291,7 +1081,7 @@ public boolean trustIdentityVerifiedSafetyNumber( } final var updated = context.getIdentityHelper().trustIdentityVerifiedSafetyNumber(recipientId, safetyNumber); if (updated && this.isReceiving()) { - needsToRetryFailedMessages = true; + context.getReceiveHelper().setNeedsToRetryFailedMessages(true); } return updated; } @@ -1314,7 +1104,7 @@ public boolean trustIdentityVerifiedSafetyNumber( } final var updated = context.getIdentityHelper().trustIdentityVerifiedSafetyNumber(recipientId, safetyNumber); if (updated && this.isReceiving()) { - needsToRetryFailedMessages = true; + context.getReceiveHelper().setNeedsToRetryFailedMessages(true); } return updated; } @@ -1334,7 +1124,7 @@ public boolean trustIdentityAllKeys(RecipientIdentifier.Single recipient) throws } final var updated = context.getIdentityHelper().trustIdentityAllKeys(recipientId); if (updated && this.isReceiving()) { - needsToRetryFailedMessages = true; + context.getReceiveHelper().setNeedsToRetryFailedMessages(true); } return updated; } diff --git a/lib/src/main/java/org/asamk/signal/manager/helper/Context.java b/lib/src/main/java/org/asamk/signal/manager/helper/Context.java index 155bd4433b..79b7b95911 100644 --- a/lib/src/main/java/org/asamk/signal/manager/helper/Context.java +++ b/lib/src/main/java/org/asamk/signal/manager/helper/Context.java @@ -29,6 +29,7 @@ public class Context { private PinHelper pinHelper; private PreKeyHelper preKeyHelper; private ProfileHelper profileHelper; + private ReceiveHelper receiveHelper; private RecipientHelper recipientHelper; private SendHelper sendHelper; private StorageHelper storageHelper; @@ -111,6 +112,10 @@ public ProfileHelper getProfileHelper() { return getOrCreate(() -> profileHelper, () -> profileHelper = new ProfileHelper(this)); } + public ReceiveHelper getReceiveHelper() { + return getOrCreate(() -> receiveHelper, () -> receiveHelper = new ReceiveHelper(this)); + } + public RecipientHelper getRecipientHelper() { return getOrCreate(() -> recipientHelper, () -> recipientHelper = new RecipientHelper(this)); } diff --git a/lib/src/main/java/org/asamk/signal/manager/helper/ReceiveHelper.java b/lib/src/main/java/org/asamk/signal/manager/helper/ReceiveHelper.java new file mode 100644 index 0000000000..631e271560 --- /dev/null +++ b/lib/src/main/java/org/asamk/signal/manager/helper/ReceiveHelper.java @@ -0,0 +1,288 @@ +package org.asamk.signal.manager.helper; + +import org.asamk.signal.manager.Manager; +import org.asamk.signal.manager.SignalDependencies; +import org.asamk.signal.manager.UntrustedIdentityException; +import org.asamk.signal.manager.actions.HandleAction; +import org.asamk.signal.manager.storage.SignalAccount; +import org.asamk.signal.manager.storage.messageCache.CachedMessage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.whispersystems.signalservice.api.messages.SignalServiceEnvelope; +import org.whispersystems.signalservice.api.websocket.WebSocketConnectionState; +import org.whispersystems.signalservice.api.websocket.WebSocketUnavailableException; + +import java.io.IOException; +import java.time.Duration; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeoutException; + +import io.reactivex.rxjava3.core.Observable; +import io.reactivex.rxjava3.schedulers.Schedulers; + +public class ReceiveHelper { + + private final static Logger logger = LoggerFactory.getLogger(ReceiveHelper.class); + private final static int MAX_BACKOFF_COUNTER = 9; + + private final SignalAccount account; + private final SignalDependencies dependencies; + private final Context context; + + private boolean ignoreAttachments = false; + private boolean needsToRetryFailedMessages = false; + private boolean hasCaughtUpWithOldMessages = false; + private Callable authenticationFailureListener; + private Callable caughtUpWithOldMessagesListener; + + public ReceiveHelper(final Context context) { + this.account = context.getAccount(); + this.dependencies = context.getDependencies(); + this.context = context; + } + + public void setIgnoreAttachments(final boolean ignoreAttachments) { + this.ignoreAttachments = ignoreAttachments; + } + + public void setNeedsToRetryFailedMessages(final boolean needsToRetryFailedMessages) { + this.needsToRetryFailedMessages = needsToRetryFailedMessages; + } + + public boolean hasCaughtUpWithOldMessages() { + return hasCaughtUpWithOldMessages; + } + + public void setAuthenticationFailureListener(final Callable authenticationFailureListener) { + this.authenticationFailureListener = authenticationFailureListener; + } + + public void setCaughtUpWithOldMessagesListener(final Callable caughtUpWithOldMessagesListener) { + this.caughtUpWithOldMessagesListener = caughtUpWithOldMessagesListener; + } + + public void receiveMessages( + Duration timeout, boolean returnOnTimeout, Manager.ReceiveMessageHandler handler + ) throws IOException { + needsToRetryFailedMessages = true; + hasCaughtUpWithOldMessages = false; + + // Use a Map here because java Set doesn't have a get method ... + Map queuedActions = new HashMap<>(); + + final var signalWebSocket = dependencies.getSignalWebSocket(); + final var webSocketStateDisposable = Observable.merge(signalWebSocket.getUnidentifiedWebSocketState(), + signalWebSocket.getWebSocketState()) + .subscribeOn(Schedulers.computation()) + .observeOn(Schedulers.computation()) + .distinctUntilChanged() + .subscribe(this::onWebSocketStateChange); + signalWebSocket.connect(); + + try { + receiveMessagesInternal(timeout, returnOnTimeout, handler, queuedActions); + } finally { + hasCaughtUpWithOldMessages = false; + handleQueuedActions(queuedActions.keySet()); + queuedActions.clear(); + dependencies.getSignalWebSocket().disconnect(); + webSocketStateDisposable.dispose(); + } + } + + private void receiveMessagesInternal( + Duration timeout, + boolean returnOnTimeout, + Manager.ReceiveMessageHandler handler, + final Map queuedActions + ) throws IOException { + final var signalWebSocket = dependencies.getSignalWebSocket(); + + var backOffCounter = 0; + + while (!Thread.interrupted()) { + if (needsToRetryFailedMessages) { + retryFailedReceivedMessages(handler); + needsToRetryFailedMessages = false; + } + SignalServiceEnvelope envelope; + final CachedMessage[] cachedMessage = {null}; + final var nowMillis = System.currentTimeMillis(); + if (nowMillis - account.getLastReceiveTimestamp() > 60000) { + account.setLastReceiveTimestamp(nowMillis); + } + logger.debug("Checking for new message from server"); + try { + var result = signalWebSocket.readOrEmpty(timeout.toMillis(), envelope1 -> { + final var recipientId = envelope1.hasSourceUuid() ? account.getRecipientStore() + .resolveRecipient(envelope1.getSourceAddress()) : null; + logger.trace("Storing new message from {}", recipientId); + // store message on disk, before acknowledging receipt to the server + cachedMessage[0] = account.getMessageCache().cacheMessage(envelope1, recipientId); + }); + backOffCounter = 0; + + if (result.isPresent()) { + envelope = result.get(); + logger.debug("New message received from server"); + } else { + logger.debug("Received indicator that server queue is empty"); + handleQueuedActions(queuedActions.keySet()); + queuedActions.clear(); + + hasCaughtUpWithOldMessages = true; + caughtUpWithOldMessagesListener.call(); + + // Continue to wait another timeout for new messages + continue; + } + } catch (AssertionError e) { + if (e.getCause() instanceof InterruptedException) { + Thread.currentThread().interrupt(); + break; + } else { + throw e; + } + } catch (IOException e) { + logger.debug("Pipe unexpectedly unavailable: {}", e.getMessage()); + if (e instanceof WebSocketUnavailableException || "Connection closed!".equals(e.getMessage())) { + final var sleepMilliseconds = 100 * (long) Math.pow(2, backOffCounter); + backOffCounter = Math.min(backOffCounter + 1, MAX_BACKOFF_COUNTER); + logger.warn("Connection closed unexpectedly, reconnecting in {} ms", sleepMilliseconds); + try { + Thread.sleep(sleepMilliseconds); + } catch (InterruptedException interruptedException) { + return; + } + hasCaughtUpWithOldMessages = false; + signalWebSocket.connect(); + continue; + } + throw e; + } catch (TimeoutException e) { + backOffCounter = 0; + if (returnOnTimeout) return; + continue; + } + + final var result = context.getIncomingMessageHandler().handleEnvelope(envelope, ignoreAttachments, handler); + for (final var h : result.first()) { + final var existingAction = queuedActions.get(h); + if (existingAction == null) { + queuedActions.put(h, h); + } else { + existingAction.mergeOther(h); + } + } + final var exception = result.second(); + + if (hasCaughtUpWithOldMessages) { + handleQueuedActions(queuedActions.keySet()); + queuedActions.clear(); + } + if (cachedMessage[0] != null) { + if (exception instanceof UntrustedIdentityException) { + logger.debug("Keeping message with untrusted identity in message cache"); + final var address = ((UntrustedIdentityException) exception).getSender(); + final var recipientId = account.getRecipientStore().resolveRecipient(address); + if (!envelope.hasSourceUuid()) { + try { + cachedMessage[0] = account.getMessageCache().replaceSender(cachedMessage[0], recipientId); + } catch (IOException ioException) { + logger.warn("Failed to move cached message to recipient folder: {}", + ioException.getMessage()); + } + } + } else { + cachedMessage[0].delete(); + } + } + } + } + + private void retryFailedReceivedMessages(Manager.ReceiveMessageHandler handler) { + Set queuedActions = new HashSet<>(); + for (var cachedMessage : account.getMessageCache().getCachedMessages()) { + var actions = retryFailedReceivedMessage(handler, cachedMessage); + if (actions != null) { + queuedActions.addAll(actions); + } + } + handleQueuedActions(queuedActions); + } + + private List retryFailedReceivedMessage( + final Manager.ReceiveMessageHandler handler, final CachedMessage cachedMessage + ) { + var envelope = cachedMessage.loadEnvelope(); + if (envelope == null) { + cachedMessage.delete(); + return null; + } + + final var result = context.getIncomingMessageHandler() + .handleRetryEnvelope(envelope, ignoreAttachments, handler); + final var actions = result.first(); + final var exception = result.second(); + + if (exception instanceof UntrustedIdentityException) { + if (System.currentTimeMillis() - envelope.getServerDeliveredTimestamp() > 1000L * 60 * 60 * 24 * 30) { + // Envelope is more than a month old, cleaning up. + cachedMessage.delete(); + return null; + } + if (!envelope.hasSourceUuid()) { + final var identifier = ((UntrustedIdentityException) exception).getSender(); + final var recipientId = account.getRecipientStore().resolveRecipient(identifier); + try { + account.getMessageCache().replaceSender(cachedMessage, recipientId); + } catch (IOException ioException) { + logger.warn("Failed to move cached message to recipient folder: {}", ioException.getMessage()); + } + } + return null; + } + + // If successful and for all other errors that are not recoverable, delete the cached message + cachedMessage.delete(); + return actions; + } + + private void handleQueuedActions(final Collection queuedActions) { + logger.debug("Handling message actions"); + var interrupted = false; + for (var action : queuedActions) { + logger.debug("Executing action {}", action.getClass().getSimpleName()); + try { + action.execute(context); + } catch (Throwable e) { + if ((e instanceof AssertionError || e instanceof RuntimeException) + && e.getCause() instanceof InterruptedException) { + interrupted = true; + continue; + } + logger.warn("Message action failed.", e); + } + } + if (interrupted) { + Thread.currentThread().interrupt(); + } + } + + private void onWebSocketStateChange(final WebSocketConnectionState s) { + if (s.equals(WebSocketConnectionState.AUTHENTICATION_FAILED)) { + account.setRegistered(false); + authenticationFailureListener.call(); + } + } + + public interface Callable { + + void call(); + } +}