diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java index a246272c9039..d09fe068668e 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java @@ -252,21 +252,28 @@ public void closeSenders() { * Close any senders that haven't been active with in the given threshold * * @param idleThreshold the threshold to consider a sender as idle + * @return the number of connections that were closed as a result of being idle */ - protected void pruneIdleSenders(final long idleThreshold) { + protected PruneResult pruneIdleSenders(final long idleThreshold) { + int numClosed = 0; + int numConsidered = 0; + long currentTime = System.currentTimeMillis(); final List putBack = new ArrayList<>(); // if a connection hasn't been used with in the threshold then it gets closed ChannelSender sender; while ((sender = senderPool.poll()) != null) { + numConsidered++; if (currentTime > (sender.getLastUsed() + idleThreshold)) { getLogger().debug("Closing idle connection..."); sender.close(); + numClosed++; } else { putBack.add(sender); } } + // re-queue senders that weren't idle, but if the queue is full then close the sender for (ChannelSender putBackSender : putBack) { boolean returned = senderPool.offer(putBackSender); @@ -274,6 +281,8 @@ protected void pruneIdleSenders(final long idleThreshold) { putBackSender.close(); } } + + return new PruneResult(numClosed, numConsidered); } /** @@ -371,6 +380,31 @@ protected void relinquishSender(final ChannelSender sender) { } } + /** + * The results from pruning connections. + */ + protected static class PruneResult { + + private final int numClosed; + + private final int numConsidered; + + public PruneResult(final int numClosed, final int numConsidered) { + this.numClosed = numClosed; + this.numConsidered = numConsidered; + } + + public int getNumClosed() { + return numClosed; + } + + public int getNumConsidered() { + return numConsidered; + } + + } + + /** * Represents a range of messages from a FlowFile. */ diff --git a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunk.java b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunk.java index 57ea812aaef1..d67419299f11 100644 --- a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunk.java +++ b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunk.java @@ -138,8 +138,11 @@ public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFacto final ProcessSession session = sessionFactory.createSession(); final FlowFile flowFile = session.get(); if (flowFile == null) { - pruneIdleSenders(context.getProperty(IDLE_EXPIRATION).asTimePeriod(TimeUnit.MILLISECONDS).longValue()); - context.yield(); + final PruneResult result = pruneIdleSenders(context.getProperty(IDLE_EXPIRATION).asTimePeriod(TimeUnit.MILLISECONDS).longValue()); + // yield if we closed an idle connection, or if there were no connections in the first place + if (result.getNumClosed() > 0 || (result.getNumClosed() == 0 && result.getNumConsidered() == 0)) { + context.yield(); + } return; } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java index 412d8abc959e..63f17ba244db 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java @@ -265,20 +265,26 @@ public void onStopped() { } } - private void pruneIdleSenders(final long idleThreshold){ + private PruneResult pruneIdleSenders(final long idleThreshold){ + int numClosed = 0; + int numConsidered = 0; + long currentTime = System.currentTimeMillis(); final List putBack = new ArrayList<>(); // if a connection hasn't been used with in the threshold then it gets closed ChannelSender sender; while ((sender = senderPool.poll()) != null) { + numConsidered++; if (currentTime > (sender.getLastUsed() + idleThreshold)) { getLogger().debug("Closing idle connection..."); sender.close(); + numClosed++; } else { putBack.add(sender); } } + // re-queue senders that weren't idle, but if the queue is full then close the sender for (ChannelSender putBackSender : putBack) { boolean returned = senderPool.offer(putBackSender); @@ -286,6 +292,8 @@ private void pruneIdleSenders(final long idleThreshold){ putBackSender.close(); } } + + return new PruneResult(numClosed, numConsidered); } @Override @@ -295,8 +303,11 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro final List flowFiles = session.get(batchSize); if (flowFiles == null || flowFiles.isEmpty()) { - pruneIdleSenders(context.getProperty(IDLE_EXPIRATION).asTimePeriod(TimeUnit.MILLISECONDS).longValue()); - context.yield(); + final PruneResult result = pruneIdleSenders(context.getProperty(IDLE_EXPIRATION).asTimePeriod(TimeUnit.MILLISECONDS).longValue()); + // yield if we closed an idle connection, or if there were no connections in the first place + if (result.getNumClosed() > 0 || (result.getNumClosed() == 0 && result.getNumConsidered() == 0)) { + context.yield(); + } return; } @@ -394,4 +405,25 @@ private boolean isValid(final String message) { return false; } + private static class PruneResult { + + private final int numClosed; + + private final int numConsidered; + + public PruneResult(final int numClosed, final int numConsidered) { + this.numClosed = numClosed; + this.numConsidered = numConsidered; + } + + public int getNumClosed() { + return numClosed; + } + + public int getNumConsidered() { + return numConsidered; + } + + } + } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutTCP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutTCP.java index ee3e645074d8..a8deab22a126 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutTCP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutTCP.java @@ -168,8 +168,11 @@ public void onTrigger(final ProcessContext context, final ProcessSessionFactory final ProcessSession session = sessionFactory.createSession(); final FlowFile flowFile = session.get(); if (flowFile == null) { - pruneIdleSenders(context.getProperty(IDLE_EXPIRATION).asTimePeriod(TimeUnit.MILLISECONDS).longValue()); - context.yield(); + final PruneResult result = pruneIdleSenders(context.getProperty(IDLE_EXPIRATION).asTimePeriod(TimeUnit.MILLISECONDS).longValue()); + // yield if we closed an idle connection, or if there were no connections in the first place + if (result.getNumClosed() > 0 || (result.getNumClosed() == 0 && result.getNumConsidered() == 0)) { + context.yield(); + } return; } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutUDP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutUDP.java index af23c5441cbf..3157930dd586 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutUDP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutUDP.java @@ -131,8 +131,11 @@ public void onTrigger(final ProcessContext context, final ProcessSessionFactory final ProcessSession session = sessionFactory.createSession(); final FlowFile flowFile = session.get(); if (flowFile == null) { - pruneIdleSenders(context.getProperty(IDLE_EXPIRATION).asTimePeriod(TimeUnit.MILLISECONDS).longValue()); - context.yield(); + final PruneResult result = pruneIdleSenders(context.getProperty(IDLE_EXPIRATION).asTimePeriod(TimeUnit.MILLISECONDS).longValue()); + // yield if we closed an idle connection, or if there were no connections in the first place + if (result.getNumClosed() > 0 || (result.getNumClosed() == 0 && result.getNumConsidered() == 0)) { + context.yield(); + } return; }