From 118d5a4bcf5371fb215e0f7393fadf26073643f8 Mon Sep 17 00:00:00 2001 From: Denes Arvay Date: Wed, 17 Aug 2016 11:00:16 -0700 Subject: [PATCH] FLUME-2844. SpillableMemoryChannel must start ChannelCounter MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Reviewed by Bessenyei Balázs Donát (Denes Arvay via Mike Percy) --- .../java/org/apache/flume/channel/SpillableMemoryChannel.java | 1 + .../org/apache/flume/channel/TestSpillableMemoryChannel.java | 3 +++ 2 files changed, 4 insertions(+) diff --git a/flume-ng-channels/flume-spillable-memory-channel/src/main/java/org/apache/flume/channel/SpillableMemoryChannel.java b/flume-ng-channels/flume-spillable-memory-channel/src/main/java/org/apache/flume/channel/SpillableMemoryChannel.java index b46d6469fc..09d7f295b2 100644 --- a/flume-ng-channels/flume-spillable-memory-channel/src/main/java/org/apache/flume/channel/SpillableMemoryChannel.java +++ b/flume-ng-channels/flume-spillable-memory-channel/src/main/java/org/apache/flume/channel/SpillableMemoryChannel.java @@ -790,6 +790,7 @@ public synchronized void start() { drainOrder.putOverflow(overFlowCount); totalStored.release(overFlowCount); } + channelCounter.start(); int totalCount = overFlowCount + memQueue.size(); channelCounter.setChannelCapacity(memoryCapacity + getOverflowCapacity()); channelCounter.setChannelSize(totalCount); diff --git a/flume-ng-channels/flume-spillable-memory-channel/src/test/java/org/apache/flume/channel/TestSpillableMemoryChannel.java b/flume-ng-channels/flume-spillable-memory-channel/src/test/java/org/apache/flume/channel/TestSpillableMemoryChannel.java index 848636b766..ab90c3d3ef 100644 --- a/flume-ng-channels/flume-spillable-memory-channel/src/test/java/org/apache/flume/channel/TestSpillableMemoryChannel.java +++ b/flume-ng-channels/flume-spillable-memory-channel/src/test/java/org/apache/flume/channel/TestSpillableMemoryChannel.java @@ -682,6 +682,9 @@ public void testCounters() throws InterruptedException { params.put("overflowTimeout", "0"); startChannel(params); + Assert.assertTrue("channel.channelCounter should have started", + channel.channelCounter.getStartTime() > 0); + //1. fill up mem queue Thread sourceThd = makePutThread("src", 1, 5000, 2500, channel); sourceThd.start();