Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
300 changes: 194 additions & 106 deletions ydb/core/blobstorage/dsproxy/dsproxy.h

Large diffs are not rendered by default.

25 changes: 8 additions & 17 deletions ydb/core/blobstorage/dsproxy/dsproxy_assimilate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -264,18 +264,12 @@ class TBlobStorageGroupAssimilateRequest : public TBlobStorageGroupRequestActor
return ERequestType::Assimilate;
}

TBlobStorageGroupAssimilateRequest(const TIntrusivePtr<TBlobStorageGroupInfo>& info,
const TIntrusivePtr<TGroupQueues>& state, const TActorId& source,
const TIntrusivePtr<TBlobStorageGroupProxyMon>& mon, TEvBlobStorage::TEvAssimilate *ev, ui64 cookie,
NWilson::TTraceId traceId, TInstant now, TIntrusivePtr<TStoragePoolCounters>& storagePoolCounters)
: TBlobStorageGroupRequestActor(info, state, mon, source, cookie,
NKikimrServices::BS_PROXY_ASSIMILATE, false, {}, now, storagePoolCounters, ev->RestartCounter,
std::move(traceId), "DSProxy.Assimilate", ev, std::move(ev->ExecutionRelay),
NKikimrServices::TActivity::BS_GROUP_ASSIMILATE)
, SkipBlocksUpTo(ev->SkipBlocksUpTo)
, SkipBarriersUpTo(ev->SkipBarriersUpTo)
, SkipBlobsUpTo(ev->SkipBlobsUpTo)
, PerVDiskInfo(info->GetTotalVDisksNum())
TBlobStorageGroupAssimilateRequest(TBlobStorageGroupAssimilateParameters& params)
: TBlobStorageGroupRequestActor(params)
, SkipBlocksUpTo(params.Common.Event->SkipBlocksUpTo)
, SkipBarriersUpTo(params.Common.Event->SkipBarriersUpTo)
, SkipBlobsUpTo(params.Common.Event->SkipBlobsUpTo)
, PerVDiskInfo(Info->GetTotalVDisksNum())
, Result(new TEvBlobStorage::TEvAssimilateResult(NKikimrProto::OK, {}))
{
Heap.reserve(PerVDiskInfo.size());
Expand Down Expand Up @@ -465,11 +459,8 @@ class TBlobStorageGroupAssimilateRequest : public TBlobStorageGroupRequestActor
}
};

IActor* CreateBlobStorageGroupAssimilateRequest(const TIntrusivePtr<TBlobStorageGroupInfo>& info,
const TIntrusivePtr<TGroupQueues>& state, const TActorId& source,
const TIntrusivePtr<TBlobStorageGroupProxyMon>& mon, TEvBlobStorage::TEvAssimilate *ev,
ui64 cookie, NWilson::TTraceId traceId, TInstant now, TIntrusivePtr<TStoragePoolCounters>& storagePoolCounters) {
return new TBlobStorageGroupAssimilateRequest(info, state, source, mon, ev, cookie, std::move(traceId), now, storagePoolCounters);
IActor* CreateBlobStorageGroupAssimilateRequest(TBlobStorageGroupAssimilateParameters params) {
return new TBlobStorageGroupAssimilateRequest(params);
}

} // NKikimr
28 changes: 9 additions & 19 deletions ydb/core/blobstorage/dsproxy/dsproxy_block.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,20 +132,13 @@ class TBlobStorageGroupBlockRequest : public TBlobStorageGroupRequestActor {
return ERequestType::Block;
}

TBlobStorageGroupBlockRequest(const TIntrusivePtr<TBlobStorageGroupInfo> &info,
const TIntrusivePtr<TGroupQueues> &state, const TActorId &source,
const TIntrusivePtr<TBlobStorageGroupProxyMon> &mon, TEvBlobStorage::TEvBlock *ev,
ui64 cookie, NWilson::TTraceId&& traceId, TInstant now,
TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters)
: TBlobStorageGroupRequestActor(info, state, mon, source, cookie,
NKikimrServices::BS_PROXY_BLOCK, false, {}, now, storagePoolCounters, ev->RestartCounter,
std::move(traceId), "DSProxy.Block", ev, std::move(ev->ExecutionRelay),
NKikimrServices::TActivity::BS_GROUP_BLOCK)
, TabletId(ev->TabletId)
, Generation(ev->Generation)
, Deadline(ev->Deadline)
, IssuerGuid(ev->IssuerGuid)
, StartTime(now)
TBlobStorageGroupBlockRequest(TBlobStorageGroupBlockParameters& params)
: TBlobStorageGroupRequestActor(params)
, TabletId(params.Common.Event->TabletId)
, Generation(params.Common.Event->Generation)
, Deadline(params.Common.Event->Deadline)
, IssuerGuid(params.Common.Event->IssuerGuid)
, StartTime(params.Common.Now)
, QuorumTracker(Info.Get())
{}

