@@ -29,7 +29,6 @@ import kafka.network.Processor._
2929import kafka .network .RequestChannel .{CloseConnectionResponse , EndThrottlingResponse , NoOpResponse , SendResponse , StartThrottlingResponse }
3030import kafka .network .SocketServer ._
3131import kafka .server .{ApiVersionManager , BrokerReconfigurable , KafkaConfig }
32- import org .apache .kafka .network .EndPoint
3332import org .apache .kafka .common .message .ApiMessageType .ListenerType
3433import kafka .utils ._
3534import org .apache .kafka .common .config .ConfigException
@@ -96,7 +95,7 @@ class SocketServer(
9695 memoryPoolSensor.add(new Meter (TimeUnit .MILLISECONDS , memoryPoolDepletedPercentMetricName, memoryPoolDepletedTimeMetricName))
9796 private val memoryPool = if (config.queuedMaxBytes > 0 ) new SimpleMemoryPool (config.queuedMaxBytes, config.socketRequestMaxBytes, false , memoryPoolSensor) else MemoryPool .NONE
9897 // data-plane
99- private [network] val dataPlaneAcceptors = new ConcurrentHashMap [EndPoint , DataPlaneAcceptor ]()
98+ private [network] val dataPlaneAcceptors = new ConcurrentHashMap [Endpoint , DataPlaneAcceptor ]()
10099 val dataPlaneRequestChannel = new RequestChannel (maxQueuedRequests, DataPlaneAcceptor .MetricPrefix , time, apiVersionManager.newRequestMetrics)
101100
102101 private [this ] val nextProcessorId : AtomicInteger = new AtomicInteger (0 )
@@ -161,8 +160,8 @@ class SocketServer(
161160 * Therefore, we do not know that any particular request processor will be running by the end of
162161 * this function -- just that it might be running.
163162 *
164- * @param authorizerFutures Future per [[EndPoint ]] used to wait before starting the
165- * processor corresponding to the [[EndPoint ]]. Any endpoint
163+ * @param authorizerFutures Future per [[Endpoint ]] used to wait before starting the
164+ * processor corresponding to the [[Endpoint ]]. Any endpoint
166165 * that does not appear in this map will be started once all
167166 * authorizerFutures are complete.
168167 *
@@ -181,7 +180,7 @@ class SocketServer(
181180 // Because of ephemeral ports, we need to match acceptors to futures by looking at
182181 // the listener name, rather than the endpoint object.
183182 val authorizerFuture = authorizerFutures.find {
184- case (endpoint, _) => acceptor.endPoint.listenerName.value(). equals(endpoint.listenerName().get ())
183+ case (endpoint, _) => acceptor.endPoint.listenerName.equals(endpoint.listenerName())
185184 } match {
186185 case None => allAuthorizerFuturesComplete
187186 case Some ((_, future)) => future
@@ -210,23 +209,24 @@ class SocketServer(
210209 enableFuture
211210 }
212211
213- private def createDataPlaneAcceptorAndProcessors (endpoint : EndPoint ): Unit = synchronized {
212+ private def createDataPlaneAcceptorAndProcessors (endpoint : Endpoint ): Unit = synchronized {
214213 if (stopped) {
215214 throw new RuntimeException (" Can't create new data plane acceptor and processors: SocketServer is stopped." )
216215 }
217- val parsedConfigs = config.valuesFromThisConfigWithPrefixOverride(endpoint.listenerName.configPrefix)
218- connectionQuotas.addListener(config, endpoint.listenerName)
219- val isPrivilegedListener = config.interBrokerListenerName == endpoint.listenerName
216+ val listenerName = ListenerName .normalised(endpoint.listenerName)
217+ val parsedConfigs = config.valuesFromThisConfigWithPrefixOverride(listenerName.configPrefix)
218+ connectionQuotas.addListener(config, listenerName)
219+ val isPrivilegedListener = config.interBrokerListenerName == listenerName
220220 val dataPlaneAcceptor = createDataPlaneAcceptor(endpoint, isPrivilegedListener, dataPlaneRequestChannel)
221221 config.addReconfigurable(dataPlaneAcceptor)
222222 dataPlaneAcceptor.configure(parsedConfigs)
223223 dataPlaneAcceptors.put(endpoint, dataPlaneAcceptor)
224- info(s " Created data-plane acceptor and processors for endpoint : ${endpoint. listenerName}" )
224+ info(s " Created data-plane acceptor and processors for endpoint : ${listenerName}" )
225225 }
226226
227- private def endpoints = config.listeners.map(l => l.listenerName -> l).toMap
227+ private def endpoints = config.listeners.map(l => ListenerName .normalised( l.listenerName) -> l).toMap
228228
229- protected def createDataPlaneAcceptor (endPoint : EndPoint , isPrivilegedListener : Boolean , requestChannel : RequestChannel ): DataPlaneAcceptor = {
229+ protected def createDataPlaneAcceptor (endPoint : Endpoint , isPrivilegedListener : Boolean , requestChannel : RequestChannel ): DataPlaneAcceptor = {
230230 new DataPlaneAcceptor (this , endPoint, config, nodeId, connectionQuotas, time, isPrivilegedListener, requestChannel, metrics, credentialProvider, logContext, memoryPool, apiVersionManager)
231231 }
232232
@@ -277,7 +277,7 @@ class SocketServer(
277277 /**
278278 * This method is called to dynamically add listeners.
279279 */
280- def addListeners (listenersAdded : Seq [EndPoint ]): Unit = synchronized {
280+ def addListeners (listenersAdded : Seq [Endpoint ]): Unit = synchronized {
281281 if (stopped) {
282282 throw new RuntimeException (" can't add new listeners: SocketServer is stopped." )
283283 }
@@ -297,10 +297,10 @@ class SocketServer(
297297 }
298298 }
299299
300- def removeListeners (listenersRemoved : Seq [EndPoint ]): Unit = synchronized {
300+ def removeListeners (listenersRemoved : Seq [Endpoint ]): Unit = synchronized {
301301 info(s " Removing data-plane listeners for endpoints $listenersRemoved" )
302302 listenersRemoved.foreach { endpoint =>
303- connectionQuotas.removeListener(config, endpoint.listenerName)
303+ connectionQuotas.removeListener(config, ListenerName .normalised( endpoint.listenerName) )
304304 dataPlaneAcceptors.asScala.remove(endpoint).foreach { acceptor =>
305305 acceptor.beginShutdown()
306306 acceptor.close()
@@ -345,7 +345,7 @@ class SocketServer(
345345 // For test usage
346346 def dataPlaneAcceptor (listenerName : String ): Option [DataPlaneAcceptor ] = {
347347 dataPlaneAcceptors.asScala.foreach { case (endPoint, acceptor) =>
348- if (endPoint.listenerName.value() == listenerName)
348+ if (endPoint.listenerName == listenerName)
349349 return Some (acceptor)
350350 }
351351 None
@@ -376,7 +376,7 @@ object DataPlaneAcceptor {
376376}
377377
378378class DataPlaneAcceptor (socketServer : SocketServer ,
379- endPoint : EndPoint ,
379+ endPoint : Endpoint ,
380380 config : KafkaConfig ,
381381 nodeId : Int ,
382382 connectionQuotas : ConnectionQuotas ,
@@ -409,7 +409,7 @@ class DataPlaneAcceptor(socketServer: SocketServer,
409409 * Returns the listener name associated with this reconfigurable. Listener-specific
410410 * configs corresponding to this listener name are provided for reconfiguration.
411411 */
412- override def listenerName (): ListenerName = endPoint.listenerName
412+ override def listenerName (): ListenerName = ListenerName .normalised( endPoint.listenerName)
413413
414414 /**
415415 * Returns the names of configs that may be reconfigured.
@@ -477,7 +477,7 @@ class DataPlaneAcceptor(socketServer: SocketServer,
477477 * Thread that accepts and configures new connections. There is one of these per endpoint.
478478 */
479479private [kafka] abstract class Acceptor (val socketServer : SocketServer ,
480- val endPoint : EndPoint ,
480+ val endPoint : Endpoint ,
481481 var config : KafkaConfig ,
482482 nodeId : Int ,
483483 val connectionQuotas : ConnectionQuotas ,
@@ -523,7 +523,7 @@ private[kafka] abstract class Acceptor(val socketServer: SocketServer,
523523 private val backwardCompatibilityMetricGroup = new KafkaMetricsGroup (" kafka.network" , " Acceptor" )
524524 private val blockedPercentMeterMetricName = backwardCompatibilityMetricGroup.metricName(
525525 s " ${metricPrefix()}AcceptorBlockedPercent " ,
526- Map (ListenerMetricTag -> endPoint.listenerName.value ).asJava)
526+ Map (ListenerMetricTag -> endPoint.listenerName).asJava)
527527 private val blockedPercentMeter = metricsGroup.newMeter(blockedPercentMeterMetricName," blocked time" , TimeUnit .NANOSECONDS )
528528 private var currentProcessorIndex = 0
529529 private [network] val throttledSockets = new mutable.PriorityQueue [DelayedCloseSocket ]()
@@ -636,7 +636,7 @@ private[kafka] abstract class Acceptor(val socketServer: SocketServer,
636636 new InetSocketAddress (host, port)
637637 }
638638 val serverChannel = socketServer.socketFactory.openServerSocket(
639- endPoint.listenerName.value() ,
639+ endPoint.listenerName,
640640 socketAddress,
641641 listenBacklogSize,
642642 recvBufferSize)
@@ -690,14 +690,15 @@ private[kafka] abstract class Acceptor(val socketServer: SocketServer,
690690 private def accept (key : SelectionKey ): Option [SocketChannel ] = {
691691 val serverSocketChannel = key.channel().asInstanceOf [ServerSocketChannel ]
692692 val socketChannel = serverSocketChannel.accept()
693+ val listenerName = ListenerName .normalised(endPoint.listenerName)
693694 try {
694- connectionQuotas.inc(endPoint. listenerName, socketChannel.socket.getInetAddress, blockedPercentMeter)
695+ connectionQuotas.inc(listenerName, socketChannel.socket.getInetAddress, blockedPercentMeter)
695696 configureAcceptedSocketChannel(socketChannel)
696697 Some (socketChannel)
697698 } catch {
698699 case e : TooManyConnectionsException =>
699700 info(s " Rejected connection from ${e.ip}, address already has the configured maximum of ${e.count} connections. " )
700- connectionQuotas.closeChannel(this , endPoint. listenerName, socketChannel)
701+ connectionQuotas.closeChannel(this , listenerName, socketChannel)
701702 None
702703 case e : ConnectionThrottledException =>
703704 val ip = socketChannel.socket.getInetAddress
@@ -707,7 +708,7 @@ private[kafka] abstract class Acceptor(val socketServer: SocketServer,
707708 None
708709 case e : IOException =>
709710 error(s " Encountered an error while configuring the connection, closing it. " , e)
710- connectionQuotas.closeChannel(this , endPoint. listenerName, socketChannel)
711+ connectionQuotas.closeChannel(this , listenerName, socketChannel)
711712 None
712713 }
713714 }
@@ -749,7 +750,7 @@ private[kafka] abstract class Acceptor(val socketServer: SocketServer,
749750 def wakeup (): Unit = nioSelector.wakeup()
750751
751752 def addProcessors (toCreate : Int ): Unit = synchronized {
752- val listenerName = endPoint.listenerName
753+ val listenerName = ListenerName .normalised( endPoint.listenerName)
753754 val securityProtocol = endPoint.securityProtocol
754755 val listenerProcessors = new ArrayBuffer [Processor ]()
755756
0 commit comments