Skip to content

Commit 109df90

Browse files
authored
Merge pull request #11 from devshawn/kafka-reassign-partitions
feat: add kafka-reassign-partitions command (closes #1)
2 parents 15fc7f7 + abf5097 commit 109df90

File tree

7 files changed

+111
-3
lines changed

7 files changed

+111
-3
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ Currently, the following commands are supported:
8181
* `kafka-verifiable-producer`
8282
* `kafka-preferred-replica-election`
8383
* `kafka-replica-verification`
84+
* `kafka-reassign-partitions`
8485
* `kafka-broker-api-versions`
8586
* `kafka-consumer-groups`
8687
* `kafka-delete-records`

kafkashell/constants.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
COMMAND_KAFKA_CONSUMER_GROUPS = "kafka-consumer-groups"
2727
COMMAND_KAFKA_PREFERRED_REPLICA_ELECTION = "kafka-preferred-replica-election"
2828
COMMAND_KAFKA_REPLICA_VERIFICATION = "kafka-replica-verification"
29+
COMMAND_KAFKA_REASSIGN_PARTITIONS = "kafka-reassign-partitions"
2930
COMMAND_KAFKA_BROKER_API_VERSIONS = "kafka-broker-api-versions"
3031
COMMAND_KAFKA_DELETE_RECORDS = "kafka-delete-records"
3132
COMMAND_KAFKA_LOG_DIRS = "kafka-log-dirs"

kafkashell/data/completer.json

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -848,6 +848,68 @@
848848
}
849849
}
850850
},
851+
"kafka-reassign-partitions": {
852+
"name": "kafka-reassign-partitions",
853+
"description": "Reassign and move topic partitions between replicas.",
854+
"options": {
855+
"--bootstrap-server": {
856+
"name": "--bootstrap-server",
857+
"description": "The Kafka broker(s) to connect to. NOTE: This'll overwrite the selected cluster value."
858+
},
859+
"--broker-list": {
860+
"name": "--broker-list",
861+
"description": "The list of brokers used in reassignment in the form '0,1,2'. Default: all."
862+
},
863+
"--command-config": {
864+
"name": "--command-config",
865+
"description": "Property file containing configs to be passed to the admin client."
866+
},
867+
"--disable-rack-aware": {
868+
"name": "--disable-rack-aware",
869+
"description": "Disable rack aware replica assignment."
870+
},
871+
"--execute": {
872+
"name": "--execute",
873+
"description": "Start the reassignment as specified by the '--reassignment-json-file' option."
874+
},
875+
"--generate": {
876+
"name": "--generate",
877+
"description": "Generate a candidate partition reassignment configuration. This does not execute it."
878+
},
879+
"--help": {
880+
"name": "--help",
881+
"description": "Print usage information."
882+
},
883+
"--reassignment-json-file": {
884+
"name": "--reassignment-json-file",
885+
"description": "The JSON file with the partition reassignment configuration."
886+
},
887+
"--replica-alter-logs-dirs-throttle": {
888+
"name": "--replica-alter-logs-dirs-throttle",
889+
"description": "The movement of replicas between log directories will be throttled by this value. Default: -1."
890+
},
891+
"--throttle": {
892+
"name": "--throttle",
893+
"description": "The movement of partitions between brokers will be throttled by this value. Default: -1."
894+
},
895+
"--timeout": {
896+
"name": "--timeout",
897+
"description": "The maximum time in ms allowed to wait for execution to start. Default: 10000."
898+
},
899+
"--topics-to-move-json-file": {
900+
"name": "--topics-to-move-json-file",
901+
"description": "Generate a configuration to move partitions of the specified topics to the given list of brokers."
902+
},
903+
"--verify": {
904+
"name": "--verify",
905+
"description": "Verify if the reassignment has completed."
906+
},
907+
"--zookeeper": {
908+
"name": "--zookeeper",
909+
"description": "Zookeeper connection string. WARNING: This'll overwrite the selected cluster value."
910+
}
911+
}
912+
},
851913
"kafka-broker-api-versions": {
852914
"name": "kafka-broker-api-versions",
853915
"description": "Retrieve broker version information.",

kafkashell/executor.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,9 @@ def execute_valid_command(self, command):
9393
elif command.startswith(constants.COMMAND_KAFKA_REPLICA_VERIFICATION):
9494
final_command = self.handle_kafka_replica_verification_command(command)
9595

96+
elif command.startswith(constants.COMMAND_KAFKA_REASSIGN_PARTITIONS):
97+
final_command = self.handle_kafka_reassign_partitions_command(command)
98+
9699
elif command.startswith(constants.COMMAND_KAFKA_BROKER_API_VERSIONS):
97100
final_command = self.handle_kafka_broker_api_versions_command(command)
98101

@@ -205,6 +208,12 @@ def handle_kafka_replica_verification_command(self, command):
205208
command += self.handle_broker_list_flag(command)
206209
return command
207210

211+
def handle_kafka_reassign_partitions_command(self, command):
212+
command += self.handle_bootstrap_server_flag(command)
213+
command += self.handle_zookeeper_flag(command)
214+
command += self.handle_admin_client_settings(command)
215+
return command
216+
208217
def handle_kafka_broker_api_versions_command(self, command):
209218
command += self.handle_bootstrap_server_flag(command)
210219
command += self.handle_admin_client_settings(command)

tests/test_completer_data.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,22 +5,22 @@
55
"",
66
["version", "cluster-select", "cluster-describe", "exit", "clear", "kafka-acls", "kafka-avro-console-consumer",
77
"kafka-avro-console-producer", "kafka-replica-verification", "kafka-preferred-replica-election",
8-
"kafka-broker-api-versions", "kafka-configs", "kafka-console-consumer", "kafka-console-producer",
8+
"kafka-broker-api-versions", "kafka-configs", "kafka-console-consumer", "kafka-console-producer", "kafka-reassign-partitions",
99
"kafka-consumer-groups", "kafka-delete-records", "kafka-dump-log", "kafka-log-dirs", "kafka-topics",
1010
"kafka-verifiable-consumer", "kafka-verifiable-producer", "ksql", "zookeeper-shell"]
1111
),
1212
(
1313
"kafka",
1414
["kafka-acls", "kafka-avro-console-consumer", "kafka-avro-console-producer", "kafka-broker-api-versions",
15-
"kafka-configs", "kafka-console-consumer", "kafka-console-producer", "kafka-consumer-groups",
15+
"kafka-configs", "kafka-console-consumer", "kafka-console-producer", "kafka-consumer-groups", "kafka-reassign-partitions",
1616
"kafka-delete-records", "kafka-dump-log", "kafka-log-dirs", "kafka-topics", "kafka-verifiable-consumer",
1717
"kafka-verifiable-producer", "kafka-replica-verification", "kafka-preferred-replica-election"]
1818
),
1919
(
2020
"k",
2121
["ksql", "kafka-acls", "kafka-avro-console-consumer", "kafka-avro-console-producer",
2222
"kafka-broker-api-versions", "kafka-replica-verification", "kafka-preferred-replica-election",
23-
"kafka-configs", "kafka-console-consumer", "kafka-console-producer", "kafka-consumer-groups",
23+
"kafka-configs", "kafka-console-consumer", "kafka-console-producer", "kafka-consumer-groups", "kafka-reassign-partitions",
2424
"kafka-delete-records", "kafka-dump-log", "kafka-log-dirs", "kafka-topics", "kafka-verifiable-consumer",
2525
"kafka-verifiable-producer", "zookeeper-shell"]
2626
),

