@@ -855,44 +855,40 @@ int DefaultMQPushConsumer::getMaxCacheMsgSizePerQueue() const {
855855}
856856
857857ConsumerRunningInfo* DefaultMQPushConsumer::getConsumerRunningInfo () {
858- ConsumerRunningInfo* info = new ConsumerRunningInfo ();
859- if (info) {
860- if (m_consumerService->getConsumeMsgSerivceListenerType () == messageListenerOrderly)
861- info->setProperty (ConsumerRunningInfo::PROP_CONSUME_ORDERLY, " true" );
862- else
863- info->setProperty (ConsumerRunningInfo::PROP_CONSUME_ORDERLY, " flase" );
864- info->setProperty (ConsumerRunningInfo::PROP_THREADPOOL_CORE_SIZE, UtilAll::to_string (m_consumeThreadCount));
865- info->setProperty (ConsumerRunningInfo::PROP_CONSUMER_START_TIMESTAMP, UtilAll::to_string (m_startTime));
866-
867- vector<SubscriptionData> result;
868- getSubscriptions (result);
869- info->setSubscriptionSet (result);
870-
871- map<MQMessageQueue, PullRequest*> requestTable = m_pRebalance->getPullRequestTable ();
872- map<MQMessageQueue, PullRequest*>::iterator it = requestTable.begin ();
873-
874- for (; it != requestTable.end (); ++it) {
875- if (!it->second ->isDroped ()) {
876- map<MessageQueue, ProcessQueueInfo> queueTable;
877- MessageQueue queue ((it->first ).getTopic (), (it->first ).getBrokerName (), (it->first ).getQueueId ());
878- ProcessQueueInfo processQueue;
879- processQueue.cachedMsgMinOffset = it->second ->getCacheMinOffset ();
880- processQueue.cachedMsgMaxOffset = it->second ->getCacheMaxOffset ();
881- processQueue.cachedMsgCount = it->second ->getCacheMsgCount ();
882- processQueue.setCommitOffset (
883- m_pOffsetStore->readOffset (it->first , MEMORY_FIRST_THEN_STORE, getSessionCredentials ()));
884- processQueue.setDroped (it->second ->isDroped ());
885- processQueue.setLocked (it->second ->isLocked ());
886- processQueue.lastLockTimestamp = it->second ->getLastLockTimestamp ();
887- processQueue.lastPullTimestamp = it->second ->getLastPullTimestamp ();
888- processQueue.lastConsumeTimestamp = it->second ->getLastConsumeTimestamp ();
889- info->setMqTable (queue, processQueue);
890- }
858+ auto * info = new ConsumerRunningInfo ();
859+ if (m_consumerService->getConsumeMsgSerivceListenerType () == messageListenerOrderly) {
860+ info->setProperty (ConsumerRunningInfo::PROP_CONSUME_ORDERLY, " true" );
861+ } else {
862+ info->setProperty (ConsumerRunningInfo::PROP_CONSUME_ORDERLY, " false" );
863+ }
864+ info->setProperty (ConsumerRunningInfo::PROP_THREADPOOL_CORE_SIZE, UtilAll::to_string (m_consumeThreadCount));
865+ info->setProperty (ConsumerRunningInfo::PROP_CONSUMER_START_TIMESTAMP, UtilAll::to_string (m_startTime));
866+
867+ std::vector<SubscriptionData> result;
868+ getSubscriptions (result);
869+ info->setSubscriptionSet (result);
870+
871+ std::map<MQMessageQueue, PullRequest*> requestTable = m_pRebalance->getPullRequestTable ();
872+
873+ for (const auto & it : requestTable) {
874+ if (!it.second ->isDroped ()) {
875+ MessageQueue queue ((it.first ).getTopic (), (it.first ).getBrokerName (), (it.first ).getQueueId ());
876+ ProcessQueueInfo processQueue;
877+ processQueue.cachedMsgMinOffset = it.second ->getCacheMinOffset ();
878+ processQueue.cachedMsgMaxOffset = it.second ->getCacheMaxOffset ();
879+ processQueue.cachedMsgCount = it.second ->getCacheMsgCount ();
880+ processQueue.setCommitOffset (
881+ m_pOffsetStore->readOffset (it.first , MEMORY_FIRST_THEN_STORE, getSessionCredentials ()));
882+ processQueue.setDroped (it.second ->isDroped ());
883+ processQueue.setLocked (it.second ->isLocked ());
884+ processQueue.lastLockTimestamp = it.second ->getLastLockTimestamp ();
885+ processQueue.lastPullTimestamp = it.second ->getLastPullTimestamp ();
886+ processQueue.lastConsumeTimestamp = it.second ->getLastConsumeTimestamp ();
887+ info->setMqTable (queue, processQueue);
891888 }
892-
893- return info;
894889 }
895- return NULL ;
890+
891+ return info;
896892}
897893
898894// <!************************************************************************
0 commit comments