|
| 1 | +#include "msgbus_servicereq.h" |
| 2 | +#include <ydb/core/mind/local.h> |
| 3 | +#include <ydb/core/protos/local.pb.h> |
| 4 | +namespace NKikimr { |
| 5 | +namespace NMsgBusProxy { |
| 6 | + |
| 7 | +namespace { |
| 8 | + const ui32 DefaultTimeout = 90000; |
| 9 | +} |
| 10 | + |
| 11 | +template <typename ResponseType> |
| 12 | +class TMessageBusLocalEnumerateTablets: public TMessageBusLocalServiceRequest<TMessageBusLocalEnumerateTablets<ResponseType>, NKikimrServices::TActivity::FRONT_ENUMERATE_TABLETS> { |
| 13 | + using TBase = TMessageBusLocalServiceRequest<TMessageBusLocalEnumerateTablets<ResponseType>, NKikimrServices::TActivity::FRONT_ENUMERATE_TABLETS>; |
| 14 | + ui64 DomainUid; |
| 15 | + ui32 NodeId; |
| 16 | + TTabletTypes::EType TabletType; |
| 17 | + bool IsFiltered; |
| 18 | + bool IsOk; |
| 19 | + bool IsNodeIdPresent; |
| 20 | +public: |
| 21 | + TMessageBusLocalEnumerateTablets(TBusMessageContext &msg, TDuration timeout) |
| 22 | + : TBase(msg, timeout) |
| 23 | + , DomainUid(0) |
| 24 | + , NodeId(0) |
| 25 | + , TabletType() |
| 26 | + , IsFiltered(false) |
| 27 | + , IsOk(true) |
| 28 | + , IsNodeIdPresent(false) |
| 29 | + { |
| 30 | + const auto &record = static_cast<TBusLocalEnumerateTablets*>(msg.GetMessage())->Record; |
| 31 | + IsOk = IsOk && record.HasDomainUid(); |
| 32 | + if (record.HasNodeId()) { |
| 33 | + IsNodeIdPresent = true; |
| 34 | + NodeId = record.GetNodeId(); |
| 35 | + } |
| 36 | + if (IsOk) { |
| 37 | + DomainUid = record.GetDomainUid(); |
| 38 | + if (record.HasTabletType()) { |
| 39 | + IsFiltered = true; |
| 40 | + TabletType = record.GetTabletType(); |
| 41 | + } |
| 42 | + } |
| 43 | + } |
| 44 | + |
| 45 | + void Handle(TEvLocal::TEvEnumerateTabletsResult::TPtr &ev, const TActorContext &ctx) { |
| 46 | + const NKikimrLocal::TEvEnumerateTabletsResult &record = ev->Get()->Record; |
| 47 | + Y_ABORT_UNLESS(record.HasStatus()); |
| 48 | + THolder<ResponseType> response(new ResponseType()); |
| 49 | + if (record.GetStatus() != NKikimrProto::OK) { |
| 50 | + response->Record.SetStatus(MSTATUS_ERROR); |
| 51 | + response->Record.SetErrorReason(Sprintf("Local response is not OK (Status# %s), Marker# LE1", |
| 52 | + NKikimrProto::EReplyStatus_Name(record.GetStatus()).data())); |
| 53 | + TBase::SendReplyAndDie(response.Release(), ctx); |
| 54 | + } else { |
| 55 | + response->Record.SetStatus(MSTATUS_OK); |
| 56 | + for (ui32 i = 0; i < record.TabletInfoSize(); ++i) { |
| 57 | + auto &srcInfo = record.GetTabletInfo(i); |
| 58 | + auto *dstInfo = response->Record.AddTabletInfo(); |
| 59 | + if (srcInfo.HasTabletId()) { |
| 60 | + dstInfo->SetTabletId(srcInfo.GetTabletId()); |
| 61 | + } |
| 62 | + if (srcInfo.HasTabletType()) { |
| 63 | + dstInfo->SetTabletType(srcInfo.GetTabletType()); |
| 64 | + } |
| 65 | + } |
| 66 | + TBase::SendReplyAndDie(response.Release(), ctx); |
| 67 | + } |
| 68 | + } |
| 69 | + |
| 70 | + TActorId MakeServiceID(const TActorContext &ctx) { |
| 71 | + auto &domainsInfo = AppData(ctx)->DomainsInfo; |
| 72 | + if (!domainsInfo->Domain || domainsInfo->GetDomain()->DomainUid != DomainUid) { |
| 73 | + // Report details in CreateErrorReply |
| 74 | + TActorId invalidId; |
| 75 | + return invalidId; |
| 76 | + } |
| 77 | + ui32 nodeId = IsNodeIdPresent ? NodeId : ctx.SelfID.NodeId(); |
| 78 | + ui64 hiveId = domainsInfo->GetHive(); |
| 79 | + return MakeLocalRegistrarID(nodeId, hiveId); |
| 80 | + } |
| 81 | + |
| 82 | + TEvLocal::TEvEnumerateTablets* MakeReq(const TActorContext &ctx) { |
| 83 | + Y_UNUSED(ctx); |
| 84 | + if (IsFiltered) { |
| 85 | + return new TEvLocal::TEvEnumerateTablets(TabletType); |
| 86 | + } |
| 87 | + return new TEvLocal::TEvEnumerateTablets(); |
| 88 | + } |
| 89 | + |
| 90 | + NBus::TBusMessage* CreateErrorReply(EResponseStatus status, const TActorContext &ctx) { |
| 91 | + Y_UNUSED(ctx); |
| 92 | + Y_UNUSED(status); |
| 93 | + ui64 nodeId = IsNodeIdPresent ? NodeId : ctx.SelfID.NodeId(); |
| 94 | + THolder<ResponseType> response(new ResponseType()); |
| 95 | + response->Record.SetStatus(MSTATUS_ERROR); |
| 96 | + response->Record.SetErrorReason(Sprintf("Invalid DomainUid# %" PRIu64 ", NodeId# %" PRIu64 |
| 97 | + " or kikimr hive/domain/node configuration, Marker# LE3", (ui64)DomainUid, (ui64)nodeId)); |
| 98 | + return response.Release(); |
| 99 | + } |
| 100 | + |
| 101 | + void HandleTimeout(const TActorContext &ctx) { |
| 102 | + Y_UNUSED(ctx); |
| 103 | + TAutoPtr<TBusResponse> response(new TBusResponseStatus(MSTATUS_TIMEOUT, "")); |
| 104 | + TBase::SendReplyAndDie(response.Release(), ctx); |
| 105 | + } |
| 106 | + |
| 107 | + void HandleUndelivered(TEvents::TEvUndelivered::TPtr& ev, const TActorContext& ctx) { |
| 108 | + Y_UNUSED(ev); |
| 109 | + THolder<ResponseType> response(new ResponseType()); |
| 110 | + ui64 nodeId = IsNodeIdPresent ? NodeId : ctx.SelfID.NodeId(); |
| 111 | + response->Record.SetStatus(MSTATUS_ERROR); |
| 112 | + response->Record.SetErrorReason(Sprintf("Request was not delivered to Local, NodeId# %" PRIu64 |
| 113 | + ", Marker# LE2", (ui64)nodeId)); |
| 114 | + TBase::SendReplyAndDie(response.Release(), ctx); |
| 115 | + |
| 116 | + } |
| 117 | + |
| 118 | + STFUNC(StateFunc) { |
| 119 | + switch (ev->GetTypeRewrite()) { |
| 120 | + HFunc(TEvLocal::TEvEnumerateTabletsResult, Handle); |
| 121 | + HFunc(TEvents::TEvUndelivered, HandleUndelivered); |
| 122 | + CFunc(TEvents::TSystem::Wakeup, HandleTimeout); |
| 123 | + } |
| 124 | + } |
| 125 | +}; |
| 126 | + |
| 127 | +IActor* CreateMessageBusLocalEnumerateTablets(TBusMessageContext &msg) { |
| 128 | + //const auto &record = static_cast<TBusLocalEnumerateTablets*>(msg.GetMessage())->Record; |
| 129 | + //const TDuration timeout = TDuration::MilliSeconds(record.HasTimeout() ? record.GetTimeout() : DefaultTimeout); |
| 130 | + const TDuration timeout = TDuration::MilliSeconds(DefaultTimeout); |
| 131 | + |
| 132 | + if (msg.GetMessage()->GetHeader()->Type == MTYPE_CLIENT_OLD_LOCAL_ENUMERATE_TABLETS) { |
| 133 | + return new TMessageBusLocalEnumerateTablets<TBusLocalEnumerateTabletsResult>(msg, timeout); |
| 134 | + } else { |
| 135 | + return new TMessageBusLocalEnumerateTablets<TBusResponse>(msg, timeout); |
| 136 | + } |
| 137 | +} |
| 138 | + |
| 139 | +} |
| 140 | +} |
0 commit comments