tests/test_constants.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,9 @@ def test_constants():
1010
assert kafkashell.constants.COMMAND_KAFKA_AVRO_CONSOLE_PRODUCER == "kafka-avro-console-producer"
1111
assert kafkashell.constants.COMMAND_KAFKA_VERIFIABLE_CONSUMER == "kafka-verifiable-consumer"
1212
assert kafkashell.constants.COMMAND_KAFKA_VERIFIABLE_PRODUCER == "kafka-verifiable-producer"
13+
assert kafkashell.constants.COMMAND_KAFKA_PREFERRED_REPLICA_ELECTION == "kafka-preferred-replica-election"
14+
assert kafkashell.constants.COMMAND_KAFKA_REPLICA_VERIFICATION == "kafka-replica-verification"
15+
assert kafkashell.constants.COMMAND_KAFKA_REASSIGN_PARTITIONS == "kafka-reassign-partitions"
1316
assert kafkashell.constants.COMMAND_KAFKA_BROKER_API_VERSIONS == "kafka-broker-api-versions"
1417
assert kafkashell.constants.COMMAND_KAFKA_DELETE_RECORDS == "kafka-delete-records"
1518
assert kafkashell.constants.COMMAND_KAFKA_LOG_DIRS == "kafka-log-dirs"

tests/test_executor.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,18 @@
145145
"kafka-replica-verification --broker-list broker1:9092,broker2:9093 --fetch-size 1024",
146146
"kafka-replica-verification --broker-list broker1:9092,broker2:9093 --fetch-size 1024"
147147
),
148+
(
149+
"kafka-reassign-partitions",
150+
"kafka-reassign-partitions --bootstrap-server localhost:9092 --zookeeper localhost:2181"
151+
),
152+
(
153+
"kafka-reassign-partitions --verify",
154+
"kafka-reassign-partitions --verify --bootstrap-server localhost:9092 --zookeeper localhost:2181"
155+
),
156+
(
157+
"kafka-reassign-partitions --command-config test.properties",
158+
"kafka-reassign-partitions --command-config test.properties --bootstrap-server localhost:9092 --zookeeper localhost:2181"
159+
),
148160
(
149161
"kafka-broker-api-versions",
150162
"kafka-broker-api-versions --bootstrap-server localhost:9092"
@@ -543,6 +555,26 @@
543555
"kafka-preferred-replica-election --bootstrap-server test:9092 --admin.config admin.properties",
544556
"test-admin-client-settings"
545557
),
558+
(
559+
"kafka-reassign-partitions",
560+
"kafka-reassign-partitions --bootstrap-server localhost:9092 --zookeeper localhost:2181 --command-config admin.properties",
561+
"test-admin-client-settings"
562+
),
563+
(
564+
"kafka-reassign-partitions --command-config test.properties",
565+
"kafka-reassign-partitions --command-config test.properties --bootstrap-server localhost:9092 --zookeeper localhost:2181",
566+
"test-admin-client-settings"
567+
),
568+
(
569+
"kafka-reassign-partitions --command-config test.properties --bootstrap-server broker1:9092",
570+
"kafka-reassign-partitions --command-config test.properties --bootstrap-server broker1:9092 --zookeeper localhost:2181",
571+
"test-admin-client-settings"
572+
),
573+
(
574+
"kafka-reassign-partitions --zookeeper test:2181",
575+
"kafka-reassign-partitions --zookeeper test:2181 --bootstrap-server localhost:9092 --command-config admin.properties",
576+
"test-admin-client-settings"
577+
)
546578
]
547579

548580
prefix_test_data = [

0 commit comments

Comments
 (0)