Skip to content

Commit c6077a3

Browse files
authored
fixed MaxCommittedTimeLag (#27553) (#27644)
2 parents 273397c + 5c5752a commit c6077a3

File tree

2 files changed

+18
-18
lines changed

2 files changed

+18
-18
lines changed

ydb/core/persqueue/partition.cpp

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -758,9 +758,6 @@ TConsumerSnapshot TPartition::CreateSnapshot(TUserInfo& userInfo) const {
758758
if (userInfo.Offset >= static_cast<i64>(EndOffset)) {
759759
result.LastCommittedMessage.CreateTimestamp = now;
760760
result.LastCommittedMessage.WriteTimestamp = now;
761-
} else if (userInfo.ActualTimestamps) {
762-
result.LastCommittedMessage.CreateTimestamp = userInfo.CreateTimestamp;
763-
result.LastCommittedMessage.WriteTimestamp = userInfo.WriteTimestamp;
764761
} else {
765762
auto timestamp = GetWriteTimeEstimate(userInfo.Offset);
766763
result.LastCommittedMessage.CreateTimestamp = timestamp;
@@ -782,14 +779,14 @@ TConsumerSnapshot TPartition::CreateSnapshot(TUserInfo& userInfo) const {
782779
result.LastReadMessage.WriteTimestamp = userInfo.ReadWriteTimestamp;
783780
} else {
784781
auto timestamp = GetWriteTimeEstimate(readOffset);
785-
result.LastCommittedMessage.CreateTimestamp = timestamp;
786-
result.LastCommittedMessage.WriteTimestamp = timestamp;
782+
result.LastReadMessage.CreateTimestamp = timestamp;
783+
result.LastReadMessage.WriteTimestamp = timestamp;
787784
}
788785

789786
if (readOffset < (i64)EndOffset) {
790787
result.ReadLag = result.LastReadTimestamp - result.LastReadMessage.WriteTimestamp;
791788
}
792-
result.CommitedLag = result.LastCommittedMessage.WriteTimestamp - now;
789+
result.CommitedLag = now - result.LastCommittedMessage.WriteTimestamp;
793790
result.TotalLag = TDuration::MilliSeconds(userInfo.GetWriteLagMs()) + result.ReadLag + (now - result.LastReadTimestamp);
794791

795792
return result;

ydb/core/persqueue/ut/ut_with_sdk/topic_ut.cpp

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ Y_UNIT_TEST_SUITE(WithSDK) {
5555
session->Close(TDuration::Seconds(5));
5656
};
5757

58-
// Check describe for empty topic
58+
Cerr << ">>>>> Check describe for empty topic\n";
5959
{
6060
auto d = describe();
6161
UNIT_ASSERT_STRINGS_EQUAL(TEST_CONSUMER, d.GetConsumer().GetConsumerName());
@@ -75,44 +75,47 @@ Y_UNIT_TEST_SUITE(WithSDK) {
7575
}
7676

7777
write(3);
78+
Sleep(TDuration::Seconds(2));
7879
write(7);
80+
Sleep(TDuration::Seconds(2));
81+
write(11);
7982

80-
// Check describe for topic which contains messages, but consumer hasn`t read
83+
Cerr << ">>>>> Check describe for topic which contains messages, but consumer hasn`t read\n";
8184
{
8285
auto d = describe();
8386
UNIT_ASSERT_STRINGS_EQUAL(TEST_CONSUMER, d.GetConsumer().GetConsumerName());
8487
UNIT_ASSERT_VALUES_EQUAL(1, d.GetPartitions().size());
8588
auto& p = d.GetPartitions()[0];
8689
UNIT_ASSERT_VALUES_EQUAL(0, p.GetPartitionId());
8790
UNIT_ASSERT_VALUES_EQUAL(true, p.GetActive());
88-
UNIT_ASSERT_VALUES_EQUAL(2, p.GetPartitionStats()->GetEndOffset());
91+
UNIT_ASSERT_VALUES_EQUAL(3, p.GetPartitionStats()->GetEndOffset());
8992
auto& c = p.GetPartitionConsumerStats();
9093
UNIT_ASSERT_VALUES_EQUAL(true, c.has_value());
9194
UNIT_ASSERT_VALUES_EQUAL(0, c->GetCommittedOffset());
9295
UNIT_ASSERT_VALUES_EQUAL(TDuration::Seconds(7), c->GetMaxWriteTimeLag()); //
9396
UNIT_ASSERT_VALUES_EQUAL(TDuration::Seconds(0), c->GetMaxReadTimeLag());
94-
UNIT_ASSERT_VALUES_EQUAL(TDuration::Seconds(0), c->GetMaxCommittedTimeLag());
97+
UNIT_ASSERT_VALUES_EQUAL(TDuration::Seconds(4), c->GetMaxCommittedTimeLag());
9598
UNIT_ASSERT_TIME_EQUAL(TInstant::Now(), c->GetLastReadTime(), TDuration::Seconds(3)); // why not zero?
9699
UNIT_ASSERT_VALUES_EQUAL(1, c->GetLastReadOffset());
97100
}
98101

99102
UNIT_ASSERT(setup.Commit(TString{TEST_TOPIC}, TEST_CONSUMER, 0, 1).IsSuccess());
100103

101-
// Check describe for topic whis contains messages, has commited offset but hasn`t read (restart tablet for example)
104+
Cerr << ">>>>> Check describe for topic whis contains messages, has commited offset but hasn`t read (restart tablet for example)\n";
102105
{
103106
auto d = describe();
104107
UNIT_ASSERT_STRINGS_EQUAL(TEST_CONSUMER, d.GetConsumer().GetConsumerName());
105108
UNIT_ASSERT_VALUES_EQUAL(1, d.GetPartitions().size());
106109
auto& p = d.GetPartitions()[0];
107110
UNIT_ASSERT_VALUES_EQUAL(0, p.GetPartitionId());
108111
UNIT_ASSERT_VALUES_EQUAL(true, p.GetActive());
109-
UNIT_ASSERT_VALUES_EQUAL(2, p.GetPartitionStats()->GetEndOffset());
112+
UNIT_ASSERT_VALUES_EQUAL(3, p.GetPartitionStats()->GetEndOffset());
110113
auto& c = p.GetPartitionConsumerStats();
111114
UNIT_ASSERT_VALUES_EQUAL(true, c.has_value());
112115
UNIT_ASSERT_VALUES_EQUAL(1, c->GetCommittedOffset());
113116
UNIT_ASSERT_VALUES_EQUAL(TDuration::Seconds(7), c->GetMaxWriteTimeLag());
114117
UNIT_ASSERT_VALUES_EQUAL(TDuration::Seconds(0), c->GetMaxReadTimeLag());
115-
UNIT_ASSERT_VALUES_EQUAL(TDuration::Seconds(0), c->GetMaxCommittedTimeLag());
118+
UNIT_ASSERT_VALUES_EQUAL(TDuration::Seconds(4), c->GetMaxCommittedTimeLag());
116119
UNIT_ASSERT_TIME_EQUAL(TInstant::Now(), c->GetLastReadTime(), TDuration::Seconds(3)); // why not zero?
117120
UNIT_ASSERT_VALUES_EQUAL(1, c->GetLastReadOffset());
118121
}
@@ -143,23 +146,23 @@ Y_UNIT_TEST_SUITE(WithSDK) {
143146
session->Close(TDuration::Seconds(1));
144147
}
145148

146-
// Check describe for topic wich contains messages, has commited offset of first message and read second message
149+
Cerr << ">>>>> Check describe for topic wich contains messages, has commited offset of first message and read second message\n";
147150
{
148151
auto d = describe();
149152
UNIT_ASSERT_STRINGS_EQUAL(TEST_CONSUMER, d.GetConsumer().GetConsumerName());
150153
UNIT_ASSERT_VALUES_EQUAL(1, d.GetPartitions().size());
151154
auto& p = d.GetPartitions()[0];
152155
UNIT_ASSERT_VALUES_EQUAL(0, p.GetPartitionId());
153156
UNIT_ASSERT_VALUES_EQUAL(true, p.GetActive());
154-
UNIT_ASSERT_VALUES_EQUAL(2, p.GetPartitionStats()->GetEndOffset());
157+
UNIT_ASSERT_VALUES_EQUAL(3, p.GetPartitionStats()->GetEndOffset());
155158
auto& c = p.GetPartitionConsumerStats();
156159
UNIT_ASSERT_VALUES_EQUAL(true, c.has_value());
157160
UNIT_ASSERT_VALUES_EQUAL(1, c->GetCommittedOffset());
158-
UNIT_ASSERT_VALUES_EQUAL(TDuration::Seconds(7), c->GetMaxWriteTimeLag());
161+
//UNIT_ASSERT_VALUES_EQUAL(TDuration::Seconds(7), c->GetMaxWriteTimeLag());
159162
UNIT_ASSERT_VALUES_EQUAL(TDuration::Seconds(0), c->GetMaxReadTimeLag());
160-
UNIT_ASSERT_VALUES_EQUAL(TDuration::Seconds(0), c->GetMaxCommittedTimeLag());
163+
UNIT_ASSERT_VALUES_EQUAL(TDuration::Seconds(4), c->GetMaxCommittedTimeLag());
161164
UNIT_ASSERT_TIME_EQUAL(TInstant::Now(), c->GetLastReadTime(), TDuration::Seconds(3));
162-
UNIT_ASSERT_VALUES_EQUAL(2, c->GetLastReadOffset());
165+
UNIT_ASSERT_VALUES_EQUAL(3, c->GetLastReadOffset());
163166
}
164167
}
165168
}

0 commit comments

Comments
 (0)