Expand Down Expand Up @@ -176,11 +169,8 @@ class TBlobStorageGroupBlockRequest : public TBlobStorageGroupRequestActor {
}
};

IActor* CreateBlobStorageGroupBlockRequest(const TIntrusivePtr<TBlobStorageGroupInfo> &info,
const TIntrusivePtr<TGroupQueues> &state, const TActorId &source,
const TIntrusivePtr<TBlobStorageGroupProxyMon> &mon, TEvBlobStorage::TEvBlock *ev,
ui64 cookie, NWilson::TTraceId traceId, TInstant now, TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters) {
return new TBlobStorageGroupBlockRequest(info, state, source, mon, ev, cookie, std::move(traceId), now, storagePoolCounters);
IActor* CreateBlobStorageGroupBlockRequest(TBlobStorageGroupBlockParameters params) {
return new TBlobStorageGroupBlockRequest(params);
}

} // NKikimr
44 changes: 17 additions & 27 deletions ydb/core/blobstorage/dsproxy/dsproxy_collect.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,28 +140,22 @@ class TBlobStorageGroupCollectGarbageRequest : public TBlobStorageGroupRequestAc
return ERequestType::CollectGarbage;
}

TBlobStorageGroupCollectGarbageRequest(const TIntrusivePtr<TBlobStorageGroupInfo> &info,
const TIntrusivePtr<TGroupQueues> &state, const TActorId &source,
const TIntrusivePtr<TBlobStorageGroupProxyMon> &mon, TEvBlobStorage::TEvCollectGarbage *ev, ui64 cookie,
NWilson::TTraceId traceId, TInstant now, TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters)
: TBlobStorageGroupRequestActor(info, state, mon, source, cookie,
NKikimrServices::BS_PROXY_COLLECT, false, {}, now, storagePoolCounters, ev->RestartCounter,
std::move(traceId), "DSProxy.CollectGarbage", ev, std::move(ev->ExecutionRelay),
NKikimrServices::TActivity::BS_GROUP_COLLECT_GARBAGE)
, TabletId(ev->TabletId)
, RecordGeneration(ev->RecordGeneration)
, PerGenerationCounter(ev->PerGenerationCounter)
, Channel(ev->Channel)
, Deadline(ev->Deadline)
, Keep(ev->Keep.Release())
, DoNotKeep(ev->DoNotKeep.Release())
, CollectGeneration(ev->CollectGeneration)
, CollectStep(ev->CollectStep)
, Hard(ev->Hard)
, Collect(ev->Collect)
, Decommission(ev->Decommission)
TBlobStorageGroupCollectGarbageRequest(TBlobStorageGroupCollectGarbageParameters& params)
: TBlobStorageGroupRequestActor(params)
, TabletId(params.Common.Event->TabletId)
, RecordGeneration(params.Common.Event->RecordGeneration)
, PerGenerationCounter(params.Common.Event->PerGenerationCounter)
, Channel(params.Common.Event->Channel)
, Deadline(params.Common.Event->Deadline)
, Keep(params.Common.Event->Keep.Release())
, DoNotKeep(params.Common.Event->DoNotKeep.Release())
, CollectGeneration(params.Common.Event->CollectGeneration)
, CollectStep(params.Common.Event->CollectStep)
, Hard(params.Common.Event->Hard)
, Collect(params.Common.Event->Collect)
, Decommission(params.Common.Event->Decommission)
, QuorumTracker(Info.Get())
, StartTime(now)
, StartTime(params.Common.Now)
{}

void Bootstrap() override {
Expand Down Expand Up @@ -205,12 +199,8 @@ class TBlobStorageGroupCollectGarbageRequest : public TBlobStorageGroupRequestAc
}
};

IActor* CreateBlobStorageGroupCollectGarbageRequest(const TIntrusivePtr<TBlobStorageGroupInfo> &info,
const TIntrusivePtr<TGroupQueues> &state, const TActorId &source,
const TIntrusivePtr<TBlobStorageGroupProxyMon> &mon, TEvBlobStorage::TEvCollectGarbage *ev,
ui64 cookie, NWilson::TTraceId traceId, TInstant now, TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters) {
return new TBlobStorageGroupCollectGarbageRequest(info, state, source, mon, ev, cookie, std::move(traceId), now,
storagePoolCounters);
IActor* CreateBlobStorageGroupCollectGarbageRequest(TBlobStorageGroupCollectGarbageParameters params) {
return new TBlobStorageGroupCollectGarbageRequest(params);
}

} // NKikimr
36 changes: 12 additions & 24 deletions ydb/core/blobstorage/dsproxy/dsproxy_discover.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -869,25 +869,18 @@ class TBlobStorageGroupDiscoverRequest : public TBlobStorageGroupRequestActor {
return ERequestType::Discover;
}

