Skip to content

Commit

Permalink
NIFI-4476 Improving logic for determining when to yield in PutTCP/UDP…
Browse files Browse the repository at this point in the history
…/Syslog/Splunk

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #2204.
  • Loading branch information
bbende authored and pvillard31 committed Oct 10, 2017
1 parent 883c223 commit 9324a2a
Show file tree
Hide file tree
Showing 5 changed files with 85 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -252,28 +252,37 @@ public void closeSenders() {
* Close any senders that haven't been active with in the given threshold
*
* @param idleThreshold the threshold to consider a sender as idle
* @return the number of connections that were closed as a result of being idle
*/
protected void pruneIdleSenders(final long idleThreshold) {
protected PruneResult pruneIdleSenders(final long idleThreshold) {
int numClosed = 0;
int numConsidered = 0;

long currentTime = System.currentTimeMillis();
final List<ChannelSender> putBack = new ArrayList<>();

// if a connection hasn't been used with in the threshold then it gets closed
ChannelSender sender;
while ((sender = senderPool.poll()) != null) {
numConsidered++;
if (currentTime > (sender.getLastUsed() + idleThreshold)) {
getLogger().debug("Closing idle connection...");
sender.close();
numClosed++;
} else {
putBack.add(sender);
}
}

// re-queue senders that weren't idle, but if the queue is full then close the sender
for (ChannelSender putBackSender : putBack) {
boolean returned = senderPool.offer(putBackSender);
if (!returned) {
putBackSender.close();
}
}

return new PruneResult(numClosed, numConsidered);
}

/**
Expand Down Expand Up @@ -371,6 +380,31 @@ protected void relinquishSender(final ChannelSender sender) {
}
}

/**
* The results from pruning connections.
*/
protected static class PruneResult {

private final int numClosed;

private final int numConsidered;

public PruneResult(final int numClosed, final int numConsidered) {
this.numClosed = numClosed;
this.numConsidered = numConsidered;
}

public int getNumClosed() {
return numClosed;
}

public int getNumConsidered() {
return numConsidered;
}

}


/**
* Represents a range of messages from a FlowFile.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,11 @@ public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFacto
final ProcessSession session = sessionFactory.createSession();
final FlowFile flowFile = session.get();
if (flowFile == null) {
pruneIdleSenders(context.getProperty(IDLE_EXPIRATION).asTimePeriod(TimeUnit.MILLISECONDS).longValue());
context.yield();
final PruneResult result = pruneIdleSenders(context.getProperty(IDLE_EXPIRATION).asTimePeriod(TimeUnit.MILLISECONDS).longValue());
// yield if we closed an idle connection, or if there were no connections in the first place
if (result.getNumClosed() > 0 || (result.getNumClosed() == 0 && result.getNumConsidered() == 0)) {
context.yield();
}
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,27 +265,35 @@ public void onStopped() {
}
}

private void pruneIdleSenders(final long idleThreshold){
private PruneResult pruneIdleSenders(final long idleThreshold){
int numClosed = 0;
int numConsidered = 0;

long currentTime = System.currentTimeMillis();
final List<ChannelSender> putBack = new ArrayList<>();

// if a connection hasn't been used with in the threshold then it gets closed
ChannelSender sender;
while ((sender = senderPool.poll()) != null) {
numConsidered++;
if (currentTime > (sender.getLastUsed() + idleThreshold)) {
getLogger().debug("Closing idle connection...");
sender.close();
numClosed++;
} else {
putBack.add(sender);
}
}

// re-queue senders that weren't idle, but if the queue is full then close the sender
for (ChannelSender putBackSender : putBack) {
boolean returned = senderPool.offer(putBackSender);
if (!returned) {
putBackSender.close();
}
}

return new PruneResult(numClosed, numConsidered);
}

@Override
Expand All @@ -295,8 +303,11 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro

final List<FlowFile> flowFiles = session.get(batchSize);
if (flowFiles == null || flowFiles.isEmpty()) {
pruneIdleSenders(context.getProperty(IDLE_EXPIRATION).asTimePeriod(TimeUnit.MILLISECONDS).longValue());
context.yield();
final PruneResult result = pruneIdleSenders(context.getProperty(IDLE_EXPIRATION).asTimePeriod(TimeUnit.MILLISECONDS).longValue());
// yield if we closed an idle connection, or if there were no connections in the first place
if (result.getNumClosed() > 0 || (result.getNumClosed() == 0 && result.getNumConsidered() == 0)) {
context.yield();
}
return;
}

Expand Down Expand Up @@ -394,4 +405,25 @@ private boolean isValid(final String message) {
return false;
}

private static class PruneResult {

private final int numClosed;

private final int numConsidered;

public PruneResult(final int numClosed, final int numConsidered) {
this.numClosed = numClosed;
this.numConsidered = numConsidered;
}

public int getNumClosed() {
return numClosed;
}

public int getNumConsidered() {
return numConsidered;
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,11 @@ public void onTrigger(final ProcessContext context, final ProcessSessionFactory
final ProcessSession session = sessionFactory.createSession();
final FlowFile flowFile = session.get();
if (flowFile == null) {
pruneIdleSenders(context.getProperty(IDLE_EXPIRATION).asTimePeriod(TimeUnit.MILLISECONDS).longValue());
context.yield();
final PruneResult result = pruneIdleSenders(context.getProperty(IDLE_EXPIRATION).asTimePeriod(TimeUnit.MILLISECONDS).longValue());
// yield if we closed an idle connection, or if there were no connections in the first place
if (result.getNumClosed() > 0 || (result.getNumClosed() == 0 && result.getNumConsidered() == 0)) {
context.yield();
}
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,11 @@ public void onTrigger(final ProcessContext context, final ProcessSessionFactory
final ProcessSession session = sessionFactory.createSession();
final FlowFile flowFile = session.get();
if (flowFile == null) {
pruneIdleSenders(context.getProperty(IDLE_EXPIRATION).asTimePeriod(TimeUnit.MILLISECONDS).longValue());
context.yield();
final PruneResult result = pruneIdleSenders(context.getProperty(IDLE_EXPIRATION).asTimePeriod(TimeUnit.MILLISECONDS).longValue());
// yield if we closed an idle connection, or if there were no connections in the first place
if (result.getNumClosed() > 0 || (result.getNumClosed() == 0 && result.getNumConsidered() == 0)) {
context.yield();
}
return;
}

Expand Down

0 comments on commit 9324a2a

Please sign in to comment.