Skip to content

Commit

Permalink
Add metrics to CoordinatorEventBlockingQueue (#937)
Browse files Browse the repository at this point in the history
* gauge: queue size
* counter: duplicate event (event already enqueued)
  • Loading branch information
ehoner authored May 31, 2023
1 parent c449757 commit 6f59d2a
Show file tree
Hide file tree
Showing 4 changed files with 192 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ public Coordinator(CachedDatastreamReader datastreamCache, CoordinatorConfig con
_heartbeatPeriod = Duration.ofMillis(config.getHeartbeatPeriodMs());

_adapter = createZkAdapter();
_eventQueue = new CoordinatorEventBlockingQueue();
_eventQueue = new CoordinatorEventBlockingQueue(Coordinator.class.getSimpleName());
createEventThread();

VerifiableProperties coordinatorProperties = new VerifiableProperties(_config.getConfigProperties());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,47 +5,91 @@
*/
package com.linkedin.datastream.server;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.codahale.metrics.Counter;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.MetricRegistry;

import com.linkedin.datastream.metrics.BrooklinCounterInfo;
import com.linkedin.datastream.metrics.BrooklinGaugeInfo;
import com.linkedin.datastream.metrics.BrooklinMetricInfo;
import com.linkedin.datastream.metrics.DynamicMetricsManager;
import com.linkedin.datastream.metrics.MetricsAware;


/**
* A blocking queue for {@link Coordinator} events
* A blocking queue for {@link Coordinator} events. Includes two metrics, a
* {@link Gauge} and {@link Counter}. The gauge provides the queue size and
* the counter increments when duplicate events are {@code put()}.
*
* @see CoordinatorEvent.EventType
*/
public class CoordinatorEventBlockingQueue {
class CoordinatorEventBlockingQueue implements MetricsAware {

private static final Logger LOG = LoggerFactory.getLogger(CoordinatorEventBlockingQueue.class.getName());
private static final Set<BrooklinMetricInfo> METRIC_INFOS = ConcurrentHashMap.newKeySet();

static final String COUNTER_KEY = "duplicateEvents";
static final String GAUGE_KEY = "queuedEvents";

private final Set<CoordinatorEvent> _eventSet;
private final Queue<CoordinatorEvent> _eventQueue;
private final DynamicMetricsManager _dynamicMetricsManager;
private final Gauge<Integer> _gauge;
private final Counter _counter;

/**
* Construct a blocking event queue for all types of events in {@link CoordinatorEvent.EventType}
*
* @param key String used to register CoordinatorEventBlockQueue metrics. The metrics
* will be registered to {@code CoordinatorEventBlockingQueue.<key>.<metric>}.
* Where {@code <metric>} is either {@link CoordinatorEventBlockingQueue#COUNTER_KEY}
* or {@link CoordinatorEventBlockingQueue#GAUGE_KEY}.
*/
public CoordinatorEventBlockingQueue() {
CoordinatorEventBlockingQueue(String key) {
_eventSet = new HashSet<>();
_eventQueue = new LinkedBlockingQueue<>();
_dynamicMetricsManager = DynamicMetricsManager.getInstance();

String prefix = buildMetricName(key);
_counter = _dynamicMetricsManager.registerMetric(prefix, COUNTER_KEY, Counter.class);
_gauge = _dynamicMetricsManager.registerGauge(prefix, GAUGE_KEY, _eventQueue::size);

BrooklinCounterInfo counterInfo = new BrooklinCounterInfo(MetricRegistry.name(prefix, COUNTER_KEY));
BrooklinGaugeInfo gaugeInfo = new BrooklinGaugeInfo(MetricRegistry.name(prefix, GAUGE_KEY));
METRIC_INFOS.addAll(Arrays.asList(counterInfo, gaugeInfo));
}


/**
* Add a single event to the queue, overwriting events with the same name and same metadata.
* @param event CoordinatorEvent event to add to the queue
*/
public synchronized void put(CoordinatorEvent event) {
LOG.info("Queuing event {} to event queue", event.getType());
if (!_eventSet.contains(event)) {
if (_eventSet.contains(event)) {
_counter.inc(); // count duplicate event
} else {
// only insert if there isn't an event present in the queue with the same name and same metadata.
boolean result = _eventQueue.offer(event);
if (!result) {
return;
}
_eventSet.add(event);
_dynamicMetricsManager.setGauge(_gauge, _eventQueue::size);
}

LOG.debug("Event queue size {}", _eventQueue.size());
notify();
}
Expand Down Expand Up @@ -74,6 +118,7 @@ public synchronized CoordinatorEvent take() throws InterruptedException {
LOG.info("De-queuing event " + queuedEvent.getType());
LOG.debug("Event queue size: {}", _eventQueue.size());
_eventSet.remove(queuedEvent);
_dynamicMetricsManager.setGauge(_gauge, _eventQueue::size);
}

return queuedEvent;
Expand All @@ -85,6 +130,7 @@ public synchronized CoordinatorEvent take() throws InterruptedException {
public synchronized void clear() {
_eventQueue.clear();
_eventSet.clear();
_dynamicMetricsManager.setGauge(_gauge, _eventQueue::size);
}

/**
Expand Down Expand Up @@ -112,4 +158,9 @@ public int size() {
public boolean isEmpty() {
return _eventQueue.isEmpty();
}

@Override
public List<BrooklinMetricInfo> getMetricInfos() {
return new ArrayList<>(METRIC_INFOS);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,42 @@
*/
package com.linkedin.datastream.server;

import java.util.Random;

import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import com.codahale.metrics.Counter;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.MetricRegistry;

import com.linkedin.datastream.metrics.DynamicMetricsManager;
import com.linkedin.datastream.testutil.MetricsTestUtils;

import static com.linkedin.datastream.server.CoordinatorEventBlockingQueue.COUNTER_KEY;
import static com.linkedin.datastream.server.CoordinatorEventBlockingQueue.GAUGE_KEY;


/**
* Tests for {@link CoordinatorEventBlockingQueue}
*/
public class TestCoordinatorEventBlockingQueue {

private static final String SIMPLE_NAME = TestCoordinatorEventBlockingQueue.class.getSimpleName();
private static final String COUNTER_NAME =
MetricRegistry.name(CoordinatorEventBlockingQueue.class.getSimpleName(), SIMPLE_NAME, COUNTER_KEY);
private static final String GAUGE_NAME =
MetricRegistry.name(CoordinatorEventBlockingQueue.class.getSimpleName(), SIMPLE_NAME, GAUGE_KEY);

@BeforeMethod(alwaysRun = true)
public void resetMetrics() {
DynamicMetricsManager.createInstance(new MetricRegistry(), TestCoordinatorEventBlockingQueue.class.getName());
}

@Test
public void testHappyPath() throws Exception {
CoordinatorEventBlockingQueue eventBlockingQueue = new CoordinatorEventBlockingQueue();
CoordinatorEventBlockingQueue eventBlockingQueue = new CoordinatorEventBlockingQueue(SIMPLE_NAME);
eventBlockingQueue.put(CoordinatorEvent.createLeaderDoAssignmentEvent(false));
eventBlockingQueue.put(CoordinatorEvent.createLeaderDoAssignmentEvent(true));
eventBlockingQueue.put(CoordinatorEvent.createLeaderDoAssignmentEvent(false));
Expand All @@ -33,4 +58,113 @@ public void testHappyPath() throws Exception {
Assert.assertEquals(eventBlockingQueue.take(), CoordinatorEvent.createLeaderPartitionAssignmentEvent("test2"));
Assert.assertEquals(eventBlockingQueue.take(), CoordinatorEvent.HANDLE_ASSIGNMENT_CHANGE_EVENT);
}

/**
* Verify metric registration.
*/
@Test
public void testRegistersMetricsCorrectly() {
CoordinatorEventBlockingQueue queue = new CoordinatorEventBlockingQueue(SIMPLE_NAME);
MetricsTestUtils.verifyMetrics(queue, DynamicMetricsManager.getInstance());
}

/**
* Verify metrics match operations: {@code put()}, {@code peek()}, {@code take()},
* and {@code clear()}. Counter should not be changed
*/
@Test(timeOut = 500)
public void testMetricOperations() throws InterruptedException {
CoordinatorEventBlockingQueue queue = new CoordinatorEventBlockingQueue(SIMPLE_NAME);
Counter counter = DynamicMetricsManager.getInstance().getMetric(COUNTER_NAME);
Gauge<Integer> gauge = DynamicMetricsManager.getInstance().getMetric(GAUGE_NAME);

Assert.assertNotNull(counter, "Counter was not found. Test setup failed.");
Assert.assertEquals(counter.getCount(), 0, "Initial value should be 0.");

// set counter to random negative value for verification
int random = -new Random().nextInt();
counter.inc(random);
Assert.assertEquals(counter.getCount(), random, "Override was not set");

Assert.assertNotNull(gauge, "Gauge was not found. Test setup failed.");
Assert.assertEquals((int) gauge.getValue(), 0, "Initial value should be 0.");

queue.put(CoordinatorEvent.createLeaderDoAssignmentEvent(false));
Assert.assertEquals(counter.getCount(), random, "Adding event to empty should not increment counter.");
Assert.assertEquals((int) gauge.getValue(), 1, "put() should increment gauge.");

CoordinatorEvent event0 = queue.peek();
Assert.assertNotNull(event0, "Event was not queued.");
Assert.assertEquals(counter.getCount(), random, "peek() should not alter counter.");
Assert.assertEquals((int) gauge.getValue(), 1, "peek() should affect gauge.");

CoordinatorEvent event1 = queue.take();
Assert.assertNotNull(event1, "Event was not queued.");
Assert.assertEquals(counter.getCount(), random, "remove() should not alter counter.");
Assert.assertEquals((int) gauge.getValue(), 0, "remove() should decrement gauge.");

queue.put(CoordinatorEvent.createLeaderDoAssignmentEvent(false));
queue.put(CoordinatorEvent.createLeaderDoAssignmentEvent(true));
queue.put(CoordinatorEvent.createLeaderPartitionAssignmentEvent("test1"));
Assert.assertEquals((int) gauge.getValue(), 3);
queue.clear();
Assert.assertEquals(counter.getCount(), random, "clear() should not alter counter.");
Assert.assertEquals((int) gauge.getValue(), 0, "clear() should reset gauge.");
}

/**
* Verify counter and gauge follow de-duplication. Adding duplicate events should not
* change gauge value, but should increment the counter.
*
* 2-step assertion for gauge:
* 1. queue.size() == value => verify test construction
* 2. gauge.getValue() == queue.size() => verify implementation
*/
@Test(timeOut = 500)
public void testGaugeDedupe() throws InterruptedException {
CoordinatorEventBlockingQueue queue = new CoordinatorEventBlockingQueue(SIMPLE_NAME);
Counter counter = DynamicMetricsManager.getInstance().getMetric(COUNTER_NAME);
Gauge<Integer> gauge = DynamicMetricsManager.getInstance().getMetric(GAUGE_NAME);

Assert.assertNotNull(counter, "Counter was not found. Test setup failed.");
Assert.assertEquals(counter.getCount(), 0, "Initial value should be 0.");

Assert.assertNotNull(gauge, "Gauge was not found. Test setup failed.");
Assert.assertEquals((int) gauge.getValue(), 0, "Initial value should be 0.");

queue.put(CoordinatorEvent.createLeaderDoAssignmentEvent(false));
Assert.assertEquals(queue.size(), 1);
Assert.assertEquals(counter.getCount(), 0, "Adding event to empty should not increment counter.");
Assert.assertEquals((int) gauge.getValue(), 1, "Add should increment gauge.");

queue.put(CoordinatorEvent.createLeaderDoAssignmentEvent(false));
Assert.assertEquals(queue.size(), 1);
Assert.assertEquals(counter.getCount(), 1, "Failed to count duplicate event.");
Assert.assertEquals((int) gauge.getValue(), queue.size(), "Duplicate event should not change gauge.");

queue.put(CoordinatorEvent.createLeaderPartitionAssignmentEvent("test1"));
Assert.assertEquals(queue.size(), 2);
Assert.assertEquals(counter.getCount(), 1, "Counter should not have been altered.");
Assert.assertEquals((int) gauge.getValue(), queue.size(), "Add should increment gauge.");

queue.put(CoordinatorEvent.createLeaderDoAssignmentEvent(false));
Assert.assertEquals(queue.size(), 2);
Assert.assertEquals(counter.getCount(), 2, "Failed to count second duplicate event.");
Assert.assertEquals((int) gauge.getValue(), queue.size(), "Duplicate event should not change gauge.");

CoordinatorEvent event0 = queue.take();
Assert.assertNotNull(event0, "Event was not queued.");
Assert.assertEquals(queue.size(), 1);
Assert.assertEquals((int) gauge.getValue(), queue.size(), "Remove should decrement gauge.");

CoordinatorEvent event1 = queue.take();
Assert.assertNotNull(event1, "Event was not queued.");
Assert.assertEquals(queue.size(), 0);
Assert.assertEquals((int) gauge.getValue(), queue.size(), "Remove should decrement gauge.");

CoordinatorEvent event2 = queue.peek();
Assert.assertNull(event2, "Event queue was expected to be empty.");
Assert.assertEquals(queue.size(), 0);
Assert.assertEquals((int) gauge.getValue(), queue.size(), "Value is never less than zero.");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ private MetricsTestUtils() {
* names start with the provided {@code metricsAware}'s simple class name.
*/
public static void verifyMetrics(MetricsAware metricsAware, DynamicMetricsManager metricsManager) {
verifyMetrics(metricsAware, metricsManager, s -> s.startsWith(metricsAware.getClass().getSimpleName()));
verifyMetrics(metricsAware, metricsManager, s -> s.matches(metricsAware.getClass().getSimpleName()));
}

/**
Expand Down

0 comments on commit 6f59d2a

Please sign in to comment.