TBlobStorageGroupDiscoverRequest(const TIntrusivePtr<TBlobStorageGroupInfo> &info,
const TIntrusivePtr<TGroupQueues> &state, const TActorId &source,
const TIntrusivePtr<TBlobStorageGroupProxyMon> mon, TEvBlobStorage::TEvDiscover *ev,
ui64 cookie, NWilson::TTraceId traceId, TInstant now,
TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters)
: TBlobStorageGroupRequestActor(info, state, mon, source, cookie,
NKikimrServices::BS_PROXY_DISCOVER, true, {}, now, storagePoolCounters, ev->RestartCounter,
std::move(traceId), "DSProxy.Discover", ev, std::move(ev->ExecutionRelay),
NKikimrServices::TActivity::BS_GROUP_DISCOVER)
, TabletId(ev->TabletId)
, MinGeneration(ev->MinGeneration)
, ReadBody(ev->ReadBody)
, DiscoverBlockedGeneration(ev->DiscoverBlockedGeneration)
, Deadline(ev->Deadline)
, StartTime(now)
TBlobStorageGroupDiscoverRequest(TBlobStorageGroupDiscoverParameters& params)
: TBlobStorageGroupRequestActor(params)
, TabletId(params.Common.Event->TabletId)
, MinGeneration(params.Common.Event->MinGeneration)
, ReadBody(params.Common.Event->ReadBody)
, DiscoverBlockedGeneration(params.Common.Event->DiscoverBlockedGeneration)
, Deadline(params.Common.Event->Deadline)
, StartTime(params.Common.Now)
, GroupResponseTracker(Info)
, IsGetBlockDone(!DiscoverBlockedGeneration)
, ForceBlockedGeneration(ev->ForceBlockedGeneration)
, FromLeader(ev->FromLeader)
, ForceBlockedGeneration(params.Common.Event->ForceBlockedGeneration)
, FromLeader(params.Common.Event->FromLeader)
{}

void Bootstrap() override {
Expand Down Expand Up @@ -965,13 +958,8 @@ class TBlobStorageGroupDiscoverRequest : public TBlobStorageGroupRequestActor {
}
};

IActor* CreateBlobStorageGroupDiscoverRequest(const TIntrusivePtr<TBlobStorageGroupInfo> &info,
const TIntrusivePtr<TGroupQueues> &state, const TActorId &source,
const TIntrusivePtr<TBlobStorageGroupProxyMon> &mon, TEvBlobStorage::TEvDiscover *ev,
ui64 cookie, NWilson::TTraceId traceId, TInstant now,
TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters) {
return new TBlobStorageGroupDiscoverRequest(info, state, source, mon, ev, cookie, std::move(traceId), now,
storagePoolCounters);
IActor* CreateBlobStorageGroupDiscoverRequest(TBlobStorageGroupDiscoverParameters params) {
return new TBlobStorageGroupDiscoverRequest(params);
}

}//NKikimr
36 changes: 12 additions & 24 deletions ydb/core/blobstorage/dsproxy/dsproxy_discover_m3dc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -457,23 +457,16 @@ class TBlobStorageGroupMirror3dcDiscoverRequest : public TBlobStorageGroupReques
return ERequestType::Discover;
}

