@@ -43,7 +43,7 @@ void TKafkaProduceActor::LogEvent(IEventHandle& ev) {
4343void TKafkaProduceActor::SendMetrics (const TString& topicName, size_t delta, const TString& name, const TActorContext& ctx) {
4444 auto topicWithoutDb = GetTopicNameWithoutDb (Context->DatabasePath , topicName);
4545 ctx.Send (MakeKafkaMetricsServiceID (), new TEvKafka::TEvUpdateCounter (delta, BuildLabels (Context, " " , topicWithoutDb, TStringBuilder () << " api.kafka.produce." << name, " " )));
46- ctx.Send (MakeKafkaMetricsServiceID (), new TEvKafka::TEvUpdateCounter (delta, BuildLabels (Context, " " , topicWithoutDb, " api.kafka.produce.total_messages" , " " )));
46+ ctx.Send (MakeKafkaMetricsServiceID (), new TEvKafka::TEvUpdateCounter (delta, BuildLabels (Context, " " , topicWithoutDb, " api.kafka.produce.total_messages" , " " )));
4747}
4848
4949void TKafkaProduceActor::Bootstrap (const NActors::TActorContext& /* ctx*/ ) {
@@ -82,7 +82,7 @@ void TKafkaProduceActor::PassAway() {
8282void TKafkaProduceActor::CleanTopics (const TActorContext& ctx) {
8383 const auto now = ctx.Now ();
8484
85- std::map<TString, TTopicInfo> newTopics;
85+ std::map<TString, TTopicInfo> newTopics;
8686 for (auto & [topicPath, topicInfo] : Topics) {
8787 if (topicInfo.ExpirationTime > now) {
8888 newTopics[topicPath] = std::move (topicInfo);
@@ -242,7 +242,8 @@ size_t TKafkaProduceActor::EnqueueInitialization() {
242242THolder<TEvPartitionWriter::TEvWriteRequest> Convert (const TProduceRequestData::TTopicProduceData::TPartitionProduceData& data,
243243 const TString& topicName,
244244 ui64 cookie,
245- const TString& clientDC) {
245+ const TString& clientDC,
246+ bool ruPerRequest) {
246247 auto ev = MakeHolder<TEvPartitionWriter::TEvWriteRequest>();
247248 auto & request = ev->Record ;
248249
@@ -254,6 +255,9 @@ THolder<TEvPartitionWriter::TEvWriteRequest> Convert(const TProduceRequestData::
254255 partitionRequest->SetPartition (data.Index );
255256 // partitionRequest->SetCmdWriteOffset();
256257 partitionRequest->SetCookie (cookie);
258+ if (ruPerRequest) {
259+ partitionRequest->SetMeteringV2Enabled (true );
260+ }
257261
258262 ui64 totalSize = 0 ;
259263
@@ -317,11 +321,11 @@ void TKafkaProduceActor::ProcessRequest(TPendingRequest::TPtr pendingRequest, co
317321 pendingRequest->StartTime = ctx.Now ();
318322
319323 size_t position = 0 ;
324+ bool ruPerRequest = Context->Config .GetMeteringV2Enabled ();
320325 for (const auto & topicData : r->TopicData ) {
321326 const TString& topicPath = NormalizePath (Context->DatabasePath , *topicData.Name );
322327 for (const auto & partitionData : topicData.PartitionData ) {
323328 const auto partitionId = partitionData.Index ;
324-
325329 auto writer = PartitionWriter (topicPath, partitionId, ctx);
326330 if (OK == writer.first ) {
327331 auto ownCookie = ++Cookie;
@@ -334,7 +338,8 @@ void TKafkaProduceActor::ProcessRequest(TPendingRequest::TPtr pendingRequest, co
334338 pendingRequest->WaitAcceptingCookies .insert (ownCookie);
335339 pendingRequest->WaitResultCookies .insert (ownCookie);
336340
337- auto ev = Convert (partitionData, *topicData.Name , ownCookie, ClientDC);
341+ auto ev = Convert (partitionData, *topicData.Name , ownCookie, ClientDC, ruPerRequest);
342+ ruPerRequest = false ;
338343
339344 Send (writer.second , std::move (ev));
340345 } else {
@@ -441,7 +446,7 @@ void TKafkaProduceActor::SendResults(const TActorContext& ctx) {
441446 // We send the results in the order of receipt of the request
442447 while (!PendingRequests.empty ()) {
443448 auto pendingRequest = PendingRequests.front ();
444-
449+
445450 // We send the response by timeout. This is possible, for example, if the event was lost or the PartitionWrite died.
446451 bool expired = expireTime > pendingRequest->StartTime ;
447452
0 commit comments