Skip to content

Commit

Permalink
feat(topic): increase topic partition (#1601)
Browse files Browse the repository at this point in the history
close #294
  • Loading branch information
neeraj-singh47 authored Nov 19, 2023
1 parent d756878 commit 571c198
Show file tree
Hide file tree
Showing 11 changed files with 147 additions and 0 deletions.
1 change: 1 addition & 0 deletions client/src/components/Root/Root.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ class Root extends Component {
buildConfig() {
let config = new Map();
config.cancelToken = this.cancel.token;
config.validateStatus = () => true;

if (localStorage.getItem('jwtToken')) {
config.headers = {};
Expand Down
11 changes: 11 additions & 0 deletions client/src/containers/Topic/Topic/Topic.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,17 @@ class Topic extends Root {
</Link>
)}

{selectedTab === 'partitions' &&
roles.TOPIC_DATA &&
roles.TOPIC_DATA.includes('CREATE') && (
<Link
to={`/ui/${clusterId}/topic/${topicId}/increasepartition`}
className="btn btn-secondary mr-2"
>
<i className="fa fa-plus" aria-hidden={true} /> Increase Partition
</Link>
)}

{roles.TOPIC_DATA && roles.TOPIC_DATA.includes('CREATE') && (
<Link to={`/ui/${clusterId}/topic/${topicId}/produce`} className="btn btn-primary">
<i className="fa fa-plus" aria-hidden={true} /> Produce to topic
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
import React from 'react';
import Joi from 'joi-browser';
import { withRouter } from 'react-router-dom';
import Form from '../../../../components/Form/Form';
import Header from '../../../Header';
import { uriTopicIncreasePartition } from '../../../../utils/endpoints';
import { toast } from 'react-toastify';

class TopicIncreasePartition extends Form {
state = {
formData: {
partition: 1
},
selectedCluster: this.props.match.params.clusterId,
selectedTopic: this.props.match.params.topicId,
errors: {}
};

componentDidMount() {
this.getTopicsPartitions();
}

async getTopicsPartitions() {
const { selectedCluster, selectedTopic } = this.state;

let partitions = await this.getApi(uriTopicIncreasePartition(selectedCluster, selectedTopic));
let form = {};
form.partition = partitions.data.length;
this.setState({ formData: form });
}

schema = {
partition: Joi.number().min(1).label('Partition').required()
};

async doSubmit() {
const { formData, selectedCluster, selectedTopic } = this.state;
const partitionData = {
partition: formData.partition
};

this.postApi(uriTopicIncreasePartition(selectedCluster, selectedTopic), partitionData)
.then(() => {
this.props.history.push({
pathname: `/ui/${selectedCluster}/topic`
});
toast.success('Topic partition updated');
})
.catch(error => toast.error(error.data.message));
}
render() {
return (
<div>
<form
encType="multipart/form-data"
className="khq-form khq-form-config"
onSubmit={() => this.doSubmit()}
>
<Header title="Increase topic partition" history={this.props.history} />
{this.renderInput('partition', 'Partition', 'Partition', 'number')}
{this.renderButton(
'Update',
() => {
this.doSubmit();
},
undefined,
'button'
)}
</form>
</div>
);
}
}

export default withRouter(TopicIncreasePartition);
9 changes: 9 additions & 0 deletions client/src/utils/Routes.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import ConnectCreate from '../containers/Connect/ConnectCreate/ConnectCreate';
import Connect from '../containers/Connect/ConnectDetail/Connect';
import TopicCreate from '../containers/Topic/TopicCreate/TopicCreate';
import TopicProduce from '../containers/Topic/TopicProduce';
import TopicIncreaseParition from '../containers/Topic/Topic/TopicPartitions/TopicIncreaseParition';
import TopicCopy from '../containers/Topic/TopicCopy';
import Loading from '../containers/Loading';
import ConsumerGroupList from '../containers/ConsumerGroup/ConsumerGroupList';
Expand Down Expand Up @@ -166,6 +167,14 @@ class Routes extends Root {
/>
)}

{roles && roles.TOPIC && roles.TOPIC_DATA.includes('CREATE') && (
<Route
exact
path="/ui/:clusterId/topic/:topicId/increasepartition"
component={TopicIncreaseParition}
/>
)}

{roles && roles.TOPIC && roles.TOPIC_DATA.includes('CREATE') && (
<Route exact path="/ui/:clusterId/topic/:topicId/copy" component={TopicCopy} />
)}
Expand Down
3 changes: 3 additions & 0 deletions client/src/utils/api.js
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ export const post = (url, body, config) =>
axios
.post(url, body, { ...configs, ...config })
.then(res => {
if (res.status >= 400) {
reject(res);
}
resolve(res);
})
.catch(err => {
Expand Down
4 changes: 4 additions & 0 deletions client/src/utils/endpoints.js
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ export const uriTopicsInfo = (clusterId, topicId) => `${apiUrl}/${clusterId}/top

export const uriTopicsCreate = clusterId => `${apiUrl}/${clusterId}/topic`;

export const uriTopicIncreasePartition = (clusterId, topicId) =>
`${apiUrl}/${clusterId}/topic/${topicId}/partitions`;

export const uriTopicsProduce = (clusterId, topicName) =>
`${apiUrl}/${clusterId}/topic/${topicName}/data`;

Expand Down Expand Up @@ -380,5 +383,6 @@ export default {
uriLiveTail,
uriTopicDataSearch,
uriTopicDataDelete,
uriTopicIncreasePartition,
uriDeleteGroupOffsets
};
10 changes: 10 additions & 0 deletions src/main/java/org/akhq/controllers/TopicController.java
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,16 @@ public List<Config> updateConfig(String cluster, String topicName, Map<String, S
return updated;
}

@AKHQSecured(resource = Role.Resource.TOPIC, action = Role.Action.UPDATE)
@Post(value = "api/{cluster}/topic/{topicName}/partitions")
@Operation(tags = {"topic"}, summary = "Increase partition for a topic")
public HttpResponse<?> increasePartition(String cluster, String topicName, Map<String, Integer> config) throws ExecutionException, InterruptedException {
checkIfClusterAndResourceAllowed(cluster, topicName);
this.topicRepository.increasePartition(cluster, topicName, config.get("partition"));

return HttpResponse.accepted();
}

@AKHQSecured(resource = Role.Resource.TOPIC_DATA, action = Role.Action.DELETE)
@Delete("api/{cluster}/topic/{topicName}/data/empty")
@Operation(tags = {"topic data"}, summary = "Empty data from a topic")
Expand Down
12 changes: 12 additions & 0 deletions src/main/java/org/akhq/modules/AbstractKafkaWrapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,18 @@ public void createTopics(String clusterId, String name, int partitions, short re
listTopics = new HashMap<>();
}

public void alterTopicPartition(String clusterId, String name, int partitions) throws ExecutionException {
Map<String, NewPartitions> newPartitionMap = new HashMap<>();
newPartitionMap.put(name, NewPartitions.increaseTo(partitions));

Logger.call(kafkaModule
.getAdminClient(clusterId)
.createPartitions(newPartitionMap).all(),
"Increase Topic partition",
Collections.singletonList(name)
);
}

public void deleteTopics(String clusterId, String name) throws ExecutionException {
Logger.call(kafkaModule.getAdminClient(clusterId)
.deleteTopics(Collections.singleton(name))
Expand Down
4 changes: 4 additions & 0 deletions src/main/java/org/akhq/repositories/TopicRepository.java
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,10 @@ public void delete(String clusterId, String name) throws ExecutionException, Int
kafkaWrapper.deleteTopics(clusterId, name);
}

public void increasePartition(String clusterId, String name, int partitions) throws ExecutionException, InterruptedException {
kafkaWrapper.alterTopicPartition(clusterId, name, partitions);
}

@Retryable(
includes = {
UnknownTopicOrPartitionException.class
Expand Down
7 changes: 7 additions & 0 deletions src/test/java/org/akhq/controllers/TopicControllerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,13 @@ void produceMultipleMessages() {
assertTrue(response.get(2).getValue().contains("key3_{\"test_1\":3}"));
}

@Test
@Order(7)
void increasePartitionApi() {
this.exchange(HttpRequest.POST(CREATE_TOPIC_URL + "/partitions",
ImmutableMap.of("partition", 4)));
}

@Test
@Order(8)
void delete() {
Expand Down
11 changes: 11 additions & 0 deletions src/test/java/org/akhq/repositories/TopicRepositoryTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,17 @@ void partition() throws ExecutionException, InterruptedException {
assertEquals(3, topicRepository.findByName(KafkaTestCluster.CLUSTER_ID, KafkaTestCluster.TOPIC_COMPACTED).getPartitions().size());
}

@Test
void increasePartition() throws ExecutionException, InterruptedException {
topicRepository.create(KafkaTestCluster.CLUSTER_ID, "increasePartition", 8, (short) 1, Collections.emptyList()
);
topicRepository.increasePartition(KafkaTestCluster.CLUSTER_ID, "increasePartition", 9);

assertEquals(9, topicRepository.findByName(KafkaTestCluster.CLUSTER_ID, "increasePartition").getPartitions().size());

topicRepository.delete(KafkaTestCluster.CLUSTER_ID, "increasePartition");
}

private void mockApplicationContext() {
Authentication auth = new ServerAuthentication("test", List.of(), Map.of());
DefaultSecurityService securityService = Mockito.mock(DefaultSecurityService.class);
Expand Down

0 comments on commit 571c198

Please sign in to comment.