TBlobStorageGroupMirror3dcDiscoverRequest(TIntrusivePtr<TBlobStorageGroupInfo> info,
TIntrusivePtr<TGroupQueues> state, const TActorId& source,
TIntrusivePtr<TBlobStorageGroupProxyMon> mon, TEvBlobStorage::TEvDiscover *ev,
ui64 cookie, NWilson::TTraceId traceId, TInstant now,
TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters)
: TBlobStorageGroupRequestActor(std::move(info), std::move(state), std::move(mon), source, cookie,
NKikimrServices::BS_PROXY_DISCOVER, false, {}, now, storagePoolCounters, ev->RestartCounter,
std::move(traceId), "DSProxy.Discover", ev, std::move(ev->ExecutionRelay),
NKikimrServices::TActivity::BS_GROUP_DISCOVER)
, TabletId(ev->TabletId)
, MinGeneration(ev->MinGeneration)
, StartTime(now)
, Deadline(ev->Deadline)
, ReadBody(ev->ReadBody)
, DiscoverBlockedGeneration(ev->DiscoverBlockedGeneration)
, ForceBlockedGeneration(ev->ForceBlockedGeneration)
, FromLeader(ev->FromLeader)
TBlobStorageGroupMirror3dcDiscoverRequest(TBlobStorageGroupDiscoverParameters& params)
: TBlobStorageGroupRequestActor(params)
, TabletId(params.Common.Event->TabletId)
, MinGeneration(params.Common.Event->MinGeneration)
, StartTime(params.Common.Now)
, Deadline(params.Common.Event->Deadline)
, ReadBody(params.Common.Event->ReadBody)
, DiscoverBlockedGeneration(params.Common.Event->DiscoverBlockedGeneration)
, ForceBlockedGeneration(params.Common.Event->ForceBlockedGeneration)
, FromLeader(params.Common.Event->FromLeader)
, GetBlockTracker(Info.Get())
{}

Expand Down Expand Up @@ -736,13 +729,8 @@ class TBlobStorageGroupMirror3dcDiscoverRequest : public TBlobStorageGroupReques
}
};

IActor* CreateBlobStorageGroupMirror3dcDiscoverRequest(const TIntrusivePtr<TBlobStorageGroupInfo> &info,
const TIntrusivePtr<TGroupQueues> &state, const TActorId &source,
const TIntrusivePtr<TBlobStorageGroupProxyMon> &mon, TEvBlobStorage::TEvDiscover *ev,
ui64 cookie, NWilson::TTraceId traceId, TInstant now,
TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters) {
return new TBlobStorageGroupMirror3dcDiscoverRequest(info, state, source, mon, ev, cookie, std::move(traceId), now,
storagePoolCounters);
IActor* CreateBlobStorageGroupMirror3dcDiscoverRequest(TBlobStorageGroupDiscoverParameters params) {
return new TBlobStorageGroupMirror3dcDiscoverRequest(params);
}

}//NKikimr
36 changes: 12 additions & 24 deletions ydb/core/blobstorage/dsproxy/dsproxy_discover_m3of4.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,23 +27,16 @@ class TBlobStorageGroupMirror3of4DiscoverRequest : public TBlobStorageGroupReque
return ERequestType::Discover;
}

TBlobStorageGroupMirror3of4DiscoverRequest(TIntrusivePtr<TBlobStorageGroupInfo> info,
TIntrusivePtr<TGroupQueues> state, const TActorId& source,
TIntrusivePtr<TBlobStorageGroupProxyMon> mon, TEvBlobStorage::TEvDiscover *ev,
ui64 cookie, NWilson::TTraceId traceId, TInstant now,
TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters)
: TBlobStorageGroupRequestActor(std::move(info), std::move(state), std::move(mon), source, cookie,
NKikimrServices::BS_PROXY_DISCOVER, false, {}, now, storagePoolCounters, ev->RestartCounter,
std::move(traceId), "DSProxy.Discover", ev, std::move(ev->ExecutionRelay),
NKikimrServices::TActivity::BS_GROUP_DISCOVER)
, TabletId(ev->TabletId)
, MinGeneration(ev->MinGeneration)
, StartTime(now)
, Deadline(ev->Deadline)
, ReadBody(ev->ReadBody)
, DiscoverBlockedGeneration(ev->DiscoverBlockedGeneration)
, ForceBlockedGeneration(ev->ForceBlockedGeneration)
, FromLeader(ev->FromLeader)
TBlobStorageGroupMirror3of4DiscoverRequest(TBlobStorageGroupDiscoverParameters& params)
: TBlobStorageGroupRequestActor(params)
, TabletId(params.Common.Event->TabletId)
, MinGeneration(params.Common.Event->MinGeneration)
, StartTime(params.Common.Now)
, Deadline(params.Common.Event->Deadline)
, ReadBody(params.Common.Event->ReadBody)
, DiscoverBlockedGeneration(params.Common.Event->DiscoverBlockedGeneration)
, ForceBlockedGeneration(params.Common.Event->ForceBlockedGeneration)
, FromLeader(params.Common.Event->FromLeader)
{
for (size_t i = 0; i < DiskState.size(); ++i) {
TDiskState& disk = DiskState[i];
Expand Down Expand Up @@ -363,13 +356,8 @@ class TBlobStorageGroupMirror3of4DiscoverRequest : public TBlobStorageGroupReque
}
};

