Skip to content

Commit

Permalink
MINOR: Controller should process events without rate metrics (apache#…
Browse files Browse the repository at this point in the history
…7732)

Fixes apache#7717, which did not actually achieve its intended effect. The event manager failed to process the new event because we disabled the rate metric, which it expected to be present.

Reviewers: Ismael Juma <ismael@juma.me.uk
  • Loading branch information
hachikuji authored Nov 22, 2019
1 parent 1d8ebc7 commit f9fc53e
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,11 @@ class ControllerEventManager(controllerId: Int,
eventQueueTimeHist.update(time.milliseconds() - dequeued.enqueueTimeMs)

try {
rateAndTimeMetrics(state).time {
dequeued.process(processor)
def process(): Unit = dequeued.process(processor)

rateAndTimeMetrics.get(state) match {
case Some(timer) => timer.time { process() }
case None => process()
}
} catch {
case e: Throwable => error(s"Uncaught error processing event $controllerEvent", e)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,17 @@ import java.util.concurrent.atomic.AtomicInteger

import com.yammer.metrics.Metrics
import com.yammer.metrics.core.{Histogram, MetricName, Timer}
import kafka.controller
import kafka.utils.TestUtils
import org.apache.kafka.common.message.UpdateMetadataResponseData
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.UpdateMetadataResponse
import org.apache.kafka.common.utils.MockTime
import org.junit.Assert.{assertEquals, assertTrue, fail}
import org.junit.{After, Test}

import scala.collection.JavaConverters._
import scala.collection.mutable

class ControllerEventManagerTest {

Expand Down Expand Up @@ -63,6 +68,31 @@ class ControllerEventManagerTest {
assertTrue(allEventManagerMetrics.isEmpty)
}

@Test
def testEventWithoutRateMetrics(): Unit = {
val time = new MockTime()
val controllerStats = new ControllerStats
val processedEvents = mutable.Set.empty[ControllerEvent]

val eventProcessor = new ControllerEventProcessor {
override def process(event: ControllerEvent): Unit = { processedEvents += event }
override def preempt(event: ControllerEvent): Unit = {}
}

controllerEventManager = new ControllerEventManager(0, eventProcessor,
time, controllerStats.rateAndTimeMetrics)
controllerEventManager.start()

val updateMetadataResponse = new UpdateMetadataResponse(
new UpdateMetadataResponseData().setErrorCode(Errors.NONE.code)
)
val updateMetadataResponseEvent = controller.UpdateMetadataResponseReceived(updateMetadataResponse, brokerId = 1)
controllerEventManager.put(updateMetadataResponseEvent)
TestUtils.waitUntilTrue(() => processedEvents.size == 1,
"Failed to process expected event before timing out")
assertEquals(updateMetadataResponseEvent, processedEvents.head)
}

@Test
def testEventQueueTime(): Unit = {
val metricName = "kafka.controller:type=ControllerEventManager,name=EventQueueTimeMs"
Expand Down

0 comments on commit f9fc53e

Please sign in to comment.