diff --git a/client/src/containers/Connect/ConnectList/ConnectList.jsx b/client/src/containers/Connect/ConnectList/ConnectList.jsx
index ef1e882a5..c24321176 100644
--- a/client/src/containers/Connect/ConnectList/ConnectList.jsx
+++ b/client/src/containers/Connect/ConnectList/ConnectList.jsx
@@ -12,6 +12,8 @@ import 'ace-builds/src-noconflict/theme-merbivore_soft';
import { toast } from 'react-toastify';
import 'react-toastify/dist/ReactToastify.css';
import Root from "../../../components/Root";
+import SearchBar from "../../../components/SearchBar";
+import Pagination from "../../../components/Pagination";
class ConnectList extends Root {
state = {
@@ -22,7 +24,13 @@ class ConnectList extends Root {
definitionToDelete: '',
deleteMessage: '',
roles: JSON.parse(sessionStorage.getItem('roles')),
- loading: true
+ loading: true,
+ pageNumber: 1,
+ totalPageNumber: 1,
+ history: this.props,
+ searchData: {
+ search: ''
+ },
};
static getDerivedStateFromProps(nextProps, prevState) {
@@ -36,24 +44,26 @@ class ConnectList extends Root {
}
componentDidMount() {
- this.getConnectDefinitions();
- }
-
- componentDidUpdate(prevProps, prevState) {
- if (this.props.location.pathname !== prevProps.location.pathname) {
+ const { searchData } = this.state;
+ const query = new URLSearchParams(this.props.location.search);
+ this.setState({ searchData: { search: (query.get('search'))? query.get('search') : searchData.search }}, () => {
this.getConnectDefinitions();
- }
+ });
}
async getConnectDefinitions() {
- let connectDefinitions = [];
- const { clusterId, connectId } = this.state;
+ const { clusterId, connectId, pageNumber } = this.state;
+ const { search } = this.state.searchData;
this.setState({ loading: true });
- connectDefinitions = await this.getApi(uriConnectDefinitions(clusterId, connectId));
- this.handleData(connectDefinitions.data);
- this.setState({ selectedCluster: clusterId });
+ let response = await this.getApi(uriConnectDefinitions(clusterId, connectId, search, pageNumber));
+ if (response.data.results) {
+ this.handleData(response.data.results);
+ this.setState({ clusterId, totalPageNumber: response.data.page });
+ } else {
+ this.setState({ clusterId, tableData: [], totalPageNumber: 0, loading: false });
+ }
}
deleteDefinition = () => {
@@ -121,6 +131,34 @@ class ConnectList extends Root {
});
}
+ handlePageChange = ({ currentTarget: input }) => {
+ const { value } = input;
+ this.setState({ pageNumber: value });
+ };
+
+ handleSearch = data => {
+ const { searchData } = data;
+ this.setState({ pageNumber: 1, searchData }, () => {
+ this.getConnectDefinitions();
+ this.props.history.push({
+ pathname: `/ui/${this.state.clusterId}/connect/${this.state.connectId}`,
+ search: `search=${searchData.search}`
+ });
+ });
+ };
+
+ handlePageChangeSubmission = value => {
+ const { totalPageNumber } = this.state;
+ if (value <= 0) {
+ value = 1;
+ } else if (value > totalPageNumber) {
+ value = totalPageNumber;
+ }
+ this.setState({ pageNumber: value }, () => {
+ this.getConnectDefinitions();
+ });
+ };
+
renderTasks = tasks => {
let renderedTasks = [];
@@ -153,13 +191,33 @@ class ConnectList extends Root {
};
render() {
- const { clusterId, connectId, tableData, loading } = this.state;
+ const { clusterId, connectId, tableData, loading, searchData, pageNumber, totalPageNumber } = this.state;
const roles = this.state.roles || {};
const { history } = this.props;
return (
+
+
{
return `${apiUrl}/connects${id ? '?clusterId=' + id : ''}`;
};
-export const uriConnectDefinitions = (clusterId, connectId) => {
- return `${apiUrl}/${clusterId}/connect/${connectId}`;
+export const uriConnectDefinitions = (clusterId, connectId, search, pageNumber) => {
+ return `${apiUrl}/${clusterId}/connect/${connectId}?&search=${search}&page=${pageNumber}`;
};
export const uriConnectPlugins = (clusterId, connectId) => {
diff --git a/src/main/java/org/akhq/controllers/ConnectController.java b/src/main/java/org/akhq/controllers/ConnectController.java
index ae3576c77..08e0f7ffc 100644
--- a/src/main/java/org/akhq/controllers/ConnectController.java
+++ b/src/main/java/org/akhq/controllers/ConnectController.java
@@ -1,5 +1,7 @@
package org.akhq.controllers;
+import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
+import io.micronaut.context.annotation.Value;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.HttpResponse;
import io.micronaut.http.annotation.Controller;
@@ -12,9 +14,16 @@
import org.akhq.models.ConnectDefinition;
import org.akhq.models.ConnectPlugin;
import org.akhq.repositories.ConnectRepository;
+import org.akhq.utils.PagedList;
+import org.akhq.utils.Pagination;
+import org.akhq.utils.ResultPagedList;
+import org.codehaus.httpcache4j.uri.URIBuilder;
+import java.io.IOException;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
import javax.inject.Inject;
@Secured(Role.ROLE_CONNECT_READ)
@@ -22,6 +31,10 @@
public class ConnectController extends AbstractController {
private final ConnectRepository connectRepository;
+ // I used the same configuration as for the registry schema
+ @Value("${akhq.pagination.page-size}")
+ private Integer pageSize;
+
@Inject
public ConnectController(ConnectRepository connectRepository) {
this.connectRepository = connectRepository;
@@ -29,8 +42,14 @@ public ConnectController(ConnectRepository connectRepository) {
@Get
@Operation(tags = {"connect"}, summary = "List all connect definitions")
- public List list(String cluster, String connectId) {
- return this.connectRepository.getDefinitions(cluster, connectId);
+ public ResultPagedList list(
+ HttpRequest> request, String cluster, String connectId, Optional search, Optional page)
+ throws IOException, RestClientException, ExecutionException, InterruptedException
+ {
+ URIBuilder uri = URIBuilder.fromURI(request.getUri());
+ Pagination pagination = new Pagination(pageSize, uri, page.orElse(1));
+
+ return ResultPagedList.of(this.connectRepository.getPaginatedDefinitions(cluster, connectId, pagination, search));
}
@Get("/plugins")
diff --git a/src/main/java/org/akhq/controllers/TopicController.java b/src/main/java/org/akhq/controllers/TopicController.java
index 2e3eaf612..29ca03f49 100644
--- a/src/main/java/org/akhq/controllers/TopicController.java
+++ b/src/main/java/org/akhq/controllers/TopicController.java
@@ -34,6 +34,7 @@
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import javax.inject.Inject;
+import org.akhq.models.Record;
@Slf4j
@Secured(Role.ROLE_TOPIC_READ)
diff --git a/src/main/java/org/akhq/repositories/ConnectRepository.java b/src/main/java/org/akhq/repositories/ConnectRepository.java
index cf7c959d6..2fd5fd7b1 100644
--- a/src/main/java/org/akhq/repositories/ConnectRepository.java
+++ b/src/main/java/org/akhq/repositories/ConnectRepository.java
@@ -5,6 +5,7 @@
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.reflect.TypeToken;
+import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.micronaut.context.ApplicationContext;
import io.micronaut.retry.annotation.Retryable;
import io.micronaut.security.authentication.Authentication;
@@ -13,6 +14,8 @@
import org.akhq.models.ConnectDefinition;
import org.akhq.models.ConnectPlugin;
import org.akhq.modules.KafkaModule;
+import org.akhq.utils.PagedList;
+import org.akhq.utils.Pagination;
import org.akhq.utils.UserGroupUtils;
import org.sourcelab.kafka.connect.apiclient.request.dto.*;
import org.sourcelab.kafka.connect.apiclient.rest.exceptions.ConcurrentConfigModificationException;
@@ -21,7 +24,9 @@
import javax.inject.Inject;
import javax.inject.Singleton;
+import java.io.IOException;
import java.util.*;
+import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.regex.Pattern;
@@ -62,17 +67,34 @@ public ConnectDefinition getDefinition(String clusterId, String connectId, Strin
}
@Retryable(includes = {
- ConcurrentConfigModificationException.class,
- ResourceNotFoundException.class
+ ConcurrentConfigModificationException.class,
+ ResourceNotFoundException.class
}, delay = "3s", attempts = "5")
- public List getDefinitions(String clusterId, String connectId) {
+ public PagedList getPaginatedDefinitions (String clusterId, String connectId, Pagination pagination, Optional search)
+ throws IOException, RestClientException, ExecutionException, InterruptedException{
+ List definitions = getDefinitions(clusterId, connectId, search);
+
+ // I'm not sure of how to use the last parameter in this case
+ // I look at the implementation for the Schema Registry part, but I don't see how make a similar thing here
+ return PagedList.of(definitions, pagination, list -> list);
+ }
+
+ public List getDefinitions(String clusterId, String connectId, Optional search
+ )
+ {
ConnectorsWithExpandedMetadata unfiltered = this.kafkaModule
.getConnectRestClient(clusterId)
.get(connectId)
.getConnectorsWithAllExpandedMetadata();
+ Collection definitions = unfiltered.getAllDefinitions();
+
+ Collection connectorsFilteredBySearch = search.map(
+ query -> definitions.stream().filter(connector -> connector.getName().contains(query))
+ ).orElse(definitions.stream()).collect(Collectors.toList());
+
ArrayList filtered = new ArrayList<>();
- for (ConnectorDefinition item : unfiltered.getAllDefinitions()) {
+ for (ConnectorDefinition item : connectorsFilteredBySearch) {
if (isMatchRegex(getConnectFilterRegex(), item.getName())) {
filtered.add(new ConnectDefinition(
item,
diff --git a/src/test/java/org/akhq/controllers/TopicControllerTest.java b/src/test/java/org/akhq/controllers/TopicControllerTest.java
index fc090a03e..fad3712c3 100644
--- a/src/test/java/org/akhq/controllers/TopicControllerTest.java
+++ b/src/test/java/org/akhq/controllers/TopicControllerTest.java
@@ -9,7 +9,7 @@
import org.akhq.utils.ResultPagedList;
import org.apache.kafka.common.config.TopicConfig;
import org.junit.jupiter.api.*;
-
+import org.akhq.models.Record;
import java.util.Base64;
import java.util.List;
import java.util.Random;
diff --git a/src/test/java/org/akhq/repositories/ConnectRepositoryTest.java b/src/test/java/org/akhq/repositories/ConnectRepositoryTest.java
index ca9dd2d88..ee1460ced 100644
--- a/src/test/java/org/akhq/repositories/ConnectRepositoryTest.java
+++ b/src/test/java/org/akhq/repositories/ConnectRepositoryTest.java
@@ -11,6 +11,8 @@
import org.akhq.KafkaTestCluster;
import org.akhq.models.ConnectDefinition;
import org.akhq.models.ConnectPlugin;
+import org.akhq.utils.Pagination;
+import org.codehaus.httpcache4j.uri.URIBuilder;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -18,6 +20,7 @@
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
+import scala.None;
import javax.inject.Inject;
import java.util.*;
@@ -98,10 +101,10 @@ public void create() {
);
- List all1 = repository.getDefinitions(KafkaTestCluster.CLUSTER_ID, "connect-1");
+ List all1 = repository.getDefinitions(KafkaTestCluster.CLUSTER_ID, "connect-1", Optional.empty());
assertEquals(1, all1.size());
- List all2 = repository.getDefinitions(KafkaTestCluster.CLUSTER_ID, "connect-2");
+ List all2 = repository.getDefinitions(KafkaTestCluster.CLUSTER_ID, "connect-2", Optional.empty());
assertEquals(1, all2.size());
assertEquals(path1, repository.getDefinition(
@@ -152,8 +155,16 @@ public void create() {
repository.delete(KafkaTestCluster.CLUSTER_ID, "connect-1","ConnectRepositoryTest1");
repository.delete(KafkaTestCluster.CLUSTER_ID, "connect-2","ConnectRepositoryTest2");
- assertEquals(0, repository.getDefinitions(KafkaTestCluster.CLUSTER_ID, "connect-1").size());
- assertEquals(0, repository.getDefinitions(KafkaTestCluster.CLUSTER_ID, "connect-2").size());
+ assertEquals(0, repository.getDefinitions(KafkaTestCluster.CLUSTER_ID, "connect-1", Optional.empty()).size());
+ assertEquals(0, repository.getDefinitions(KafkaTestCluster.CLUSTER_ID, "connect-2", Optional.empty()).size());
+ }
+
+ private void mockApplicationContext() {
+ Authentication auth = new DefaultAuthentication("test", Collections.singletonMap("connectsFilterRegexp", new ArrayList<>(Arrays.asList("^prefixed.*$"))));
+ DefaultSecurityService securityService = Mockito.mock(DefaultSecurityService.class);
+ when(securityService.getAuthentication()).thenReturn(Optional.of(auth));
+ when(applicationContext.containsBean(SecurityService.class)).thenReturn(true);
+ when(applicationContext.getBean(SecurityService.class)).thenReturn(securityService);
}
@Test
@@ -194,18 +205,50 @@ public void getFilteredList() {
mockApplicationContext();
- List filtered = repository.getDefinitions(KafkaTestCluster.CLUSTER_ID, "connect-1");
+ List filtered = repository.getDefinitions(KafkaTestCluster.CLUSTER_ID, "connect-1", Optional.empty());
assertEquals(2, filtered.size());
repository.delete(KafkaTestCluster.CLUSTER_ID, "connect-1","prefixed.Matching1");
repository.delete(KafkaTestCluster.CLUSTER_ID, "connect-1","prefixed.Matching2");
repository.delete(KafkaTestCluster.CLUSTER_ID, "connect-1","not.Matching3");
}
- private void mockApplicationContext() {
- Authentication auth = new DefaultAuthentication("test", Collections.singletonMap("connectsFilterRegexp", new ArrayList<>(Arrays.asList("^prefixed.*$"))));
- DefaultSecurityService securityService = Mockito.mock(DefaultSecurityService.class);
- when(securityService.getAuthentication()).thenReturn(Optional.of(auth));
- when(applicationContext.containsBean(SecurityService.class)).thenReturn(true);
- when(applicationContext.getBean(SecurityService.class)).thenReturn(securityService);
+
+
+ @Test
+ public void getFilteredBySearchList() {
+
+ repository.create(
+ KafkaTestCluster.CLUSTER_ID,
+ "connect-1",
+ "prefixed.Matching1",
+ ImmutableMap.of(
+ "connector.class", "FileStreamSinkConnector",
+ "file", "/tmp/test.txt",
+ "topics", KafkaTestCluster.TOPIC_CONNECT
+ )
+ );
+
+ repository.create(
+ KafkaTestCluster.CLUSTER_ID,
+ "connect-1",
+ "prefixed.Matching2",
+ ImmutableMap.of(
+ "connector.class", "FileStreamSinkConnector",
+ "file", "/tmp/test.txt",
+ "topics", KafkaTestCluster.TOPIC_CONNECT
+ )
+ );
+
+ mockApplicationContext();
+
+ List notFiltered = repository.getDefinitions(KafkaTestCluster.CLUSTER_ID, "connect-1", Optional.empty());
+ assertEquals(2, notFiltered.size());
+ List filtered = repository.getDefinitions(KafkaTestCluster.CLUSTER_ID, "connect-1", Optional.of("prefixed.Matching1"));
+ assertEquals(1, filtered.size());
+ List filteredAll = repository.getDefinitions(KafkaTestCluster.CLUSTER_ID, "connect-1", Optional.of("prefixed.Matching"));
+ assertEquals(2, filteredAll.size());
+
+ repository.delete(KafkaTestCluster.CLUSTER_ID, "connect-1","prefixed.Matching1");
+ repository.delete(KafkaTestCluster.CLUSTER_ID, "connect-1","prefixed.Matching2");
}
}