MINOR: Use underscore for variable initialization in Scala sources #12534

Merged
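
This PR replaces explicit `= null` initializers on `var` fields throughout the Scala sources with the underscore default initializer, `= _`. The underscore asks the compiler for the type's default value: `null` for reference types, `0`/`0L`/`0.0` for numeric types, and `false` for `Boolean`. It is the idiomatic way to declare a field that will be assigned later, and it is only legal on class and object fields, not on local variables. A minimal sketch of the semantics (the names here are illustrative, not from the PR):

```scala
// Illustrative only: shows what `var field: T = _` evaluates to per type.
object UnderscoreInitDemo {
  class Holder {
    var name: String = _     // reference type: defaults to null
    var count: Int = _       // numeric type: defaults to 0
    var enabled: Boolean = _ // Boolean: defaults to false
  }

  def main(args: Array[String]): Unit = {
    val h = new Holder
    println(h.name)    // null
    println(h.count)   // 0
    println(h.enabled) // false
  }
}
```

The hunks from the diff follow.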
@@ -36,8 +36,8 @@ private class KafkaCSVMetricsReporter extends KafkaMetricsReporter
   with KafkaCSVMetricsReporterMBean
   with Logging {

-  private var csvDir: File = null
-  private var underlying: CsvReporter = null
+  private var csvDir: File = _
+  private var underlying: CsvReporter = _
   private var running = false
   private var initialized = false
@@ -53,7 +53,7 @@ trait KafkaMetricsReporter {

 object KafkaMetricsReporter {
   val ReporterStarted: AtomicBoolean = new AtomicBoolean(false)
-  private var reporters: ArrayBuffer[KafkaMetricsReporter] = null
+  private var reporters: ArrayBuffer[KafkaMetricsReporter] = _

   def startReporters(verifiableProps: VerifiableProperties): Seq[KafkaMetricsReporter] = {
     ReporterStarted synchronized {
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/network/RequestChannel.scala
@@ -543,7 +543,7 @@ class RequestMetrics(name: String) extends KafkaMetricsGroup {
   class ErrorMeter(name: String, error: Errors) {
     private val tags = Map("request" -> name, "error" -> error.name)

-    @volatile private var meter: Meter = null
+    @volatile private var meter: Meter = _

     def getOrCreateMeter(): Meter = {
       if (meter != null)
18 changes: 9 additions & 9 deletions core/src/main/scala/kafka/server/ControllerServer.scala
@@ -78,18 +78,18 @@ class ControllerServer(
   val awaitShutdownCond = lock.newCondition()
   var status: ProcessStatus = SHUTDOWN

-  var linuxIoMetricsCollector: LinuxIoMetricsCollector = null
-  @volatile var authorizer: Option[Authorizer] = null
-  var tokenCache: DelegationTokenCache = null
-  var credentialProvider: CredentialProvider = null
-  var socketServer: SocketServer = null
+  var linuxIoMetricsCollector: LinuxIoMetricsCollector = _
+  @volatile var authorizer: Option[Authorizer] = None
+  var tokenCache: DelegationTokenCache = _
+  var credentialProvider: CredentialProvider = _
+  var socketServer: SocketServer = _
   val socketServerFirstBoundPortFuture = new CompletableFuture[Integer]()
   var createTopicPolicy: Option[CreateTopicPolicy] = None
   var alterConfigPolicy: Option[AlterConfigPolicy] = None
-  var controller: Controller = null
-  var quotaManagers: QuotaManagers = null
-  var controllerApis: ControllerApis = null
-  var controllerApisHandlerPool: KafkaRequestHandlerPool = null
+  var controller: Controller = _
+  var quotaManagers: QuotaManagers = _
+  var controllerApis: ControllerApis = _
+  var controllerApisHandlerPool: KafkaRequestHandlerPool = _

   private def maybeChangeStatus(from: ProcessStatus, to: ProcessStatus): Boolean = {
     lock.lock()
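One field above is deliberately not converted to `_`: `authorizer` changes from `null` to `None`. An `Option`-typed field defaulted with `_` would still start out as `null`, so any combinator such as `isEmpty` or `foreach` invoked before assignment would throw; `None` is the proper empty value. A hedged sketch with hypothetical names (not the Kafka types):

```scala
// Hypothetical example: why Option fields get None, not the underscore default.
class WithUnderscore { var auth: Option[String] = _ }    // auth starts as null
class WithNone       { var auth: Option[String] = None } // auth starts as None

object OptionDefaultDemo {
  def main(args: Array[String]): Unit = {
    println(new WithNone().auth.isEmpty)          // true
    // println(new WithUnderscore().auth.isEmpty) // would throw NullPointerException
  }
}
```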
@@ -188,7 +188,7 @@ class DelegationTokenManager(val config: KafkaConfig,
   val defaultTokenRenewTime: Long = config.delegationTokenExpiryTimeMs
   val tokenRemoverScanInterval: Long = config.delegationTokenExpiryCheckIntervalMs
   private val lock = new Object()
-  private var tokenChangeListener: ZkNodeChangeNotificationListener = null
+  private var tokenChangeListener: ZkNodeChangeNotificationListener = _

   def startup(): Unit = {
     if (config.tokenAuthEnabled) {
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -210,7 +210,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
   private val reconfigurables = new CopyOnWriteArrayList[Reconfigurable]()
   private val brokerReconfigurables = new CopyOnWriteArrayList[BrokerReconfigurable]()
   private val lock = new ReentrantReadWriteLock
-  private var currentConfig: KafkaConfig = null
+  private var currentConfig: KafkaConfig = _
   private val dynamicConfigPasswordEncoder = if (kafkaConfig.processRoles.isEmpty) {
     maybeCreatePasswordEncoder(kafkaConfig.passwordEncoderSecret)
   } else {
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/server/FetchSession.scala
@@ -456,7 +456,7 @@ class IncrementalFetchContext(private val time: Time,
   private class PartitionIterator(val iter: FetchSession.RESP_MAP_ITER,
                                   val updateFetchContextAndRemoveUnselected: Boolean)
     extends FetchSession.RESP_MAP_ITER {
-    var nextElement: util.Map.Entry[TopicIdPartition, FetchResponseData.PartitionData] = null
+    var nextElement: util.Map.Entry[TopicIdPartition, FetchResponseData.PartitionData] = _

     override def hasNext: Boolean = {
       while ((nextElement == null) && iter.hasNext) {
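In the hunk above, `nextElement` doubles as a null sentinel: `hasNext` keeps pulling from the wrapped iterator until it buffers an entry. Since `_` still yields `null` for a reference type, the sentinel logic is unchanged by this conversion. A simplified sketch of the pattern, with hypothetical types rather than the Kafka ones:

```scala
// Hypothetical sketch of a filtering iterator driven by a null sentinel.
class EvenIterator(iter: Iterator[Int]) extends Iterator[Int] {
  // Underscore default: starts as null, meaning "no element buffered yet".
  private var nextElement: Integer = _

  override def hasNext: Boolean = {
    while (nextElement == null && iter.hasNext) {
      val candidate = iter.next()
      if (candidate % 2 == 0) nextElement = candidate // buffer a match
    }
    nextElement != null
  }

  override def next(): Int = {
    if (!hasNext) throw new NoSuchElementException
    val result = nextElement
    nextElement = null // clear the sentinel for the next pull
    result
  }
}
```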
62 changes: 31 additions & 31 deletions core/src/main/scala/kafka/server/KafkaServer.scala
@@ -101,66 +101,66 @@ class KafkaServer(

   @volatile private var _brokerState: BrokerState = BrokerState.NOT_RUNNING
   private var shutdownLatch = new CountDownLatch(1)
-  private var logContext: LogContext = null
+  private var logContext: LogContext = _

   private val kafkaMetricsReporters: Seq[KafkaMetricsReporter] =
     KafkaMetricsReporter.startReporters(VerifiableProperties(config.originals))
-  var kafkaYammerMetrics: KafkaYammerMetrics = null
-  var metrics: Metrics = null
+  var kafkaYammerMetrics: KafkaYammerMetrics = _
+  var metrics: Metrics = _

-  @volatile var dataPlaneRequestProcessor: KafkaApis = null
-  var controlPlaneRequestProcessor: KafkaApis = null
+  @volatile var dataPlaneRequestProcessor: KafkaApis = _
+  var controlPlaneRequestProcessor: KafkaApis = _

   var authorizer: Option[Authorizer] = None
-  @volatile var socketServer: SocketServer = null
-  var dataPlaneRequestHandlerPool: KafkaRequestHandlerPool = null
-  var controlPlaneRequestHandlerPool: KafkaRequestHandlerPool = null
+  @volatile var socketServer: SocketServer = _
+  var dataPlaneRequestHandlerPool: KafkaRequestHandlerPool = _
+  var controlPlaneRequestHandlerPool: KafkaRequestHandlerPool = _

-  var logDirFailureChannel: LogDirFailureChannel = null
-  @volatile private var _logManager: LogManager = null
+  var logDirFailureChannel: LogDirFailureChannel = _
+  @volatile private var _logManager: LogManager = _

-  @volatile private var _replicaManager: ReplicaManager = null
-  var adminManager: ZkAdminManager = null
-  var tokenManager: DelegationTokenManager = null
+  @volatile private var _replicaManager: ReplicaManager = _
+  var adminManager: ZkAdminManager = _
+  var tokenManager: DelegationTokenManager = _

-  var dynamicConfigHandlers: Map[String, ConfigHandler] = null
-  var dynamicConfigManager: ZkConfigManager = null
-  var credentialProvider: CredentialProvider = null
-  var tokenCache: DelegationTokenCache = null
+  var dynamicConfigHandlers: Map[String, ConfigHandler] = _
+  var dynamicConfigManager: ZkConfigManager = _
+  var credentialProvider: CredentialProvider = _
+  var tokenCache: DelegationTokenCache = _

-  @volatile var groupCoordinator: GroupCoordinator = null
+  @volatile var groupCoordinator: GroupCoordinator = _

-  var transactionCoordinator: TransactionCoordinator = null
+  var transactionCoordinator: TransactionCoordinator = _

-  @volatile private var _kafkaController: KafkaController = null
+  @volatile private var _kafkaController: KafkaController = _

   var forwardingManager: Option[ForwardingManager] = None

-  var autoTopicCreationManager: AutoTopicCreationManager = null
+  var autoTopicCreationManager: AutoTopicCreationManager = _

-  var clientToControllerChannelManager: BrokerToControllerChannelManager = null
+  var clientToControllerChannelManager: BrokerToControllerChannelManager = _

-  var alterPartitionManager: AlterPartitionManager = null
+  var alterPartitionManager: AlterPartitionManager = _

-  var kafkaScheduler: KafkaScheduler = null
+  var kafkaScheduler: KafkaScheduler = _

-  @volatile var metadataCache: ZkMetadataCache = null
-  var quotaManagers: QuotaFactory.QuotaManagers = null
+  @volatile var metadataCache: ZkMetadataCache = _
+  var quotaManagers: QuotaFactory.QuotaManagers = _

   val zkClientConfig: ZKClientConfig = KafkaServer.zkClientConfigFromKafkaConfig(config)
-  private var _zkClient: KafkaZkClient = null
-  private var configRepository: ZkConfigRepository = null
+  private var _zkClient: KafkaZkClient = _
+  private var configRepository: ZkConfigRepository = _

   val correlationId: AtomicInteger = new AtomicInteger(0)
   val brokerMetaPropsFile = "meta.properties"
   val brokerMetadataCheckpoints = config.logDirs.map { logDir =>
     (logDir, new BrokerMetadataCheckpoint(new File(logDir + File.separator + brokerMetaPropsFile)))
   }.toMap

-  private var _clusterId: String = null
-  @volatile var _brokerTopicStats: BrokerTopicStats = null
+  private var _clusterId: String = _
+  @volatile var _brokerTopicStats: BrokerTopicStats = _

-  private var _featureChangeListener: FinalizedFeatureChangeListener = null
+  private var _featureChangeListener: FinalizedFeatureChangeListener = _

   val brokerFeatures: BrokerFeatures = BrokerFeatures.createEmpty()
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -237,7 +237,7 @@ class ReplicaManager(val config: KafkaConfig,
   this.logIdent = s"[ReplicaManager broker=$localBrokerId] "
   protected val stateChangeLogger = new StateChangeLogger(localBrokerId, inControllerContext = false, None)

-  private var logDirFailureHandler: LogDirFailureHandler = null
+  private var logDirFailureHandler: LogDirFailureHandler = _

   private class LogDirFailureHandler(name: String, haltBrokerOnDirFailure: Boolean) extends ShutdownableThread(name) {
     override def doWork(): Unit = {
@@ -76,7 +76,7 @@ class ReplicationQuotaManager(val config: ReplicationQuotaManagerConfig,
                               private val time: Time) extends Logging with ReplicaQuota {
   private val lock = new ReentrantReadWriteLock()
   private val throttledPartitions = new ConcurrentHashMap[String, Seq[Int]]()
-  private var quota: Quota = null
+  private var quota: Quota = _
   private val sensorAccess = new SensorAccess(lock, metrics)
   private val rateMetricName = metrics.metricName("byte-rate", replicationType.toString,
     s"Tracking byte-rate for $replicationType")
6 changes: 3 additions & 3 deletions core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -295,9 +295,9 @@ object ConsoleConsumer extends Logging {
     val enableSystestEventsLogging = options.has(enableSystestEventsLoggingOpt)

     // topic must be specified.
-    var topicArg: String = null
-    var includedTopicsArg: String = null
-    var filterSpec: TopicFilter = null
+    var topicArg: String = _
+    var includedTopicsArg: String = _
+    var filterSpec: TopicFilter = _
     val extraConsumerProps = CommandLineUtils.parseKeyValueArgs(options.valuesOf(consumerPropertyOpt).asScala)
     val consumerProps = if (options.has(consumerConfigOpt))
       Utils.loadProps(options.valueOf(consumerConfigOpt))
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/tools/ConsoleProducer.scala
@@ -286,8 +286,8 @@ object ConsoleProducer {
   }

   class LineMessageReader extends MessageReader {
-    var topic: String = null
-    var reader: BufferedReader = null
+    var topic: String = _
+    var reader: BufferedReader = _
     var parseKey = false
     var keySeparator = "\t"
     var parseHeaders = false
8 changes: 4 additions & 4 deletions core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -63,12 +63,12 @@ import scala.util.{Failure, Success, Try}
 @deprecated(message = "Use the Connect-based MirrorMaker instead (aka MM2).", since = "3.0")
 object MirrorMaker extends Logging with KafkaMetricsGroup {

-  private[tools] var producer: MirrorMakerProducer = null
-  private var mirrorMakerThreads: Seq[MirrorMakerThread] = null
+  private[tools] var producer: MirrorMakerProducer = _
+  private var mirrorMakerThreads: Seq[MirrorMakerThread] = _
   private val isShuttingDown: AtomicBoolean = new AtomicBoolean(false)
   // Track the messages not successfully sent by mirror maker.
   private val numDroppedMessages: AtomicInteger = new AtomicInteger(0)
-  private var messageHandler: MirrorMakerMessageHandler = null
+  private var messageHandler: MirrorMakerMessageHandler = _
   private var offsetCommitIntervalMs = 0
   private var abortOnSendFailure: Boolean = true
   @volatile private var exitingOnSendFailure: Boolean = false
@@ -297,7 +297,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
                             customRebalanceListener: Option[ConsumerRebalanceListener],
                             includeOpt: Option[String]) {
     val regex = includeOpt.getOrElse(throw new IllegalArgumentException("New consumer only supports include."))
-    var recordIter: java.util.Iterator[ConsumerRecord[Array[Byte], Array[Byte]]] = null
+    var recordIter: java.util.Iterator[ConsumerRecord[Array[Byte], Array[Byte]]] = _

     // We manually maintain the consumed offsets for historical reasons and it could be simplified
     // Visible for testing
@@ -277,7 +277,7 @@ private class ReplicaBuffer(expectedReplicasPerTopicPartition: collection.Map[To
   @volatile private var lastReportTime = Time.SYSTEM.milliseconds
   private var maxLag: Long = -1L
   private var offsetWithMaxLag: Long = -1L
-  private var maxLagTopicAndPartition: TopicPartition = null
+  private var maxLagTopicAndPartition: TopicPartition = _
   initialize()

   def createNewFetcherBarrier(): Unit = {
6 changes: 3 additions & 3 deletions core/src/main/scala/kafka/tools/StateChangeLogMerger.scala
@@ -52,10 +52,10 @@ object StateChangeLogMerger extends Logging {
   val dateRegex = new Regex("[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2},[0-9]{3}")
   val dateFormat = new SimpleDateFormat(dateFormatString)
   var files: List[String] = List()
-  var topic: String = null
+  var topic: String = _
   var partitions: List[Int] = List()
-  var startDate: Date = null
-  var endDate: Date = null
+  var startDate: Date = _
+  var endDate: Date = _

   def main(args: Array[String]): Unit = {
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/utils/FileLock.scala
@@ -29,7 +29,7 @@ class FileLock(val file: File) extends Logging {

   private val channel = FileChannel.open(file.toPath, StandardOpenOption.CREATE, StandardOpenOption.READ,
     StandardOpenOption.WRITE)
-  private var flock: java.nio.channels.FileLock = null
+  private var flock: java.nio.channels.FileLock = _

   /**
    * Lock the file or throw an exception if the lock is already held
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/utils/KafkaScheduler.scala
@@ -71,7 +71,7 @@ trait Scheduler {
 class KafkaScheduler(val threads: Int,
                      val threadNamePrefix: String = "kafka-scheduler-",
                      daemon: Boolean = true) extends Scheduler with Logging {
-  private var executor: ScheduledThreadPoolExecutor = null
+  private var executor: ScheduledThreadPoolExecutor = _
   private val schedulerThreadId = new AtomicInteger(0)

   override def startup(): Unit = {
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/utils/timer/TimerTask.scala
@@ -20,7 +20,7 @@ trait TimerTask extends Runnable {

   val delayMs: Long // timestamp in millisecond

-  private[this] var timerTaskEntry: TimerTaskEntry = null
+  private[this] var timerTaskEntry: TimerTaskEntry = _

   def cancel(): Unit = {
     synchronized {
6 changes: 3 additions & 3 deletions core/src/main/scala/kafka/utils/timer/TimerTaskList.scala
@@ -129,9 +129,9 @@ private[timer] class TimerTaskList(taskCounter: AtomicInteger) extends Delayed {
 private[timer] class TimerTaskEntry(val timerTask: TimerTask, val expirationMs: Long) extends Ordered[TimerTaskEntry] {

   @volatile
-  var list: TimerTaskList = null
-  var next: TimerTaskEntry = null
-  var prev: TimerTaskEntry = null
+  var list: TimerTaskList = _
+  var next: TimerTaskEntry = _
+  var prev: TimerTaskEntry = _

   // if this timerTask is already held by an existing timer task entry,
   // setTimerTaskEntry will remove it.
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/utils/timer/TimingWheel.scala
@@ -106,7 +106,7 @@ private[timer] class TimingWheel(tickMs: Long, wheelSize: Int, startMs: Long, ta

   // overflowWheel can potentially be updated and read by two concurrent threads through add().
   // Therefore, it needs to be volatile due to the issue of Double-Checked Locking pattern with JVM
-  @volatile private[this] var overflowWheel: TimingWheel = null
+  @volatile private[this] var overflowWheel: TimingWheel = _

   private[this] def addOverflowWheel(): Unit = {
     synchronized {
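The comment in this hunk refers to the double-checked locking idiom: `overflowWheel` is read outside the lock on the fast path, so the field must be `@volatile` for the JVM memory model to guarantee that other threads see a fully constructed wheel. A minimal sketch of the idiom, with illustrative names only:

```scala
// Illustrative double-checked locking: the volatile field is the key ingredient.
class LazyResource {
  @volatile private var resource: AnyRef = _ // underscore default: null

  def get(): AnyRef = {
    if (resource == null) {     // first check, lock-free fast path
      synchronized {
        if (resource == null) { // second check, under the lock
          resource = new AnyRef // safe publication thanks to @volatile
        }
      }
    }
    resource
  }
}
```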
@@ -33,7 +33,7 @@ import scala.jdk.CollectionConverters._
 class ListOffsetsIntegrationTest extends KafkaServerTestHarness {

   val topicName = "foo"
-  var adminClient: Admin = null
+  var adminClient: Admin = _

   @BeforeEach
   override def setUp(testInfo: TestInfo): Unit = {
@@ -40,7 +40,7 @@ import scala.jdk.CollectionConverters._
 @Timeout(300)
 class ReassignPartitionsIntegrationTest extends QuorumTestHarness {

-  var cluster: ReassignPartitionsTestCluster = null
+  var cluster: ReassignPartitionsTestCluster = _

   @AfterEach
   override def tearDown(): Unit = {
@@ -618,9 +618,9 @@ class ReassignPartitionsIntegrationTest extends QuorumTestHarness {

     var servers = new mutable.ArrayBuffer[KafkaBroker]

-    var brokerList: String = null
+    var brokerList: String = _

-    var adminClient: Admin = null
+    var adminClient: Admin = _

     def setup(): Unit = {
       createServers()
@@ -43,7 +43,7 @@ class AdminClientWithPoliciesIntegrationTest extends KafkaServerTestHarness with

   import AdminClientWithPoliciesIntegrationTest._

-  var client: Admin = null
+  var client: Admin = _
   val brokerCount = 3

   @BeforeEach
@@ -48,7 +48,7 @@ abstract class BaseAdminIntegrationTest extends IntegrationTestHarness with Logg
   def brokerCount = 3
   override def logDirCount = 2

-  var testInfo: TestInfo = null
+  var testInfo: TestInfo = _

   var client: Admin = _
@@ -54,7 +54,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {

   private var consumer: KafkaConsumer[Array[Byte], Array[Byte]] = _
   private val producers = Buffer[KafkaProducer[Array[Byte], Array[Byte]]]()
-  protected var admin: Admin = null
+  protected var admin: Admin = _

   protected val topic = "topic"
   private val numRecords = 100
@@ -37,7 +37,7 @@ class ProducerCompressionTest extends QuorumTestHarness {
   private val topic = "topic"
   private val numRecords = 2000

-  private var broker: KafkaBroker = null
+  private var broker: KafkaBroker = _

   @BeforeEach
   override def setUp(testInfo: TestInfo): Unit = {