Skip to content

Commit de6d186

Browse files
authored
ydb_topic: split write messages on different codecs (#3428)
1 parent b04058a commit de6d186

File tree

2 files changed

+128
-14
lines changed

2 files changed

+128
-14
lines changed

ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1192,6 +1192,11 @@ size_t TWriteSessionImpl::WriteBatchImpl() {
11921192
TBlock block{};
11931193
for (; block.OriginalSize < MaxBlockSize && i != CurrentBatch.Messages.size(); ++i) {
11941194
auto& currMessage = CurrentBatch.Messages[i];
1195+
1196+
// If MaxBlockSize or MaxBlockMessageCount values are ever changed from infinity and 1 respectively,
1197+
// create a new block, if the existing one is non-empty AND (adding another message will overflow it OR
1198+
// its codec is different from the codec of the next message).
1199+
11951200
auto id = currMessage.Id;
11961201
auto createTs = currMessage.CreatedAt;
11971202

@@ -1283,22 +1288,25 @@ void TWriteSessionImpl::UpdateTokenIfNeededImpl() {
12831288
void TWriteSessionImpl::SendImpl() {
12841289
Y_ABORT_UNLESS(Lock.IsLocked());
12851290

1286-
// External cycle splits ready blocks into multiple gRPC messages. Current gRPC message size hard limit is 64MiB
1287-
while(IsReadyToSendNextImpl()) {
1291+
// External cycle splits ready blocks into multiple gRPC messages. Current gRPC message size hard limit is 64MiB.
1292+
while (IsReadyToSendNextImpl()) {
12881293
TClientMessage clientMessage;
12891294
auto* writeRequest = clientMessage.mutable_write_request();
1290-
1291-
// Sent blocks while we can without messages reordering
1295+
ui32 prevCodec = 0;
1296+
// Send blocks while we can without messages reordering.
12921297
while (IsReadyToSendNextImpl() && clientMessage.ByteSizeLong() < GetMaxGrpcMessageSize()) {
12931298
const auto& block = PackedMessagesToSend.top();
12941299
Y_ABORT_UNLESS(block.Valid);
1300+
if (writeRequest->messages_size() > 0 && prevCodec != block.CodecID) {
1301+
break;
1302+
}
1303+
prevCodec = block.CodecID;
12951304
writeRequest->set_codec(static_cast<i32>(block.CodecID));
12961305
Y_ABORT_UNLESS(block.MessageCount == 1);
12971306
for (size_t i = 0; i != block.MessageCount; ++i) {
12981307
Y_ABORT_UNLESS(!OriginalMessagesToSend.empty());
12991308

13001309
auto& message = OriginalMessagesToSend.front();
1301-
13021310
auto* msgData = writeRequest->add_messages();
13031311

13041312
if (message.Tx) {
@@ -1309,27 +1317,24 @@ void TWriteSessionImpl::SendImpl() {
13091317
msgData->set_seq_no(GetSeqNoImpl(message.Id));
13101318
*msgData->mutable_created_at() = ::google::protobuf::util::TimeUtil::MillisecondsToTimestamp(message.CreatedAt.MilliSeconds());
13111319

1312-
if (!message.MessageMeta.empty()) {
1313-
for (auto& [k, v] : message.MessageMeta) {
1314-
auto* pair = msgData->add_metadata_items();
1315-
pair->set_key(k);
1316-
pair->set_value(v);
1317-
}
1320+
for (auto& [k, v] : message.MessageMeta) {
1321+
auto* pair = msgData->add_metadata_items();
1322+
pair->set_key(k);
1323+
pair->set_value(v);
13181324
}
13191325
SentOriginalMessages.emplace(std::move(message));
13201326
OriginalMessagesToSend.pop();
13211327

13221328
msgData->set_uncompressed_size(block.OriginalSize);
1323-
if (block.Compressed)
1329+
if (block.Compressed) {
13241330
msgData->set_data(block.Data.data(), block.Data.size());
1325-
else {
1331+
} else {
13261332
for (auto& buffer: block.OriginalDataRefs) {
13271333
msgData->set_data(buffer.data(), buffer.size());
13281334
}
13291335
}
13301336
}
13311337

1332-
13331338
TBlock moveBlock;
13341339
moveBlock.Move(block);
13351340
SentPackedMessage.emplace(std::move(moveBlock));

ydb/public/sdk/cpp/client/ydb_topic/ut/basic_usage_ut.cpp

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -698,6 +698,115 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
698698
UNIT_ASSERT_VALUES_EQUAL(stats->GetEndOffset(), count);
699699

700700
}
701+
702+
Y_UNIT_TEST(TWriteSession_WriteEncoded) {
703+
// This test was adapted from ydb_persqueue tests.
704+
// It writes 4 messages: 2 with default codec, 1 with explicitly set GZIP codec, 1 with RAW codec.
705+
// The last message MUST be sent in a separate WriteRequest, because a WriteRequest has a single codec field that applies to all messages in the request.
706+
// This separation currently happens in the TWriteSessionImpl::SendImpl method.
707+
708+
auto setup = std::make_shared<TTopicSdkTestSetup>(TEST_CASE_NAME);
709+
auto client = setup->MakeClient();
710+
auto settings = TWriteSessionSettings()
711+
.Path(TEST_TOPIC)
712+
.MessageGroupId(TEST_MESSAGE_GROUP_ID);
713+
714+
size_t batchSize = 100000000;
715+
settings.BatchFlushInterval(TDuration::Seconds(1000)); // Batch on size, not on time.
716+
settings.BatchFlushSizeBytes(batchSize);
717+
auto writer = client.CreateWriteSession(settings);
718+
TString message = "message";
719+
TString packed;
720+
{
721+
TStringOutput so(packed);
722+
TZLibCompress oss(&so, ZLib::GZip, 6);
723+
oss << message;
724+
}
725+
726+
Cerr << message << " " << packed << "\n";
727+
728+
{
729+
auto event = *writer->GetEvent(true);
730+
UNIT_ASSERT(!writer->WaitEvent().Wait(TDuration::Seconds(1)));
731+
auto ev = writer->WaitEvent();
732+
UNIT_ASSERT(std::holds_alternative<TWriteSessionEvent::TReadyToAcceptEvent>(event));
733+
auto continueToken = std::move(std::get<TWriteSessionEvent::TReadyToAcceptEvent>(event).ContinuationToken);
734+
writer->Write(std::move(continueToken), message);
735+
UNIT_ASSERT(ev.Wait(TDuration::Seconds(1)));
736+
}
737+
{
738+
auto event = *writer->GetEvent(true);
739+
UNIT_ASSERT(std::holds_alternative<TWriteSessionEvent::TReadyToAcceptEvent>(event));
740+
auto continueToken = std::move(std::get<TWriteSessionEvent::TReadyToAcceptEvent>(event).ContinuationToken);
741+
writer->Write(std::move(continueToken), "");
742+
}
743+
{
744+
auto event = *writer->GetEvent(true);
745+
UNIT_ASSERT(std::holds_alternative<TWriteSessionEvent::TReadyToAcceptEvent>(event));
746+
auto continueToken = std::move(std::get<TWriteSessionEvent::TReadyToAcceptEvent>(event).ContinuationToken);
747+
writer->WriteEncoded(std::move(continueToken), packed, ECodec::GZIP, message.size());
748+
}
749+
750+
ui32 acks = 0, tokens = 0;
751+
while(acks < 4 || tokens < 2) {
752+
auto event = *writer->GetEvent(true);
753+
if (std::holds_alternative<TWriteSessionEvent::TAcksEvent>(event)) {
754+
acks += std::get<TWriteSessionEvent::TAcksEvent>(event).Acks.size();
755+
}
756+
if (std::holds_alternative<TWriteSessionEvent::TReadyToAcceptEvent>(event)) {
757+
if (tokens == 0) {
758+
auto continueToken = std::move(std::get<TWriteSessionEvent::TReadyToAcceptEvent>(event).ContinuationToken);
759+
writer->WriteEncoded(std::move(continueToken), "", ECodec::RAW, 0);
760+
}
761+
++tokens;
762+
}
763+
Cerr << "GOT EVENT " << acks << " " << tokens << "\n";
764+
}
765+
UNIT_ASSERT(!writer->WaitEvent().Wait(TDuration::Seconds(5)));
766+
767+
UNIT_ASSERT_VALUES_EQUAL(acks, 4);
768+
UNIT_ASSERT_VALUES_EQUAL(tokens, 2);
769+
770+
auto readSettings = TReadSessionSettings()
771+
.ConsumerName(TEST_CONSUMER)
772+
.AppendTopics(TEST_TOPIC);
773+
std::shared_ptr<IReadSession> readSession = client.CreateReadSession(readSettings);
774+
ui32 readMessageCount = 0;
775+
while (readMessageCount < 4) {
776+
Cerr << "Get event on client\n";
777+
auto event = *readSession->GetEvent(true);
778+
std::visit(TOverloaded {
779+
[&](TReadSessionEvent::TDataReceivedEvent& event) {
780+
for (auto& message: event.GetMessages()) {
781+
TString sourceId = message.GetMessageGroupId();
782+
ui32 seqNo = message.GetSeqNo();
783+
UNIT_ASSERT_VALUES_EQUAL(readMessageCount + 1, seqNo);
784+
++readMessageCount;
785+
UNIT_ASSERT_VALUES_EQUAL(message.GetData(), (seqNo % 2) == 1 ? "message" : "");
786+
}
787+
},
788+
[&](TReadSessionEvent::TCommitOffsetAcknowledgementEvent&) {
789+
UNIT_FAIL("no commits in test");
790+
},
791+
[&](TReadSessionEvent::TStartPartitionSessionEvent& event) {
792+
event.Confirm();
793+
},
794+
[&](TReadSessionEvent::TStopPartitionSessionEvent& event) {
795+
event.Confirm();
796+
},
797+
[&](TReadSessionEvent::TPartitionSessionStatusEvent&) {
798+
UNIT_FAIL("Test does not support lock sessions yet");
799+
},
800+
[&](TReadSessionEvent::TPartitionSessionClosedEvent&) {
801+
UNIT_FAIL("Test does not support lock sessions yet");
802+
},
803+
[&](TSessionClosedEvent&) {
804+
UNIT_FAIL("Session closed");
805+
}
806+
807+
}, event);
808+
}
809+
}
701810
} // Y_UNIT_TEST_SUITE(BasicUsage)
702811

703812
Y_UNIT_TEST_SUITE(TSettingsValidation) {

0 commit comments

Comments
 (0)