Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions ydb/core/persqueue/partition_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ void TPartition::ReplyWrite(
write->SetTotalTimeInPartitionQueueMs(queueTime.MilliSeconds());
write->SetWriteTimeMs(writeTime.MilliSeconds());

write->SetWrittenInTx(IsSupportive());

ctx.Send(Tablet, response.Release());
}

Expand Down
2 changes: 2 additions & 0 deletions ydb/core/protos/msgbus_pq.proto
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,8 @@ message TPersQueuePartitionResponse {
optional uint32 TopicQuotedTimeMs = 11;
optional uint32 TotalTimeInPartitionQueueMs = 9;
optional uint32 WriteTimeMs = 10;

optional bool WrittenInTx = 12;
}

message TCmdGetMaxSeqNoResult {
Expand Down
4 changes: 4 additions & 0 deletions ydb/public/api/protos/ydb_topic.proto
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ message StreamWriteMessage {
oneof message_write_status {
Written written = 2;
Skipped skipped = 3;
WrittenInTx written_in_tx = 4;
}

message Written {
Expand All @@ -213,6 +214,9 @@ message StreamWriteMessage {
REASON_ALREADY_WRITTEN = 1;
}
}

message WrittenInTx {
}
}

// Message with write statistics.
Expand Down
22 changes: 14 additions & 8 deletions ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -981,17 +981,23 @@ TWriteSessionImpl::TProcessSrvMessageResult TWriteSessionImpl::ProcessServerMess
writeStat->PartitionQuotedTime = durationConv(stat.partition_quota_wait_time());
writeStat->TopicQuotedTime = durationConv(stat.topic_quota_wait_time());

for (size_t messageIndex = 0, endIndex = batchWriteResponse.acks_size(); messageIndex != endIndex; ++messageIndex) {
for (const auto& ack : batchWriteResponse.acks()) {
// TODO: Fill writer statistics
auto ack = batchWriteResponse.acks(messageIndex);
ui64 sequenceNumber = ack.seq_no();

Y_ABORT_UNLESS(ack.has_written() || ack.has_skipped());
auto msgWriteStatus = ack.has_written()
? TWriteSessionEvent::TWriteAck::EES_WRITTEN
: (ack.skipped().reason() == Ydb::Topic::StreamWriteMessage_WriteResponse_WriteAck_Skipped_Reason::StreamWriteMessage_WriteResponse_WriteAck_Skipped_Reason_REASON_ALREADY_WRITTEN
? TWriteSessionEvent::TWriteAck::EES_ALREADY_WRITTEN
: TWriteSessionEvent::TWriteAck::EES_DISCARDED);
Y_ABORT_UNLESS(ack.has_written() || ack.has_skipped() || ack.has_written_in_tx());

TWriteSessionEvent::TWriteAck::EEventState msgWriteStatus;
if (ack.has_written_in_tx()) {
msgWriteStatus = TWriteSessionEvent::TWriteAck::EES_WRITTEN_IN_TX;
} else if (ack.has_written()) {
msgWriteStatus = TWriteSessionEvent::TWriteAck::EES_WRITTEN;
} else {
msgWriteStatus =
(ack.skipped().reason() == Ydb::Topic::StreamWriteMessage_WriteResponse_WriteAck_Skipped_Reason::StreamWriteMessage_WriteResponse_WriteAck_Skipped_Reason_REASON_ALREADY_WRITTEN)
? TWriteSessionEvent::TWriteAck::EES_ALREADY_WRITTEN
: TWriteSessionEvent::TWriteAck::EES_DISCARDED;
}

ui64 offset = ack.has_written() ? ack.written().offset() : 0;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ struct TWriteSessionEvent {
enum EEventState {
EES_WRITTEN, //! Successfully written.
EES_ALREADY_WRITTEN, //! Skipped on SeqNo deduplication.
EES_DISCARDED //! In case of destruction of writer or retry policy discarded future retries in this writer.
EES_DISCARDED, //! In case of destruction of writer or retry policy discarded future retries in this writer.
EES_WRITTEN_IN_TX, //! Successfully written in tx.
};
//! Details of successfully written message.
struct TWrittenMessageDetails {
Expand Down
94 changes: 56 additions & 38 deletions ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,15 @@ class TFixture : public NUnitTest::TBaseFixture {
TTopicWriteSessionPtr Session;
TMaybe<NTopic::TContinuationToken> ContinuationToken;
size_t WriteCount = 0;
size_t AckCount = 0;
size_t WrittenAckCount = 0;
size_t WrittenInTxAckCount = 0;

void WaitForContinuationToken();
void Write(const TString& message, NTable::TTransaction* tx = nullptr);

size_t AckCount() const { return WrittenAckCount + WrittenInTxAckCount; }

void WaitForEvent();
};

void SetUp(NUnitTest::TTestContext&) override;
Expand Down Expand Up @@ -88,7 +93,8 @@ class TFixture : public NUnitTest::TBaseFixture {
NTable::TTransaction* tx = nullptr,
TMaybe<ui32> partitionId = Nothing());
void WaitForAcks(const TString& topicPath,
const TString& messageGroupId);
const TString& messageGroupId,
size_t writtenInTxCount = Max<size_t>());
void WaitForSessionClose(const TString& topicPath,
const TString& messageGroupId,
NYdb::EStatus status);
Expand Down Expand Up @@ -591,19 +597,31 @@ auto TFixture::GetTopicReadSession(const TString& topicPath,
void TFixture::TTopicWriteSessionContext::WaitForContinuationToken()
{
while (!ContinuationToken.Defined()) {
Session->WaitEvent().Wait();
for (auto& event : Session->GetEvents()) {
if (auto* e = std::get_if<NTopic::TWriteSessionEvent::TReadyToAcceptEvent>(&event)) {
ContinuationToken = std::move(e->ContinuationToken);
} else if (auto* e = std::get_if<NTopic::TWriteSessionEvent::TAcksEvent>(&event)) {
for (auto& ack : e->Acks) {
if (ack.State == NTopic::TWriteSessionEvent::TWriteAck::EES_WRITTEN) {
++AckCount;
}
WaitForEvent();
}
}

void TFixture::TTopicWriteSessionContext::WaitForEvent()
{
Session->WaitEvent().Wait();
for (auto& event : Session->GetEvents()) {
if (auto* e = std::get_if<NTopic::TWriteSessionEvent::TReadyToAcceptEvent>(&event)) {
ContinuationToken = std::move(e->ContinuationToken);
} else if (auto* e = std::get_if<NTopic::TWriteSessionEvent::TAcksEvent>(&event)) {
for (auto& ack : e->Acks) {
switch (ack.State) {
case NTopic::TWriteSessionEvent::TWriteAck::EES_WRITTEN:
++WrittenAckCount;
break;
case NTopic::TWriteSessionEvent::TWriteAck::EES_WRITTEN_IN_TX:
++WrittenInTxAckCount;
break;
default:
break;
}
} else if (auto* e = std::get_if<NTopic::TSessionClosedEvent>(&event)) {
UNIT_FAIL("");
}
} else if (auto* e = std::get_if<NTopic::TSessionClosedEvent>(&event)) {
UNIT_FAIL("");
}
}
}
Expand Down Expand Up @@ -691,34 +709,25 @@ TVector<TString> TFixture::ReadFromTopic(const TString& topicPath,
return messages;
}

void TFixture::WaitForAcks(const TString& topicPath, const TString& messageGroupId)
void TFixture::WaitForAcks(const TString& topicPath, const TString& messageGroupId, size_t writtenInTxCount)
{
std::pair<TString, TString> key(topicPath, messageGroupId);
auto i = TopicWriteSessions.find(key);
UNIT_ASSERT(i != TopicWriteSessions.end());

auto& context = i->second;

UNIT_ASSERT(context.AckCount <= context.WriteCount);
UNIT_ASSERT(context.AckCount() <= context.WriteCount);

while (context.AckCount < context.WriteCount) {
context.Session->WaitEvent().Wait();
for (auto& event : context.Session->GetEvents()) {
if (auto* e = std::get_if<NTopic::TWriteSessionEvent::TReadyToAcceptEvent>(&event)) {
context.ContinuationToken = std::move(e->ContinuationToken);
} else if (auto* e = std::get_if<NTopic::TWriteSessionEvent::TAcksEvent>(&event)) {
for (auto& ack : e->Acks) {
if (ack.State == NTopic::TWriteSessionEvent::TWriteAck::EES_WRITTEN) {
++context.AckCount;
}
}
} else if (auto* e = std::get_if<NTopic::TSessionClosedEvent>(&event)) {
UNIT_FAIL("");
}
}
while (context.AckCount() < context.WriteCount) {
context.WaitForEvent();
}

UNIT_ASSERT(context.AckCount == context.WriteCount);
UNIT_ASSERT((context.WrittenAckCount + context.WrittenInTxAckCount) == context.WriteCount);

if (writtenInTxCount != Max<size_t>()) {
UNIT_ASSERT_VALUES_EQUAL(context.WrittenInTxAckCount, writtenInTxCount);
}
}

void TFixture::WaitForSessionClose(const TString& topicPath,
Expand All @@ -731,7 +740,7 @@ void TFixture::WaitForSessionClose(const TString& topicPath,

auto& context = i->second;

UNIT_ASSERT(context.AckCount <= context.WriteCount);
UNIT_ASSERT(context.AckCount() <= context.WriteCount);

for(bool stop = false; !stop; ) {
context.Session->WaitEvent().Wait();
Expand All @@ -740,8 +749,15 @@ void TFixture::WaitForSessionClose(const TString& topicPath,
context.ContinuationToken = std::move(e->ContinuationToken);
} else if (auto* e = std::get_if<NTopic::TWriteSessionEvent::TAcksEvent>(&event)) {
for (auto& ack : e->Acks) {
if (ack.State == NTopic::TWriteSessionEvent::TWriteAck::EES_WRITTEN) {
++context.AckCount;
switch (ack.State) {
case NTopic::TWriteSessionEvent::TWriteAck::EES_WRITTEN:
++context.WrittenAckCount;
break;
case NTopic::TWriteSessionEvent::TWriteAck::EES_WRITTEN_IN_TX:
++context.WrittenInTxAckCount;
break;
default:
break;
}
}
} else if (auto* e = std::get_if<NTopic::TSessionClosedEvent>(&event)) {
Expand All @@ -752,7 +768,7 @@ void TFixture::WaitForSessionClose(const TString& topicPath,
}
}

UNIT_ASSERT(context.AckCount <= context.WriteCount);
UNIT_ASSERT(context.AckCount() <= context.WriteCount);
}

ui64 TFixture::GetTopicTabletId(const TActorId& actorId, const TString& topicPath, ui32 partition)
Expand Down Expand Up @@ -1852,7 +1868,7 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_27, TFixture)
CreateTopic("topic_B", TEST_CONSUMER);
CreateTopic("topic_C", TEST_CONSUMER);

for (size_t i = 0; i < 2; ++i) {
for (size_t i = 0, writtenInTx = 0; i < 2; ++i) {
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #1", nullptr, 0);
WriteToTopic("topic_B", TEST_MESSAGE_GROUP_ID, "message #2", nullptr, 0);

Expand All @@ -1862,12 +1878,14 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_27, TFixture)
auto messages = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(2), &tx, 0);
UNIT_ASSERT_VALUES_EQUAL(messages.size(), 1);
WriteToTopic("topic_C", TEST_MESSAGE_GROUP_ID, messages[0], &tx, 0);
WaitForAcks("topic_C", TEST_MESSAGE_GROUP_ID);
++writtenInTx;
WaitForAcks("topic_C", TEST_MESSAGE_GROUP_ID, writtenInTx);

messages = ReadFromTopic("topic_B", TEST_CONSUMER, TDuration::Seconds(2), &tx, 0);
UNIT_ASSERT_VALUES_EQUAL(messages.size(), 1);
WriteToTopic("topic_C", TEST_MESSAGE_GROUP_ID, messages[0], &tx, 0);
WaitForAcks("topic_C", TEST_MESSAGE_GROUP_ID);
++writtenInTx;
WaitForAcks("topic_C", TEST_MESSAGE_GROUP_ID, writtenInTx);

CommitTx(tx, EStatus::SUCCESS);

Expand Down
6 changes: 4 additions & 2 deletions ydb/services/persqueue_v1/actors/write_session_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -928,14 +928,16 @@ void TWriteSessionActor<UseMigrationProtocol>::ProcessWriteResponse(
};

auto addAck = [this](const TPersQueuePartitionResponse::TCmdWriteResult& res,
Topic::StreamWriteMessage::WriteResponse* writeResponse,
Topic::StreamWriteMessage::WriteResponse::WriteStatistics* stat) {
Topic::StreamWriteMessage::WriteResponse* writeResponse,
Topic::StreamWriteMessage::WriteResponse::WriteStatistics* stat) {
auto ack = writeResponse->add_acks();
// TODO (ildar-khisam@): validate res before filling ack fields
ack->set_seq_no(res.GetSeqNo());
if (res.GetAlreadyWritten()) {
Y_ABORT_UNLESS(UseDeduplication);
ack->mutable_skipped()->set_reason(Topic::StreamWriteMessage::WriteResponse::WriteAck::Skipped::REASON_ALREADY_WRITTEN);
} else if (res.HasWrittenInTx() && res.GetWrittenInTx()) {
ack->mutable_written_in_tx();
} else {
ack->mutable_written()->set_offset(res.GetOffset());
}
Expand Down