From de2bfc7f7942908222ebcbac17e6072055acc062 Mon Sep 17 00:00:00 2001 From: AsamK Date: Mon, 31 Oct 2022 11:17:52 +0100 Subject: [PATCH] Add optional message limit for receive command --- .../org/asamk/signal/manager/Manager.java | 9 ++-- .../org/asamk/signal/manager/ManagerImpl.java | 17 ++++--- .../signal/manager/helper/ReceiveHelper.java | 13 ++++-- man/signal-cli.1.adoc | 3 ++ .../asamk/signal/commands/ReceiveCommand.java | 14 +++--- .../asamk/signal/dbus/DbusManagerImpl.java | 44 +++++++++++-------- 6 files changed, 58 insertions(+), 42 deletions(-) diff --git a/lib/src/main/java/org/asamk/signal/manager/Manager.java b/lib/src/main/java/org/asamk/signal/manager/Manager.java index 3dcf8f5976..ec168a4c34 100644 --- a/lib/src/main/java/org/asamk/signal/manager/Manager.java +++ b/lib/src/main/java/org/asamk/signal/manager/Manager.java @@ -202,12 +202,9 @@ default void addReceiveHandler(ReceiveMessageHandler handler) { /** * Receive new messages from server, returns if no new message arrive in a timespan of timeout. */ - void receiveMessages(Duration timeout, ReceiveMessageHandler handler) throws IOException; - - /** - * Receive new messages from server, returns only if the thread is interrupted. - */ - void receiveMessages(ReceiveMessageHandler handler) throws IOException; + public void receiveMessages( + Optional timeout, Optional maxMessages, ReceiveMessageHandler handler + ) throws IOException; void setReceiveConfig(ReceiveConfig receiveConfig); diff --git a/lib/src/main/java/org/asamk/signal/manager/ManagerImpl.java b/lib/src/main/java/org/asamk/signal/manager/ManagerImpl.java index 4ffeb99fdd..95f5bde4e7 100644 --- a/lib/src/main/java/org/asamk/signal/manager/ManagerImpl.java +++ b/lib/src/main/java/org/asamk/signal/manager/ManagerImpl.java @@ -961,17 +961,16 @@ public boolean isReceiving() { } @Override - public void receiveMessages(Duration timeout, ReceiveMessageHandler handler) throws IOException { - receiveMessages(timeout, true, handler); - } - - @Override - public void receiveMessages(ReceiveMessageHandler handler) throws IOException { - receiveMessages(Duration.ofMinutes(1), false, handler); + public void receiveMessages( + Optional timeout, + Optional maxMessages, + ReceiveMessageHandler handler + ) throws IOException { + receiveMessages(timeout.orElse(Duration.ofMinutes(1)), timeout.isPresent(), maxMessages.orElse(null), handler); } private void receiveMessages( - Duration timeout, boolean returnOnTimeout, ReceiveMessageHandler handler + Duration timeout, boolean returnOnTimeout, Integer maxMessages, ReceiveMessageHandler handler ) throws IOException { if (isReceiving()) { throw new IllegalStateException("Already receiving message."); @@ -979,7 +978,7 @@ private void receiveMessages( isReceivingSynchronous = true; receiveThread = Thread.currentThread(); try { - context.getReceiveHelper().receiveMessages(timeout, returnOnTimeout, handler); + context.getReceiveHelper().receiveMessages(timeout, returnOnTimeout, maxMessages, handler); } finally { receiveThread = null; isReceivingSynchronous = false; diff --git a/lib/src/main/java/org/asamk/signal/manager/helper/ReceiveHelper.java b/lib/src/main/java/org/asamk/signal/manager/helper/ReceiveHelper.java index 9fe1bf5458..c15f4f94cf 100644 --- a/lib/src/main/java/org/asamk/signal/manager/helper/ReceiveHelper.java +++ b/lib/src/main/java/org/asamk/signal/manager/helper/ReceiveHelper.java @@ -80,7 +80,7 @@ public boolean requestStopReceiveMessages() { public void receiveMessagesContinuously(Manager.ReceiveMessageHandler handler) { while (!shouldStop) { try { - receiveMessages(Duration.ofMinutes(1), false, handler); + receiveMessages(Duration.ofMinutes(1), false, null, handler); break; } catch (IOException e) { logger.warn("Receiving messages failed, retrying", e); @@ -89,7 +89,7 @@ public void receiveMessagesContinuously(Manager.ReceiveMessageHandler handler) { } public void receiveMessages( - Duration timeout, boolean returnOnTimeout, Manager.ReceiveMessageHandler handler + Duration timeout, boolean returnOnTimeout, Integer maxMessages, Manager.ReceiveMessageHandler handler ) throws IOException { needsToRetryFailedMessages = true; hasCaughtUpWithOldMessages = false; @@ -107,7 +107,7 @@ public void receiveMessages( signalWebSocket.connect(); try { - receiveMessagesInternal(signalWebSocket, timeout, returnOnTimeout, handler, queuedActions); + receiveMessagesInternal(signalWebSocket, timeout, returnOnTimeout, maxMessages, handler, queuedActions); } finally { hasCaughtUpWithOldMessages = false; handleQueuedActions(queuedActions.keySet()); @@ -122,13 +122,15 @@ private void receiveMessagesInternal( final SignalWebSocket signalWebSocket, Duration timeout, boolean returnOnTimeout, + Integer maxMessages, Manager.ReceiveMessageHandler handler, final Map queuedActions ) throws IOException { + int remainingMessages = maxMessages == null ? -1 : maxMessages; var backOffCounter = 0; isWaitingForMessage = false; - while (!shouldStop) { + while (!shouldStop && remainingMessages != 0) { if (needsToRetryFailedMessages) { retryFailedReceivedMessages(handler); needsToRetryFailedMessages = false; @@ -154,6 +156,9 @@ private void receiveMessagesInternal( backOffCounter = 0; if (result.isPresent()) { + if (remainingMessages > 0) { + remainingMessages -= 1; + } envelope = result.get(); logger.debug("New message received from server"); } else { diff --git a/man/signal-cli.1.adoc b/man/signal-cli.1.adoc index 5b3af7c1df..4d34582aa8 100644 --- a/man/signal-cli.1.adoc +++ b/man/signal-cli.1.adoc @@ -372,6 +372,9 @@ In json mode this is outputted as one json object per line. Number of seconds to wait for new messages (negative values disable timeout). Default is 5 seconds. +*--max-messages*:: +Maximum number of messages to receive, before returning. + *--ignore-attachments*:: Don’t download attachments of received messages. diff --git a/src/main/java/org/asamk/signal/commands/ReceiveCommand.java b/src/main/java/org/asamk/signal/commands/ReceiveCommand.java index 4d5bdff079..0095e75871 100644 --- a/src/main/java/org/asamk/signal/commands/ReceiveCommand.java +++ b/src/main/java/org/asamk/signal/commands/ReceiveCommand.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.time.Duration; import java.util.List; +import java.util.Optional; public class ReceiveCommand implements LocalCommand { @@ -37,6 +38,10 @@ public void attachToSubparser(final Subparser subparser) { .type(double.class) .setDefault(3.0) .help("Number of seconds to wait for new messages (negative values disable timeout)"); + subparser.addArgument("--max-messages") + .type(int.class) + .setDefault(-1) + .help("Maximum number of messages to receive, before returning."); subparser.addArgument("--ignore-attachments") .help("Don’t download attachments of received messages.") .action(Arguments.storeTrue()); @@ -58,6 +63,7 @@ public void handleCommand( final Namespace ns, final Manager m, final OutputWriter outputWriter ) throws CommandException { final var timeout = ns.getDouble("timeout"); + final var maxMessagesRaw = ns.getInt("max-messages"); final var ignoreAttachments = Boolean.TRUE.equals(ns.getBoolean("ignore-attachments")); final var ignoreStories = Boolean.TRUE.equals(ns.getBoolean("ignore-stories")); final var sendReadReceipts = Boolean.TRUE.equals(ns.getBoolean("send-read-receipts")); @@ -65,11 +71,9 @@ public void handleCommand( try { final var handler = outputWriter instanceof JsonWriter ? new JsonReceiveMessageHandler(m, (JsonWriter) outputWriter) : new ReceiveMessageHandler(m, (PlainTextWriter) outputWriter); - if (timeout < 0) { - m.receiveMessages(handler); - } else { - m.receiveMessages(Duration.ofMillis((long) (timeout * 1000)), handler); - } + final var duration = timeout < 0 ? null : Duration.ofMillis((long) (timeout * 1000)); + final var maxMessages = maxMessagesRaw < 0 ? null : maxMessagesRaw; + m.receiveMessages(Optional.ofNullable(duration), Optional.ofNullable(maxMessages), handler); } catch (IOException e) { throw new IOErrorException("Error while receiving messages: " + e.getMessage(), e); } diff --git a/src/main/java/org/asamk/signal/dbus/DbusManagerImpl.java b/src/main/java/org/asamk/signal/dbus/DbusManagerImpl.java index 8e92cdf4ab..b59be923d0 100644 --- a/src/main/java/org/asamk/signal/dbus/DbusManagerImpl.java +++ b/src/main/java/org/asamk/signal/dbus/DbusManagerImpl.java @@ -58,6 +58,7 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; import java.util.function.Supplier; @@ -497,39 +498,46 @@ public boolean isReceiving() { } } - @Override - public void receiveMessages(final ReceiveMessageHandler handler) throws IOException { - addReceiveHandler(handler); - try { - synchronized (this) { - this.wait(); - } - } catch (InterruptedException ignored) { - } - removeReceiveHandler(handler); - } - @Override public void receiveMessages( - final Duration timeout, final ReceiveMessageHandler handler + Optional timeout, Optional maxMessages, ReceiveMessageHandler handler ) throws IOException { + final var remainingMessages = new AtomicInteger(maxMessages.orElse(-1)); final var lastMessage = new AtomicLong(System.currentTimeMillis()); + final var thread = Thread.currentThread(); final ReceiveMessageHandler receiveHandler = (envelope, e) -> { lastMessage.set(System.currentTimeMillis()); handler.handleMessage(envelope, e); + if (remainingMessages.get() > 0) { + if (remainingMessages.decrementAndGet() <= 0) { + remainingMessages.set(0); + thread.interrupt(); + } + } }; addReceiveHandler(receiveHandler); - while (true) { + if (timeout.isPresent()) { + while (remainingMessages.get() != 0) { + try { + final var passedTime = System.currentTimeMillis() - lastMessage.get(); + final var sleepTimeRemaining = timeout.get().toMillis() - passedTime; + if (sleepTimeRemaining < 0) { + break; + } + Thread.sleep(sleepTimeRemaining); + } catch (InterruptedException ignored) { + } + } + } else { try { - final var sleepTimeRemaining = timeout.toMillis() - (System.currentTimeMillis() - lastMessage.get()); - if (sleepTimeRemaining < 0) { - break; + synchronized (this) { + this.wait(); } - Thread.sleep(sleepTimeRemaining); } catch (InterruptedException ignored) { } } + removeReceiveHandler(receiveHandler); }