-
Notifications
You must be signed in to change notification settings - Fork 694
Kafka connect distributed #12960
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
niksaveliev
merged 22 commits into
ydb-platform:main
from
niksaveliev:kafka-connect-distributed
Mar 12, 2025
Merged
Kafka connect distributed #12960
Changes from all commits
Commits
Show all changes
22 commits
Select commit
Hold shift + click to select a range
59788cc
draft
niksaveliev 38f0a99
fix
niksaveliev b718bb5
work
niksaveliev 94201cf
fix
niksaveliev c074c0b
move db and timestamps to variables
niksaveliev 443c8ae
work
niksaveliev b73f1c0
work
niksaveliev c6f0a6c
fix after rebase
niksaveliev 5a1e3d5
few fixes and logs
niksaveliev 836a56d
fix
niksaveliev e518835
fix
niksaveliev 149b7de
fixes after review
niksaveliev a42d963
refactoring
niksaveliev 58d4f41
fixes
niksaveliev b306d7a
improve rebalances
niksaveliev 4c6b51d
fixes after perf tests
niksaveliev c1a663e
Merge branch 'main' into kafka-connect-distributed
niksaveliev 32c4fe1
fix
niksaveliev 30608b4
Merge branch 'kafka-connect-distributed' of github.com:niksaveliev/yd…
niksaveliev 5e51b67
fix includes
niksaveliev fd35bc3
fix
niksaveliev de1b190
fix
niksaveliev File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
356 changes: 356 additions & 0 deletions
356
ydb/core/kafka_proxy/actors/kafka_balance_actor_sql.cpp
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,356 @@ | ||
#include "kafka_balancer_actor.h" | ||
|
||
namespace NKafka { | ||
|
||
const TString INSERT_NEW_GROUP = R"sql( | ||
--!syntax_v1 | ||
DECLARE $ConsumerGroup AS Utf8; | ||
DECLARE $Generation AS Uint64; | ||
DECLARE $State AS Uint64; | ||
DECLARE $Database AS Utf8; | ||
DECLARE $Master AS Utf8; | ||
DECLARE $LastMasterHeartbeat AS Datetime; | ||
DECLARE $ProtocolType AS Utf8; | ||
|
||
INSERT INTO `%s` | ||
( | ||
consumer_group, | ||
generation, | ||
state, | ||
database, | ||
last_heartbeat_time, | ||
master, | ||
protocol_type | ||
) | ||
VALUES | ||
( | ||
$ConsumerGroup, | ||
$Generation, | ||
$State, | ||
$Database, | ||
$LastMasterHeartbeat, | ||
$Master, | ||
$ProtocolType | ||
); | ||
)sql"; | ||
|
||
const TString UPDATE_GROUP = R"sql( | ||
--!syntax_v1 | ||
DECLARE $ConsumerGroup AS Utf8; | ||
DECLARE $State AS Uint64; | ||
DECLARE $Generation AS Uint64; | ||
DECLARE $Database AS Utf8; | ||
DECLARE $Master AS Utf8; | ||
DECLARE $LastMasterHeartbeat AS Datetime; | ||
|
||
UPDATE `%s` | ||
SET | ||
state = $State, | ||
generation = $Generation, | ||
last_heartbeat_time = $LastMasterHeartbeat, | ||
master = $Master | ||
WHERE database = $Database | ||
AND consumer_group = $ConsumerGroup; | ||
)sql"; | ||
|
||
const TString UPDATE_GROUP_STATE = R"sql( | ||
--!syntax_v1 | ||
DECLARE $ConsumerGroup AS Utf8; | ||
DECLARE $Database AS Utf8; | ||
DECLARE $State AS Uint64; | ||
DECLARE $Generation AS Uint64; | ||
|
||
UPDATE `%s` | ||
SET | ||
state = $State | ||
WHERE database = $Database | ||
AND consumer_group = $ConsumerGroup | ||
AND generation = $Generation; | ||
)sql"; | ||
|
||
const TString UPDATE_GROUP_STATE_AND_PROTOCOL = R"sql( | ||
--!syntax_v1 | ||
DECLARE $ConsumerGroup AS Utf8; | ||
DECLARE $State AS Uint64; | ||
DECLARE $Database AS Utf8; | ||
DECLARE $Protocol AS Utf8; | ||
DECLARE $LastMasterHeartbeat AS Datetime; | ||
|
||
UPDATE `%s` | ||
SET | ||
state = $State, | ||
last_heartbeat_time = $LastMasterHeartbeat, | ||
protocol = $Protocol | ||
WHERE database = $Database | ||
AND consumer_group = $ConsumerGroup; | ||
)sql"; | ||
|
||
const TString INSERT_MEMBER = R"sql( | ||
--!syntax_v1 | ||
DECLARE $ConsumerGroup AS Utf8; | ||
DECLARE $Generation AS Uint64; | ||
DECLARE $MemberId AS Utf8; | ||
DECLARE $InstanceId AS Utf8; | ||
DECLARE $WorkerStateProto AS String; | ||
DECLARE $Database AS Utf8; | ||
DECLARE $HeartbeatDeadline AS Datetime; | ||
DECLARE $SessionTimeoutMs AS Uint32; | ||
DECLARE $RebalanceTimeoutMs AS Uint32; | ||
|
||
INSERT INTO `%s` | ||
( | ||
consumer_group, | ||
generation, | ||
member_id, | ||
instance_id, | ||
heartbeat_deadline, | ||
worker_state_proto, | ||
database, | ||
session_timeout_ms, | ||
rebalance_timeout_ms | ||
) | ||
VALUES ( | ||
$ConsumerGroup, | ||
$Generation, | ||
$MemberId, | ||
$InstanceId, | ||
$HeartbeatDeadline, | ||
$WorkerStateProto, | ||
$Database, | ||
$SessionTimeoutMs, | ||
$RebalanceTimeoutMs | ||
); | ||
)sql"; | ||
|
||
const TString UPSERT_ASSIGNMENTS_AND_SET_WORKING_STATE = R"sql( | ||
--!syntax_v1 | ||
DECLARE $Assignments AS List<Struct<MemberId: Utf8, Assignment: Bytes>>; | ||
DECLARE $ConsumerGroup AS Utf8; | ||
DECLARE $Database AS Utf8; | ||
DECLARE $Generation AS Uint64; | ||
DECLARE $State AS Uint64; | ||
DECLARE $LastMasterHeartbeat AS Datetime; | ||
|
||
UPSERT INTO `%s` | ||
SELECT | ||
item.MemberId AS member_id, | ||
item.Assignment AS assignment, | ||
$ConsumerGroup AS consumer_group, | ||
$Database AS database, | ||
$Generation AS generation | ||
FROM AS_TABLE($Assignments) AS item; | ||
|
||
UPDATE `%s` | ||
SET | ||
state = $State, | ||
last_heartbeat_time = $LastMasterHeartbeat, | ||
last_success_generation = $Generation | ||
WHERE database = $Database | ||
AND consumer_group = $ConsumerGroup; | ||
)sql"; | ||
|
||
const TString SELECT_ALIVE_MEMBERS = R"sql( | ||
--!syntax_v1 | ||
DECLARE $ConsumerGroup AS Utf8; | ||
DECLARE $Generation AS Uint64; | ||
DECLARE $Database AS Utf8; | ||
DECLARE $PaginationMemberId AS Utf8; | ||
DECLARE $Limit AS Uint64; | ||
|
||
SELECT member_id, instance_id, rebalance_timeout_ms | ||
FROM `%s` | ||
VIEW PRIMARY KEY | ||
WHERE database = $Database | ||
AND consumer_group = $ConsumerGroup | ||
AND generation = $Generation | ||
AND member_id > $PaginationMemberId | ||
AND (leaved IS NULL OR leaved = False) | ||
ORDER BY member_id | ||
LIMIT $Limit; | ||
)sql"; | ||
|
||
const TString SELECT_WORKER_STATES = R"sql( | ||
--!syntax_v1 | ||
DECLARE $ConsumerGroup AS Utf8; | ||
DECLARE $Generation AS Uint64; | ||
DECLARE $Database AS Utf8; | ||
DECLARE $PaginationMemberId AS Utf8; | ||
DECLARE $Limit AS Uint64; | ||
|
||
SELECT worker_state_proto, member_id, instance_id | ||
FROM `%s` | ||
VIEW PRIMARY KEY | ||
WHERE database = $Database | ||
AND consumer_group = $ConsumerGroup | ||
AND generation = $Generation | ||
AND member_id > $PaginationMemberId | ||
ORDER BY member_id | ||
LIMIT $Limit; | ||
)sql"; | ||
|
||
const TString CHECK_GROUP_STATE = R"sql( | ||
--!syntax_v1 | ||
DECLARE $ConsumerGroup AS Utf8; | ||
DECLARE $Database AS Utf8; | ||
|
||
SELECT state, generation, master, last_heartbeat_time, consumer_group, database, protocol, protocol_type, last_success_generation | ||
FROM `%s` | ||
VIEW PRIMARY KEY | ||
WHERE database = $Database | ||
AND consumer_group = $ConsumerGroup; | ||
)sql"; | ||
|
||
const TString FETCH_ASSIGNMENTS = R"sql( | ||
--!syntax_v1 | ||
DECLARE $ConsumerGroup AS Utf8; | ||
DECLARE $Generation AS Uint64; | ||
DECLARE $MemberId AS Utf8; | ||
DECLARE $Database AS Utf8; | ||
|
||
SELECT assignment | ||
FROM `%s` | ||
VIEW PRIMARY KEY | ||
WHERE database = $Database | ||
AND consumer_group = $ConsumerGroup | ||
AND generation = $Generation | ||
AND member_id = $MemberId; | ||
)sql"; | ||
|
||
const TString CHECK_DEAD_MEMBERS = R"sql( | ||
--!syntax_v1 | ||
DECLARE $ConsumerGroup AS Utf8; | ||
DECLARE $Generation AS Uint64; | ||
DECLARE $Database AS Utf8; | ||
DECLARE $MemberId AS Utf8; | ||
DECLARE $Now AS Datetime; | ||
|
||
SELECT COUNT(1) deads_cnt | ||
FROM `%s` | ||
VIEW idx_group_generation_db_hb | ||
WHERE database = $Database | ||
AND consumer_group = $ConsumerGroup | ||
AND generation = $Generation | ||
AND heartbeat_deadline < $Now; | ||
|
||
SELECT session_timeout_ms | ||
FROM `%s` | ||
VIEW PRIMARY KEY | ||
WHERE database = $Database | ||
AND consumer_group = $ConsumerGroup | ||
AND generation = $Generation | ||
AND member_id = $MemberId; | ||
|
||
)sql"; | ||
|
||
const TString UPDATE_LAST_MEMBER_AND_GROUP_HEARTBEATS = R"sql( | ||
--!syntax_v1 | ||
DECLARE $ConsumerGroup AS Utf8; | ||
DECLARE $Generation AS Uint64; | ||
DECLARE $MemberId AS Utf8; | ||
DECLARE $Database AS Utf8; | ||
DECLARE $LastMasterHeartbeat AS Datetime; | ||
DECLARE $HeartbeatDeadline AS Datetime; | ||
DECLARE $UpdateGroupHeartbeat AS Bool; | ||
|
||
UPDATE `%s` | ||
SET last_heartbeat_time = $LastMasterHeartbeat | ||
WHERE consumer_group = $ConsumerGroup | ||
AND database = $Database | ||
AND generation = $Generation | ||
AND $UpdateGroupHeartbeat = True; | ||
|
||
UPDATE `%s` | ||
SET heartbeat_deadline = $HeartbeatDeadline | ||
WHERE consumer_group = $ConsumerGroup | ||
AND generation = $Generation | ||
AND member_id = $MemberId | ||
AND database = $Database; | ||
)sql"; | ||
|
||
const TString UPDATE_LAST_MEMBER_HEARTBEAT = R"sql( | ||
--!syntax_v1 | ||
DECLARE $ConsumerGroup AS Utf8; | ||
DECLARE $Generation AS Uint64; | ||
DECLARE $MemberId AS Utf8; | ||
DECLARE $Database AS Utf8; | ||
DECLARE $HeartbeatDeadline AS Datetime; | ||
|
||
UPDATE `%s` | ||
SET heartbeat_deadline = $HeartbeatDeadline | ||
WHERE consumer_group = $ConsumerGroup | ||
AND generation = $Generation | ||
AND member_id = $MemberId | ||
AND database = $Database; | ||
)sql"; | ||
|
||
const TString CHECK_MASTER_ALIVE = R"sql( | ||
--!syntax_v1 | ||
DECLARE $ConsumerGroup AS Utf8; | ||
DECLARE $MasterId AS Utf8; | ||
DECLARE $Database AS Utf8; | ||
DECLARE $Generation AS Uint64; | ||
DECLARE $Now AS Datetime; | ||
|
||
SELECT COUNT(1) allive, | ||
FROM `%s` | ||
VIEW PRIMARY KEY | ||
WHERE database = $Database | ||
AND consumer_group = $ConsumerGroup | ||
AND generation = $Generation | ||
AND member_id = $MasterId | ||
AND heartbeat_deadline > $Now; | ||
)sql"; | ||
|
||
const TString GET_GENERATION_BY_MEMBER = R"sql( | ||
--!syntax_v1 | ||
DECLARE $ConsumerGroup AS Utf8; | ||
DECLARE $MemberId AS Utf8; | ||
DECLARE $Database AS Utf8; | ||
|
||
SELECT generation | ||
FROM `%s` | ||
VIEW PRIMARY KEY | ||
WHERE database = $Database | ||
AND consumer_group = $ConsumerGroup | ||
AND member_id = $MemberId | ||
ORDER BY generation DESC | ||
LIMIT 1; | ||
)sql"; | ||
|
||
const TString UPDATE_LAST_HEARTBEAT_AND_STATE_TO_LEAVE_GROUP = R"sql( | ||
--!syntax_v1 | ||
DECLARE $ConsumerGroup AS Utf8; | ||
DECLARE $MemberId AS Utf8; | ||
DECLARE $Database AS Utf8; | ||
DECLARE $Generation AS Uint64; | ||
DECLARE $LastMasterHeartbeat AS Datetime; | ||
DECLARE $State AS Uint64; | ||
DECLARE $UpdateState AS Bool; | ||
|
||
UPDATE `%s` | ||
SET heartbeat_deadline = $LastMasterHeartbeat, | ||
leaved = True | ||
WHERE database = $Database | ||
AND consumer_group = $ConsumerGroup | ||
AND generation = $Generation | ||
AND member_id = $MemberId; | ||
|
||
UPDATE `%s` | ||
SET | ||
state = $State | ||
WHERE database = $Database | ||
AND consumer_group = $ConsumerGroup | ||
AND $UpdateState = True; | ||
)sql"; | ||
|
||
const TString CHECK_GROUPS_COUNT = R"sql( | ||
--!syntax_v1 | ||
DECLARE $GroupsCountCheckDeadline AS Datetime; | ||
|
||
SELECT COUNT(1) as groups_count | ||
FROM `%s` | ||
niksaveliev marked this conversation as resolved.
Show resolved
Hide resolved
|
||
VIEW idx_last_hb | ||
WHERE last_heartbeat_time > $GroupsCountCheckDeadline; | ||
)sql"; | ||
|
||
} // namespace NKafka |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Почему в этом запросе нет предиката
generation = $Generation
?