@@ -239,7 +239,7 @@ void TFederatedWriteSession::Write(NTopic::TContinuationToken&& token, TStringBu
239239}
240240
241241void TFederatedWriteSession::Write (NTopic::TContinuationToken&& token, NTopic::TWriteMessage&& message) {
242- return WriteInternal (std::move (token), std::move (message));
242+ return WriteInternal (std::move (token), TWrappedWriteMessage ( std::move (message) ));
243243}
244244
245245void TFederatedWriteSession::WriteEncoded (NTopic::TContinuationToken&& token, TStringBuf data, NTopic::ECodec codec,
@@ -249,24 +249,24 @@ void TFederatedWriteSession::WriteEncoded(NTopic::TContinuationToken&& token, TS
249249 message.SeqNo (*seqNo);
250250 if (createTimestamp.Defined ())
251251 message.CreateTimestamp (*createTimestamp);
252- return WriteInternal (std::move (token), std::move (message));
252+ return WriteInternal (std::move (token), TWrappedWriteMessage ( std::move (message) ));
253253}
254254
255255void TFederatedWriteSession::WriteEncoded (NTopic::TContinuationToken&& token, NTopic::TWriteMessage&& message) {
256- return WriteInternal (std::move (token), std::move (message));
256+ return WriteInternal (std::move (token), TWrappedWriteMessage ( std::move (message) ));
257257}
258258
259- void TFederatedWriteSession::WriteInternal (NTopic::TContinuationToken&&, NTopic::TWriteMessage && message ) {
259+ void TFederatedWriteSession::WriteInternal (NTopic::TContinuationToken&&, TWrappedWriteMessage && wrapped ) {
260260 ClientHasToken = false ;
261- if (!message .CreateTimestamp_ .Defined ()) {
262- message .CreateTimestamp_ = TInstant::Now ();
261+ if (!wrapped. Message .CreateTimestamp_ .Defined ()) {
262+ wrapped. Message .CreateTimestamp_ = TInstant::Now ();
263263 }
264264
265265 {
266266 TDeferredWrite deferred (Subsession);
267267 with_lock (Lock) {
268- BufferFreeSpace -= message .Data .size ();
269- OriginalMessagesToPassDown.emplace_back (std::move (message ));
268+ BufferFreeSpace -= wrapped. Message .Data .size ();
269+ OriginalMessagesToPassDown.emplace_back (std::move (wrapped ));
270270
271271 PrepareDeferredWrite (deferred);
272272 }
@@ -285,10 +285,10 @@ bool TFederatedWriteSession::PrepareDeferredWrite(TDeferredWrite& deferred) {
285285 if (OriginalMessagesToPassDown.empty ()) {
286286 return false ;
287287 }
288- OriginalMessagesToGetAck.push_back (OriginalMessagesToPassDown.front ());
289- deferred.Token .ConstructInPlace (std::move (*PendingToken));
290- deferred.Message .ConstructInPlace (std::move (OriginalMessagesToPassDown.front ()));
288+ OriginalMessagesToGetAck.push_back (std::move (OriginalMessagesToPassDown.front ()));
291289 OriginalMessagesToPassDown.pop_front ();
290+ deferred.Token .ConstructInPlace (std::move (*PendingToken));
291+ deferred.Message .ConstructInPlace (std::move (OriginalMessagesToGetAck.back ().Message ));
292292 PendingToken.Clear ();
293293 return true ;
294294}
0 commit comments