IActor* CreateBlobStorageGroupMirror3of4DiscoverRequest(const TIntrusivePtr<TBlobStorageGroupInfo> &info,
const TIntrusivePtr<TGroupQueues> &state, const TActorId &source,
const TIntrusivePtr<TBlobStorageGroupProxyMon> &mon, TEvBlobStorage::TEvDiscover *ev,
ui64 cookie, NWilson::TTraceId traceId, TInstant now,
TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters) {
return new TBlobStorageGroupMirror3of4DiscoverRequest(info, state, source, mon, ev, cookie, std::move(traceId), now,
storagePoolCounters);
IActor* CreateBlobStorageGroupMirror3of4DiscoverRequest(TBlobStorageGroupDiscoverParameters params) {
return new TBlobStorageGroupMirror3of4DiscoverRequest(params);
}

}//NKikimr
37 changes: 13 additions & 24 deletions ydb/core/blobstorage/dsproxy/dsproxy_get.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -395,28 +395,23 @@ class TBlobStorageGroupGetRequest : public TBlobStorageGroupRequestActor {
return ERequestType::Get;
}

TBlobStorageGroupGetRequest(const TIntrusivePtr<TBlobStorageGroupInfo> &info,
const TIntrusivePtr<TGroupQueues> &state, const TActorId &source,
const TIntrusivePtr<TBlobStorageGroupProxyMon> &mon, TEvBlobStorage::TEvGet *ev, ui64 cookie,
NWilson::TTraceId&& traceId, TNodeLayoutInfoPtr&& nodeLayout, TMaybe<TGroupStat::EKind> latencyQueueKind,
TInstant now, TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters)
: TBlobStorageGroupRequestActor(info, state, mon, source, cookie,
NKikimrServices::BS_PROXY_GET, ev->IsVerboseNoDataEnabled || ev->CollectDebugInfo,
latencyQueueKind, now, storagePoolCounters, ev->RestartCounter, std::move(traceId), "DSProxy.Get", ev,
std::move(ev->ExecutionRelay), NKikimrServices::TActivity::BS_PROXY_GET_ACTOR)
, GetImpl(info, state, ev, std::move(nodeLayout), LogCtx.RequestPrefix)
, Orbit(std::move(ev->Orbit))
, Deadline(ev->Deadline)
, StartTime(now)
TBlobStorageGroupGetRequest(TBlobStorageGroupGetParameters& params)
: TBlobStorageGroupRequestActor(params)
, GetImpl(Info, GroupQueues, params.Common.Event, std::move(params.NodeLayout),
LogCtx.RequestPrefix)
, Orbit(std::move(params.Common.Event->Orbit))
, Deadline(params.Common.Event->Deadline)
, StartTime(params.Common.Now)
, StartTimePut(StartTime)
, GroupSize(info->Type.BlobSubgroupSize())
, GroupSize(Info->Type.BlobSubgroupSize())
, ReportedBytes(0)
{
ReportBytes(sizeof(*this));
MaxSaneRequests = ev->QuerySize * info->Type.TotalPartCount() * (1 + info->Type.Handoff()) * 3;
MaxSaneRequests = params.Common.Event->QuerySize * Info->Type.TotalPartCount() *
(1 + Info->Type.Handoff()) * 3;

RequestBytes = GetImpl.CountRequestBytes();
RequestHandleClass = HandleClassToHandleClass(ev->GetHandleClass);
RequestHandleClass = HandleClassToHandleClass(params.Common.Event->GetHandleClass);
if (Orbit.HasShuttles()) {
RootCauseTrack.IsOn = true;
}
Expand Down Expand Up @@ -468,14 +463,8 @@ class TBlobStorageGroupGetRequest : public TBlobStorageGroupRequestActor {
}
};

IActor* CreateBlobStorageGroupGetRequest(const TIntrusivePtr<TBlobStorageGroupInfo> &info,
const TIntrusivePtr<TGroupQueues> &state, const TActorId &source,
const TIntrusivePtr<TBlobStorageGroupProxyMon> &mon, TEvBlobStorage::TEvGet *ev,
ui64 cookie, NWilson::TTraceId traceId, TNodeLayoutInfoPtr&& nodeLayout,
TMaybe<TGroupStat::EKind> latencyQueueKind, TInstant now,
TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters) {
return new TBlobStorageGroupGetRequest(info, state, source, mon, ev, cookie, std::move(traceId),
std::move(nodeLayout), latencyQueueKind, now, storagePoolCounters);
IActor* CreateBlobStorageGroupGetRequest(TBlobStorageGroupGetParameters params) {
return new TBlobStorageGroupGetRequest(params);
}

}//NKikimr
Loading