Skip to content

LOGBROKER-8894 enable minSeqNo save in partition (iter3) #1768

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Feb 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion ydb/core/persqueue/partition_sourcemanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ bool TPartitionSourceManager::HasParents() const {

TPartitionSourceManager::TModificationBatch::TModificationBatch(TPartitionSourceManager& manager, ESourceIdFormat format)
: Manager(manager)
, Node(Manager.GetPartitionNode())
, Node(Manager.GetPartitionNode())
, SourceIdWriter(format)
, HeartbeatEmitter(Manager.Partition.SourceIdStorage) {
}
Expand Down Expand Up @@ -104,6 +104,7 @@ TPartitionSourceManager& TPartitionSourceManager::TModificationBatch::GetManager
TPartitionSourceManager::TSourceInfo Convert(TSourceIdInfo value) {
TPartitionSourceManager::TSourceInfo result(value.State);
result.SeqNo = value.SeqNo;
result.MinSeqNo = value.MinSeqNo;
result.Offset = value.Offset;
result.Explicit = value.Explicit;
result.WriteTimestamp = value.WriteTimestamp;
Expand Down
1 change: 1 addition & 0 deletions ydb/core/persqueue/partition_sourcemanager.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ class TPartitionSourceManager {

TSourceIdInfo::EState State;
ui64 SeqNo = 0;
ui64 MinSeqNo = 0;
ui64 Offset = 0;
bool Explicit = false;
TInstant WriteTimestamp;
Expand Down
10 changes: 5 additions & 5 deletions ydb/core/persqueue/partition_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -219,12 +219,12 @@ void TPartition::ProcessReserveRequests(const TActorContext& ctx) {

const ui64 currentSize = ReservedSize + WriteInflightSize + WriteCycleSize;
if (currentSize != 0 && currentSize + size > maxWriteInflightSize) {
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Reserve processing: maxWriteInflightSize riched. Partition: " << Partition);
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Reserve processing: maxWriteInflightSize riched. Partition: " << Partition);
break;
}

if (WaitingForSubDomainQuota(ctx, currentSize)) {
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Reserve processing: SubDomainOutOfSpace. Partition: " << Partition);
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Reserve processing: SubDomainOutOfSpace. Partition: " << Partition);
break;
}

Expand Down Expand Up @@ -727,7 +727,7 @@ void TPartition::HandleOnWrite(TEvPQ::TEvDeregisterMessageGroup::TPtr& ev, const
return ReplyError(ctx, ev->Get()->Cookie, NPersQueue::NErrorCode::SOURCEID_DELETED,
"SourceId doesn't exist");
}

EmplaceRequest(TDeregisterMessageGroupMsg(*ev->Get()), ctx);
}

Expand Down Expand Up @@ -1519,7 +1519,7 @@ void TPartition::HandleWrites(const TActorContext& ctx) {
THolder<TEvKeyValue::TEvRequest> request(new TEvKeyValue::TEvRequest);

Y_ABORT_UNLESS(Head.PackedSize + NewHead.PackedSize <= 2 * MaxSizeCheck);

TInstant now = ctx.Now();
WriteCycleStartTime = now;

Expand Down Expand Up @@ -1592,7 +1592,7 @@ bool TPartition::WaitingForSubDomainQuota(const TActorContext& ctx, const ui64 w

void TPartition::WriteBlobWithQuota(const TActorContext& /*ctx*/, THolder<TEvKeyValue::TEvRequest>&& request) {
PQ_LOG_T("TPartition::WriteBlobWithQuota.");

// Request quota and write blob.
// Mirrored topics are not quoted in local dc.
const bool skip = !IsQuotingEnabled() || TopicWriteQuotaResourcePath.empty();
Expand Down
38 changes: 19 additions & 19 deletions ydb/core/persqueue/pq_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,11 +146,11 @@ class TReadProxy : public TActorBootstrapped<TReadProxy> {
PreparedResponse = std::make_shared<NKikimrClient::TResponse>();
}
}

auto& responseRecord = isDirectRead ? *PreparedResponse : Response->Record;
responseRecord.SetStatus(NMsgBusProxy::MSTATUS_OK);
responseRecord.SetErrorCode(NPersQueue::NErrorCode::OK);
responseRecord.SetErrorCode(NPersQueue::NErrorCode::OK);

Y_ABORT_UNLESS(readResult.ResultSize() > 0);
bool isStart = false;
if (!responseRecord.HasPartitionResponse()) {
Expand Down Expand Up @@ -191,7 +191,7 @@ class TReadProxy : public TActorBootstrapped<TReadProxy> {
}

if (isNewMsg) {
if (!isStart && readResult.GetResult(i).HasTotalParts()
if (!isStart && readResult.GetResult(i).HasTotalParts()
&& readResult.GetResult(i).GetTotalParts() + i > readResult.ResultSize()) //last blob is not full
break;
partResp->AddResult()->CopyFrom(readResult.GetResult(i));
Expand Down Expand Up @@ -292,7 +292,7 @@ class TReadProxy : public TActorBootstrapped<TReadProxy> {
};


TActorId CreateReadProxy(const TActorId& sender, const TActorId& tablet, ui32 tabletGeneration,
TActorId CreateReadProxy(const TActorId& sender, const TActorId& tablet, ui32 tabletGeneration,
const TDirectReadKey& directReadKey, const NKikimrClient::TPersQueueRequest& request,
const TActorContext& ctx)
{
Expand All @@ -304,7 +304,7 @@ class TResponseBuilder {
public:

TResponseBuilder(const TActorId& sender, const TActorId& tablet, const TString& topicName, const ui32 partition, const ui64 messageNo,
const TString& reqId, const TMaybe<ui64> cookie, NMetrics::TResourceMetrics* resourceMetrics,
const TString& reqId, const TMaybe<ui64> cookie, NMetrics::TResourceMetrics* resourceMetrics,
const TActorContext& ctx)
: Sender(sender)
, Tablet(tablet)
Expand Down Expand Up @@ -639,7 +639,7 @@ struct TPersQueue::TReplyToActor {
Event(std::move(event))
{
}

TActorId ActorId;
TEventBasePtr Event;
};
Expand Down Expand Up @@ -840,7 +840,7 @@ void TPersQueue::ReadConfig(const NKikimrClient::TKeyValueResponse::TReadResult&
ctx.Send(ctx.SelfID, new TEvents::TEvPoisonPill());
return;
}

Y_ABORT_UNLESS(readRange.HasStatus());
if (readRange.GetStatus() != NKikimrProto::OK && readRange.GetStatus() != NKikimrProto::NODATA) {
LOG_ERROR_S(ctx, NKikimrServices::PERSQUEUE,
Expand Down Expand Up @@ -1267,7 +1267,7 @@ void TPersQueue::FinishResponse(THashMap<ui64, TAutoPtr<TResponseBuilder>>::iter


void TPersQueue::Handle(TEvPersQueue::TEvUpdateConfig::TPtr& ev, const TActorContext& ctx)
{
{
if (!ConfigInited) {
UpdateConfigRequests.emplace_back(ev->Release(), ev->Sender);
return;
Expand Down Expand Up @@ -1304,7 +1304,7 @@ void TPersQueue::TrySendUpdateConfigResponses(const TActorContext& ctx)

ChangeConfigNotification.clear();
}

void TPersQueue::CreateTopicConverter(const NKikimrPQ::TPQTabletConfig& config,
NPersQueue::TConverterFactoryPtr& converterFactory,
NPersQueue::TTopicConverterPtr& topicConverter,
Expand Down Expand Up @@ -2109,7 +2109,7 @@ void TPersQueue::HandleReadRequest(
ReplyError(ctx, responseCookie, NPersQueue::NErrorCode::READ_ERROR_NO_SESSION,
TStringBuilder() << "Read prepare request with unknown(old?) session id " << cmd.GetSessionId());
return;
}
}
}

THolder<TEvPQ::TEvRead> event =
Expand Down Expand Up @@ -2375,7 +2375,7 @@ void TPersQueue::Handle(TEvPersQueue::TEvRequest::TPtr& ev, const TActorContext&
}
ResponseProxy[responseCookie] = ans;
Counters->Simple()[COUNTER_PQ_TABLET_INFLIGHT].Set(ResponseProxy.size());

if (!ConfigInited) {
ReplyError(ctx, responseCookie, NPersQueue::NErrorCode::INITIALIZING, "tablet is not ready");
return;
Expand All @@ -2396,11 +2396,11 @@ void TPersQueue::Handle(TEvPersQueue::TEvRequest::TPtr& ev, const TActorContext&
ReplyError(ctx, responseCookie, NPersQueue::NErrorCode::BAD_REQUEST, "no partition number");
return;
}

TPartitionId partition(req.GetPartition());
auto it = Partitions.find(partition);

LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Tablet " << TabletID() << " got client message batch for topic '"
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Tablet " << TabletID() << " got client message batch for topic '"
<< (TopicConverter ? TopicConverter->GetClientsideName() : "Undefined") << "' partition " << partition);

if (it == Partitions.end()) {
Expand Down Expand Up @@ -2859,7 +2859,7 @@ void TPersQueue::Handle(TEvTxProcessing::TEvReadSet::TPtr& ev, const TActorConte

std::unique_ptr<TEvTxProcessing::TEvReadSetAck> ack;
if (!(event.GetFlags() & NKikimrTx::TEvReadSet::FLAG_NO_ACK)) {
ack = std::make_unique<TEvTxProcessing::TEvReadSetAck>(*ev->Get(), TabletID());
ack = std::make_unique<TEvTxProcessing::TEvReadSetAck>(*ev->Get(), TabletID());
}

if (auto tx = GetTransaction(ctx, event.GetTxId()); tx && tx->Senders.contains(event.GetTabletProducer())) {
Expand Down Expand Up @@ -2927,7 +2927,7 @@ void TPersQueue::Handle(TEvPQ::TEvTxCalcPredicateResult::TPtr& ev, const TActorC
void TPersQueue::Handle(TEvPQ::TEvProposePartitionConfigResult::TPtr& ev, const TActorContext& ctx)
{
const TEvPQ::TEvProposePartitionConfigResult& event = *ev->Get();

auto tx = GetTransaction(ctx, event.TxId);
if (!tx) {
return;
Expand Down Expand Up @@ -3582,7 +3582,7 @@ void TPersQueue::CheckTxState(const TActorContext& ctx,
Y_ABORT_UNLESS(tx.PartitionRepliesCount <= tx.PartitionRepliesExpected);

PQ_LOG_T("TxId="<< tx.TxId << ", State=EXECUTING" <<
", tx.PartitionRepliesCount=" << tx.PartitionRepliesCount <<
", tx.PartitionRepliesCount=" << tx.PartitionRepliesCount <<
", tx.PartitionRepliesExpected=" << tx.PartitionRepliesExpected);
if (tx.PartitionRepliesCount == tx.PartitionRepliesExpected) {
Y_ABORT_UNLESS(!TxQueue.empty());
Expand Down Expand Up @@ -3728,7 +3728,7 @@ TPartition* TPersQueue::CreatePartitionActor(const TPartitionId& partitionId,
const TActorContext& ctx)
{
int channels = Info()->Channels.size() - NKeyValue::BLOB_CHANNEL; // channels 0,1 are reserved in tablet
Y_ABORT_UNLESS(channels > 0);
Y_ABORT_UNLESS(channels > 0);

return new TPartition(TabletID(),
partitionId,
Expand Down Expand Up @@ -3793,7 +3793,7 @@ void TPersQueue::EnsurePartitionsAreNotDeleted(const NKikimrPQ::TPQTabletConfig&
Y_VERIFY_S(was.contains(partition.GetPartitionId()), "New config is bad, missing partition " << partition.GetPartitionId());
}
}

void TPersQueue::InitTransactions(const NKikimrClient::TKeyValueResponse::TReadRangeResult& readRange,
THashMap<ui32, TVector<TTransaction>>& partitionTxs)
{
Expand Down
8 changes: 8 additions & 0 deletions ydb/core/persqueue/sourceid.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ void THeartbeatProcessor::ForgetSourceId(const TString& sourceId) {

TSourceIdInfo::TSourceIdInfo(ui64 seqNo, ui64 offset, TInstant createTs)
: SeqNo(seqNo)
, MinSeqNo(seqNo)
, Offset(offset)
, WriteTimestamp(createTs)
, CreateTimestamp(createTs)
Expand All @@ -111,6 +112,7 @@ TSourceIdInfo::TSourceIdInfo(ui64 seqNo, ui64 offset, TInstant createTs)

TSourceIdInfo::TSourceIdInfo(ui64 seqNo, ui64 offset, TInstant createTs, THeartbeat&& heartbeat)
: SeqNo(seqNo)
, MinSeqNo(seqNo)
, Offset(offset)
, WriteTimestamp(createTs)
, CreateTimestamp(createTs)
Expand All @@ -120,6 +122,7 @@ TSourceIdInfo::TSourceIdInfo(ui64 seqNo, ui64 offset, TInstant createTs, THeartb

TSourceIdInfo::TSourceIdInfo(ui64 seqNo, ui64 offset, TInstant createTs, TMaybe<TPartitionKeyRange>&& keyRange, bool isInSplit)
: SeqNo(seqNo)
, MinSeqNo(seqNo)
, Offset(offset)
, CreateTimestamp(createTs)
, Explicit(true)
Expand All @@ -133,6 +136,9 @@ TSourceIdInfo::TSourceIdInfo(ui64 seqNo, ui64 offset, TInstant createTs, TMaybe<
TSourceIdInfo TSourceIdInfo::Updated(ui64 seqNo, ui64 offset, TInstant writeTs) const {
auto copy = *this;
copy.SeqNo = seqNo;
if (copy.MinSeqNo == 0) {
copy.MinSeqNo = seqNo;
}
copy.Offset = offset;
copy.WriteTimestamp = writeTs;

Expand Down Expand Up @@ -178,6 +184,7 @@ void TSourceIdInfo::Serialize(TBuffer& data) const {
TSourceIdInfo TSourceIdInfo::Parse(const NKikimrPQ::TMessageGroupInfo& proto) {
TSourceIdInfo result;
result.SeqNo = proto.GetSeqNo();
result.MinSeqNo = proto.GetMinSeqNo();
result.Offset = proto.GetOffset();
result.WriteTimestamp = TInstant::FromValue(proto.GetWriteTimestamp());
result.CreateTimestamp = TInstant::FromValue(proto.GetCreateTimestamp());
Expand All @@ -197,6 +204,7 @@ TSourceIdInfo TSourceIdInfo::Parse(const NKikimrPQ::TMessageGroupInfo& proto) {

void TSourceIdInfo::Serialize(NKikimrPQ::TMessageGroupInfo& proto) const {
proto.SetSeqNo(SeqNo);
proto.SetMinSeqNo(MinSeqNo);
proto.SetOffset(Offset);
proto.SetWriteTimestamp(WriteTimestamp.GetValue());
proto.SetCreateTimestamp(CreateTimestamp.GetValue());
Expand Down
1 change: 1 addition & 0 deletions ydb/core/persqueue/sourceid.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ struct TSourceIdInfo {
};

ui64 SeqNo = 0;
ui64 MinSeqNo = 0;
ui64 Offset = 0;
TInstant WriteTimestamp;
TInstant CreateTimestamp;
Expand Down
34 changes: 33 additions & 1 deletion ydb/core/persqueue/ut/sourceid_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) {
Y_UNIT_TEST(SourceIdStorageComplexDelete) {
TSourceIdStorage storage;
for (ui64 i = 1; i <= 10000 + 1; ++i) { // add 10000 + one extra sources
storage.RegisterSourceId(TestSourceId(i), i, i, TInstant::Seconds(10 * i));
storage.RegisterSourceId(TestSourceId(i), i, i , TInstant::Seconds(10 * i));
}

NKikimrPQ::TPartitionConfig config;
Expand Down Expand Up @@ -460,6 +460,38 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) {
}
}

Y_UNIT_TEST(SourceIdMinSeqNo) {
TSourceIdStorage storage;

const auto sourceId = TestSourceId(1);
const auto sourceIdInfo = TSourceIdInfo(2, 10, TInstant::Seconds(100));
const auto anotherSourceId = TestSourceId(2);
const auto anotherSourceIdInfo = TSourceIdInfo(0, 20, TInstant::Seconds(200));

storage.RegisterSourceId(sourceId, sourceIdInfo);
storage.RegisterSourceId(anotherSourceId, anotherSourceIdInfo);
{
auto it = storage.GetInMemorySourceIds().find(anotherSourceId);
UNIT_ASSERT_VALUES_EQUAL(it->second.MinSeqNo, 0);
}

storage.RegisterSourceId(sourceId, sourceIdInfo.Updated(3, 11, TInstant::Seconds(100)));
{
auto it = storage.GetInMemorySourceIds().find(sourceId);
UNIT_ASSERT_VALUES_EQUAL(it->second.MinSeqNo, 2);
}
storage.RegisterSourceId(sourceId, sourceIdInfo.Updated(1, 12, TInstant::Seconds(100)));
{
auto it = storage.GetInMemorySourceIds().find(sourceId);
UNIT_ASSERT_VALUES_EQUAL(it->second.MinSeqNo, 2);
}
storage.RegisterSourceId(anotherSourceId, anotherSourceIdInfo.Updated(3, 12, TInstant::Seconds(100)));
{
auto it = storage.GetInMemorySourceIds().find(anotherSourceId);
UNIT_ASSERT_VALUES_EQUAL(it->second.MinSeqNo, 3);
}
}

} // TSourceIdTests

} // namespace NKikimr::NPQ
1 change: 1 addition & 0 deletions ydb/core/protos/pqconfig.proto
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,7 @@ message TMessageGroupInfo {
}

optional uint64 SeqNo = 1;
optional uint64 MinSeqNo = 9;
optional uint64 Offset = 2;
optional uint64 WriteTimestamp = 3; // TInstant::TValue
optional uint64 CreateTimestamp = 4; // TInstant::TValue
Expand Down