Skip to content

feat: add kafka-reassign-partitions command (closes #1) #11

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 28, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ Currently, the following commands are supported:
* `kafka-verifiable-producer`
* `kafka-preferred-replica-election`
* `kafka-replica-verification`
* `kafka-reassign-partitions`
* `kafka-broker-api-versions`
* `kafka-consumer-groups`
* `kafka-delete-records`
Expand Down
1 change: 1 addition & 0 deletions kafkashell/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
COMMAND_KAFKA_CONSUMER_GROUPS = "kafka-consumer-groups"
COMMAND_KAFKA_PREFERRED_REPLICA_ELECTION = "kafka-preferred-replica-election"
COMMAND_KAFKA_REPLICA_VERIFICATION = "kafka-replica-verification"
COMMAND_KAFKA_REASSIGN_PARTITIONS = "kafka-reassign-partitions"
COMMAND_KAFKA_BROKER_API_VERSIONS = "kafka-broker-api-versions"
COMMAND_KAFKA_DELETE_RECORDS = "kafka-delete-records"
COMMAND_KAFKA_LOG_DIRS = "kafka-log-dirs"
Expand Down
62 changes: 62 additions & 0 deletions kafkashell/data/completer.json
Original file line number Diff line number Diff line change
Expand Up @@ -848,6 +848,68 @@
}
}
},
"kafka-reassign-partitions": {
"name": "kafka-reassign-partitions",
"description": "Reassign and move topic partitions between replicas.",
"options": {
"--bootstrap-server": {
"name": "--bootstrap-server",
"description": "The Kafka broker(s) to connect to. NOTE: This'll overwrite the selected cluster value."
},
"--broker-list": {
"name": "--broker-list",
"description": "The list of brokers used in reassignment in the form '0,1,2'. Default: all."
},
"--command-config": {
"name": "--command-config",
"description": "Property file containing configs to be passed to the admin client."
},
"--disable-rack-aware": {
"name": "--disable-rack-aware",
"description": "Disable rack aware replica assignment."
},
"--execute": {
"name": "--execute",
"description": "Start the reassignment as specified by the '--reassignment-json-file' option."
},
"--generate": {
"name": "--generate",
"description": "Generate a candidate partition reassignment configuration. This does not execute it."
},
"--help": {
"name": "--help",
"description": "Print usage information."
},
"--reassignment-json-file": {
"name": "--reassignment-json-file",
"description": "The JSON file with the partition reassignment configuration."
},
"--replica-alter-logs-dirs-throttle": {
"name": "--replica-alter-logs-dirs-throttle",
"description": "The movement of replicas between log directories will be throttled by this value. Default: -1."
},
"--throttle": {
"name": "--throttle",
"description": "The movement of partitions between brokers will be throttled by this value. Default: -1."
},
"--timeout": {
"name": "--timeout",
"description": "The maximum time in ms allowed to wait for execution to start. Default: 10000."
},
"--topics-to-move-json-file": {
"name": "--topics-to-move-json-file",
"description": "Generate a configuration to move partitions of the specified topics to the given list of brokers."
},
"--verify": {
"name": "--verify",
"description": "Verify if the reassignment has completed."
},
"--zookeeper": {
"name": "--zookeeper",
"description": "Zookeeper connection string. WARNING: This'll overwrite the selected cluster value."
}
}
},
"kafka-broker-api-versions": {
"name": "kafka-broker-api-versions",
"description": "Retrieve broker version information.",
Expand Down
9 changes: 9 additions & 0 deletions kafkashell/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@ def execute_valid_command(self, command):
elif command.startswith(constants.COMMAND_KAFKA_REPLICA_VERIFICATION):
final_command = self.handle_kafka_replica_verification_command(command)

elif command.startswith(constants.COMMAND_KAFKA_REASSIGN_PARTITIONS):
final_command = self.handle_kafka_reassign_partitions_command(command)

elif command.startswith(constants.COMMAND_KAFKA_BROKER_API_VERSIONS):
final_command = self.handle_kafka_broker_api_versions_command(command)

Expand Down Expand Up @@ -205,6 +208,12 @@ def handle_kafka_replica_verification_command(self, command):
command += self.handle_broker_list_flag(command)
return command

def handle_kafka_reassign_partitions_command(self, command):
command += self.handle_bootstrap_server_flag(command)
command += self.handle_zookeeper_flag(command)
command += self.handle_admin_client_settings(command)
return command

def handle_kafka_broker_api_versions_command(self, command):
command += self.handle_bootstrap_server_flag(command)
command += self.handle_admin_client_settings(command)
Expand Down
6 changes: 3 additions & 3 deletions tests/test_completer_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,22 @@
"",
["version", "cluster-select", "cluster-describe", "exit", "clear", "kafka-acls", "kafka-avro-console-consumer",
"kafka-avro-console-producer", "kafka-replica-verification", "kafka-preferred-replica-election",
"kafka-broker-api-versions", "kafka-configs", "kafka-console-consumer", "kafka-console-producer",
"kafka-broker-api-versions", "kafka-configs", "kafka-console-consumer", "kafka-console-producer", "kafka-reassign-partitions",
"kafka-consumer-groups", "kafka-delete-records", "kafka-dump-log", "kafka-log-dirs", "kafka-topics",
"kafka-verifiable-consumer", "kafka-verifiable-producer", "ksql", "zookeeper-shell"]
),
(
"kafka",
["kafka-acls", "kafka-avro-console-consumer", "kafka-avro-console-producer", "kafka-broker-api-versions",
"kafka-configs", "kafka-console-consumer", "kafka-console-producer", "kafka-consumer-groups",
"kafka-configs", "kafka-console-consumer", "kafka-console-producer", "kafka-consumer-groups", "kafka-reassign-partitions",
"kafka-delete-records", "kafka-dump-log", "kafka-log-dirs", "kafka-topics", "kafka-verifiable-consumer",
"kafka-verifiable-producer", "kafka-replica-verification", "kafka-preferred-replica-election"]
),
(
"k",
["ksql", "kafka-acls", "kafka-avro-console-consumer", "kafka-avro-console-producer",
"kafka-broker-api-versions", "kafka-replica-verification", "kafka-preferred-replica-election",
"kafka-configs", "kafka-console-consumer", "kafka-console-producer", "kafka-consumer-groups",
"kafka-configs", "kafka-console-consumer", "kafka-console-producer", "kafka-consumer-groups", "kafka-reassign-partitions",
"kafka-delete-records", "kafka-dump-log", "kafka-log-dirs", "kafka-topics", "kafka-verifiable-consumer",
"kafka-verifiable-producer", "zookeeper-shell"]
),
Expand Down
3 changes: 3 additions & 0 deletions tests/test_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ def test_constants():
assert kafkashell.constants.COMMAND_KAFKA_AVRO_CONSOLE_PRODUCER == "kafka-avro-console-producer"
assert kafkashell.constants.COMMAND_KAFKA_VERIFIABLE_CONSUMER == "kafka-verifiable-consumer"
assert kafkashell.constants.COMMAND_KAFKA_VERIFIABLE_PRODUCER == "kafka-verifiable-producer"
assert kafkashell.constants.COMMAND_KAFKA_PREFERRED_REPLICA_ELECTION == "kafka-preferred-replica-election"
assert kafkashell.constants.COMMAND_KAFKA_REPLICA_VERIFICATION == "kafka-replica-verification"
assert kafkashell.constants.COMMAND_KAFKA_REASSIGN_PARTITIONS == "kafka-reassign-partitions"
assert kafkashell.constants.COMMAND_KAFKA_BROKER_API_VERSIONS == "kafka-broker-api-versions"
assert kafkashell.constants.COMMAND_KAFKA_DELETE_RECORDS == "kafka-delete-records"
assert kafkashell.constants.COMMAND_KAFKA_LOG_DIRS == "kafka-log-dirs"
Expand Down
32 changes: 32 additions & 0 deletions tests/test_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,18 @@
"kafka-replica-verification --broker-list broker1:9092,broker2:9093 --fetch-size 1024",
"kafka-replica-verification --broker-list broker1:9092,broker2:9093 --fetch-size 1024"
),
(
"kafka-reassign-partitions",
"kafka-reassign-partitions --bootstrap-server localhost:9092 --zookeeper localhost:2181"
),
(
"kafka-reassign-partitions --verify",
"kafka-reassign-partitions --verify --bootstrap-server localhost:9092 --zookeeper localhost:2181"
),
(
"kafka-reassign-partitions --command-config test.properties",
"kafka-reassign-partitions --command-config test.properties --bootstrap-server localhost:9092 --zookeeper localhost:2181"
),
(
"kafka-broker-api-versions",
"kafka-broker-api-versions --bootstrap-server localhost:9092"
Expand Down Expand Up @@ -543,6 +555,26 @@
"kafka-preferred-replica-election --bootstrap-server test:9092 --admin.config admin.properties",
"test-admin-client-settings"
),
(
"kafka-reassign-partitions",
"kafka-reassign-partitions --bootstrap-server localhost:9092 --zookeeper localhost:2181 --command-config admin.properties",
"test-admin-client-settings"
),
(
"kafka-reassign-partitions --command-config test.properties",
"kafka-reassign-partitions --command-config test.properties --bootstrap-server localhost:9092 --zookeeper localhost:2181",
"test-admin-client-settings"
),
(
"kafka-reassign-partitions --command-config test.properties --bootstrap-server broker1:9092",
"kafka-reassign-partitions --command-config test.properties --bootstrap-server broker1:9092 --zookeeper localhost:2181",
"test-admin-client-settings"
),
(
"kafka-reassign-partitions --zookeeper test:2181",
"kafka-reassign-partitions --zookeeper test:2181 --bootstrap-server localhost:9092 --command-config admin.properties",
"test-admin-client-settings"
)
]

prefix_test_data = [
Expand Down