@@ -6,14 +6,6 @@ namespace NKafka {
66using namespace NKikimr ;
77using namespace NKikimr ::NGRpcProxy::V1;
88
9- static EKafkaErrors KqpStatusToKafkaError (Ydb::StatusIds::StatusCode status) {
10- // savnik finish it
11- if (status == Ydb::StatusIds::SUCCESS) {
12- return EKafkaErrors::NONE_ERROR;
13- }
14- return EKafkaErrors::UNKNOWN_SERVER_ERROR;
15- }
16-
179static constexpr ui8 MASTER_WAIT_JOINS_DELAY_SECONDS = 5 ;
1810static constexpr ui8 WAIT_FOR_MASTER_DELAY_SECONDS = 2 ;
1911static constexpr ui8 WAIT_MASTER_MAX_RETRY_COUNT = 5 ;
@@ -29,6 +21,20 @@ void TKafkaBalancerActor::Bootstrap(const NActors::TActorContext& ctx) {
2921 }
3022}
3123
24+ static EKafkaErrors KqpStatusToKafkaError (Ydb::StatusIds::StatusCode status) {
25+ // savnik finish it
26+ if (status == Ydb::StatusIds::SUCCESS) {
27+ return EKafkaErrors::NONE_ERROR;
28+ }
29+ return EKafkaErrors::UNKNOWN_SERVER_ERROR;
30+ }
31+
32+ TString TKafkaBalancerActor::LogPrefix () {
33+ TStringBuilder sb;
34+ sb << " TKafkaBalancerActor: " ;
35+ return sb;
36+ }
37+
3238void TKafkaBalancerActor::Handle (NMetadata::NProvider::TEvManagerPrepared::TPtr&, const TActorContext& ctx) {
3339 TablesInited++;
3440 if (TablesInited == TABLES_TO_INIT_COUNT) {
@@ -37,68 +43,28 @@ void TKafkaBalancerActor::Handle(NMetadata::NProvider::TEvManagerPrepared::TPtr&
3743}
3844
3945void TKafkaBalancerActor::Handle (NKqp::TEvKqp::TEvCreateSessionResponse::TPtr& ev, const TActorContext& ctx) {
40- Cookie = 0 ;
41- const TString createSessionError = " Failed to create KQP session " ;
46+ Cookie = 0 ; // savnik
47+
4248 if (!Kqp->HandleCreateSessionResponse (ev, ctx)) {
43- switch (RequestType) {
44- case JOIN_GROUP:
45- SendJoinGroupResponseFail (ctx, CorrelationId,
46- EKafkaErrors::UNKNOWN_SERVER_ERROR,
47- createSessionError);
48- break ;
49- case SYNC_GROUP:
50- SendSyncGroupResponseFail (ctx, CorrelationId,
51- EKafkaErrors::UNKNOWN_SERVER_ERROR,
52- createSessionError);
53- break ;
54- case LEAVE_GROUP:
55- SendLeaveGroupResponseFail (ctx, CorrelationId,
56- EKafkaErrors::UNKNOWN_SERVER_ERROR,
57- createSessionError);
58- break ;
59- case HEARTBEAT:
60- SendHeartbeatResponseFail (ctx, CorrelationId,
61- EKafkaErrors::UNKNOWN_SERVER_ERROR,
62- createSessionError);
63- break ;
64- default :
65- break ;
66- }
49+ SendResponseFail (ctx, EKafkaErrors::UNKNOWN_SERVER_ERROR, " Failed to create KQP session" );
6750 PassAway ();
6851 return ;
6952 }
7053
71- switch (RequestType) {
72- case JOIN_GROUP:
73- HandleJoinGroupResponse (nullptr , ctx);
74- break ;
75- case SYNC_GROUP:
76- HandleSyncGroupResponse (nullptr , ctx);
77- break ;
78- case LEAVE_GROUP:
79- HandleLeaveGroupResponse (nullptr , ctx);
80- break ;
81- case HEARTBEAT:
82- HandleHeartbeatResponse (nullptr , ctx);
83- break ;
84- default :
85- KAFKA_LOG_CRIT (" Unknown RequestType in TEvCreateSessionResponse" );
86- PassAway ();
87- break ;
88- }
54+ HandleResponse (nullptr , ctx);
8955}
9056
9157void TKafkaBalancerActor::Handle (NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) {
92- const TString kqpQueryError = " KQP query error" ;
9358 if (ev->Cookie != KqpReqCookie) {
94- KAFKA_LOG_CRIT (" Unexpected cookie in TEvQueryResponse" );
59+ KAFKA_LOG_ERROR (" Unexpected cookie in TEvQueryResponse" );
9560 return ;
9661 }
9762
9863 const auto & record = ev->Get ()->Record ;
9964 auto status = record.GetYdbStatus ();
10065 if (status == ::Ydb::StatusIds_StatusCode::StatusIds_StatusCode_ABORTED && CurrentRetryNumber < TX_ABORT_MAX_RETRY_COUNT) {
10166 CurrentRetryNumber++;
67+ KAFKA_LOG_I (TStringBuilder () << " Retry after tx aborted. Num of retry# " << CurrentRetryNumber);
10268 switch (RequestType) {
10369 case JOIN_GROUP:
10470 Register (new TKafkaBalancerActor (Context, Cookie, CorrelationId, JoinGroupRequestData, CurrentRetryNumber));
@@ -123,26 +89,21 @@ void TKafkaBalancerActor::Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const
12389 auto kafkaErr = KqpStatusToKafkaError (status);
12490
12591 if (kafkaErr != EKafkaErrors::NONE_ERROR) {
126- switch (RequestType) {
127- case JOIN_GROUP:
128- SendJoinGroupResponseFail (ctx, CorrelationId, kafkaErr, kqpQueryError);
129- break ;
130- case SYNC_GROUP:
131- SendSyncGroupResponseFail (ctx, CorrelationId, kafkaErr, kqpQueryError);
132- break ;
133- case LEAVE_GROUP:
134- SendLeaveGroupResponseFail (ctx, CorrelationId, kafkaErr, kqpQueryError);
135- break ;
136- case HEARTBEAT:
137- SendHeartbeatResponseFail (ctx, CorrelationId, kafkaErr, kqpQueryError);
138- break ;
139- default :
140- break ;
141- }
142- PassAway ();
143- return ;
92+ auto kqpQueryError = TStringBuilder () <<" Kqp error. " ;
93+
94+ NYql::TIssues issues;
95+ NYql::IssuesFromMessage (record.GetResponse ().GetQueryIssues (), issues);
96+ NYdb::TStatus status (NYdb::EStatus (record.GetYdbStatus ()), std::move (issues));
97+ kqpQueryError << status;
98+
99+ SendResponseFail (ctx, kafkaErr, kqpQueryError);
144100 }
145101
102+ HandleResponse (ev, ctx);
103+ }
104+
105+ void TKafkaBalancerActor::HandleResponse (NKqp::TEvKqp::TEvQueryResponse::TPtr ev, const TActorContext& ctx) {
106+ KAFKA_LOG_I (TStringBuilder () << " Handle kqp response. CurrentStep# " << (ui8)CurrentStep);
146107 switch (RequestType) {
147108 case JOIN_GROUP:
148109 HandleJoinGroupResponse (ev, ctx);
@@ -157,12 +118,31 @@ void TKafkaBalancerActor::Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const
157118 HandleHeartbeatResponse (ev, ctx);
158119 break ;
159120 default :
160- KAFKA_LOG_CRIT (" Unknown RequestType in TEvCreateSessionResponse" );
121+ KAFKA_LOG_ERROR (" Unknown RequestType in TEvCreateSessionResponse" );
161122 PassAway ();
162123 break ;
163124 }
164125}
165126
127+ void TKafkaBalancerActor::SendResponseFail (const TActorContext& ctx, EKafkaErrors error, const TString& message) {
128+ switch (RequestType) {
129+ case JOIN_GROUP:
130+ SendJoinGroupResponseFail (ctx, CorrelationId, error, message);
131+ break ;
132+ case SYNC_GROUP:
133+ SendSyncGroupResponseFail (ctx, CorrelationId, error, message);
134+ break ;
135+ case LEAVE_GROUP:
136+ SendLeaveGroupResponseFail (ctx, CorrelationId, error, message);
137+ break ;
138+ case HEARTBEAT:
139+ SendHeartbeatResponseFail (ctx, CorrelationId, error, message);
140+ break ;
141+ default :
142+ break ;
143+ }
144+ }
145+
166146std::optional<TGroupStatus> TKafkaBalancerActor::ParseCheckStateAndGeneration (
167147 NKqp::TEvKqp::TEvQueryResponse::TPtr ev
168148) {
@@ -302,8 +282,8 @@ bool TKafkaBalancerActor::ParseWorkerStatesAndChooseProtocol(
302282 }
303283
304284 for (const auto & st : states) {
305- const auto & protos = st.WorkerState .protocols ();
306- for (const auto & pr : protos ) {
285+ const auto & protocols = st.WorkerState .protocols ();
286+ for (const auto & pr : protocols ) {
307287 if (pr.protocol_name () == chosenProtocol) {
308288 workerStates[st.MemberId ] = pr.metadata ();
309289 break ;
@@ -342,7 +322,7 @@ bool TKafkaBalancerActor::ParseDeadCount(
342322}
343323
344324void TKafkaBalancerActor::Die (const TActorContext& ctx) {
345- KAFKA_LOG_D (" TKafkaBalancerActor pass away" );
325+ KAFKA_LOG_D (" Pass away" );
346326 TBase::Die (ctx);
347327}
348328
@@ -400,6 +380,8 @@ NYdb::TParamsBuilder TKafkaBalancerActor::BuildAssignmentsParams() {
400380
401381 auto & assignmentList = params.AddParam (" $Assignments" ).BeginList ();
402382
383+ KAFKA_LOG_D (TStringBuilder () << " Assignments count: " << SyncGroupRequestData->Assignments .size ());
384+
403385 for (auto & assignment: SyncGroupRequestData->Assignments ) {
404386 assignmentList.AddListItem ()
405387 .BeginStruct ()
@@ -1117,7 +1099,7 @@ void TKafkaBalancerActor::SendJoinGroupResponseFail(const TActorContext&,
11171099 EKafkaErrors error,
11181100 TString message) {
11191101
1120- KAFKA_LOG_CRIT (" JOIN_GROUP failed. reason# " << message);
1102+ KAFKA_LOG_ERROR (" JOIN_GROUP failed. reason# " << message);
11211103 auto response = std::make_shared<TJoinGroupResponseData>();
11221104 response->ErrorCode = error;
11231105 Send (Context->ConnectionId , new TEvKafka::TEvResponse (corellationId, response, error));
@@ -1127,7 +1109,7 @@ void TKafkaBalancerActor::SendSyncGroupResponseFail(const TActorContext&,
11271109 ui64 corellationId,
11281110 EKafkaErrors error,
11291111 TString message) {
1130- KAFKA_LOG_CRIT (" SYNC_GROUP failed. reason# " << message);
1112+ KAFKA_LOG_ERROR (" SYNC_GROUP failed. reason# " << message);
11311113 auto response = std::make_shared<TSyncGroupResponseData>();
11321114 response->ErrorCode = error;
11331115 response->Assignment = " " ;
@@ -1138,7 +1120,7 @@ void TKafkaBalancerActor::SendLeaveGroupResponseFail(const TActorContext&,
11381120 ui64 corellationId,
11391121 EKafkaErrors error,
11401122 TString message) {
1141- KAFKA_LOG_CRIT (" LEAVE_GROUP failed. reason# " << message);
1123+ KAFKA_LOG_ERROR (" LEAVE_GROUP failed. reason# " << message);
11421124 auto response = std::make_shared<TLeaveGroupResponseData>();
11431125 response->ErrorCode = error;
11441126 Send (Context->ConnectionId , new TEvKafka::TEvResponse (corellationId, response, error));
@@ -1148,7 +1130,7 @@ void TKafkaBalancerActor::SendHeartbeatResponseFail(const TActorContext&,
11481130 ui64 corellationId,
11491131 EKafkaErrors error,
11501132 TString message) {
1151- KAFKA_LOG_CRIT (" HEARTBEAT failed. reason# " << message);
1133+ KAFKA_LOG_ERROR (" HEARTBEAT failed. reason# " << message);
11521134 auto response = std::make_shared<THeartbeatResponseData>();
11531135 response->ErrorCode = error;
11541136 Send (Context->ConnectionId , new TEvKafka::TEvResponse (corellationId, response, error));
0 commit comments