Skip to content

Commit

Permalink
feat(connect): search on connect list (#602)
Browse files Browse the repository at this point in the history
  • Loading branch information
PLarboulette authored Mar 9, 2021
1 parent c084bb2 commit a5efab1
Show file tree
Hide file tree
Showing 7 changed files with 176 additions and 33 deletions.
84 changes: 71 additions & 13 deletions client/src/containers/Connect/ConnectList/ConnectList.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import 'ace-builds/src-noconflict/theme-merbivore_soft';
import { toast } from 'react-toastify';
import 'react-toastify/dist/ReactToastify.css';
import Root from "../../../components/Root";
import SearchBar from "../../../components/SearchBar";
import Pagination from "../../../components/Pagination";

class ConnectList extends Root {
state = {
Expand All @@ -22,7 +24,13 @@ class ConnectList extends Root {
definitionToDelete: '',
deleteMessage: '',
roles: JSON.parse(sessionStorage.getItem('roles')),
loading: true
loading: true,
pageNumber: 1,
totalPageNumber: 1,
history: this.props,
searchData: {
search: ''
},
};

static getDerivedStateFromProps(nextProps, prevState) {
Expand All @@ -36,24 +44,26 @@ class ConnectList extends Root {
}

componentDidMount() {
this.getConnectDefinitions();
}

componentDidUpdate(prevProps, prevState) {
if (this.props.location.pathname !== prevProps.location.pathname) {
const { searchData } = this.state;
const query = new URLSearchParams(this.props.location.search);
this.setState({ searchData: { search: (query.get('search'))? query.get('search') : searchData.search }}, () => {
this.getConnectDefinitions();
}
});
}

// Fetch one page of connect definitions for the current cluster/connect,
// applying the current search term, and push the results into the table state.
async getConnectDefinitions() {
  const { clusterId, connectId, pageNumber } = this.state;
  const { search } = this.state.searchData;

  this.setState({ loading: true });

  const response = await this.getApi(uriConnectDefinitions(clusterId, connectId, search, pageNumber));
  if (response.data.results) {
    this.handleData(response.data.results);
    // NOTE(review): response.data.page is treated as the total page count — confirm the API contract.
    this.setState({ clusterId, totalPageNumber: response.data.page });
  } else {
    // No results: clear the table so stale rows from a previous search are not shown.
    this.setState({ clusterId, tableData: [], totalPageNumber: 0, loading: false });
  }
}

deleteDefinition = () => {
Expand Down Expand Up @@ -121,6 +131,34 @@ class ConnectList extends Root {
});
}

// Keep the page number state in sync with the pagination input field.
handlePageChange = event => {
  this.setState({ pageNumber: event.currentTarget.value });
};

// Apply a new search: reset to the first page, refetch the definitions,
// and reflect the query in the browser URL so it survives navigation.
handleSearch = ({ searchData }) => {
  this.setState({ pageNumber: 1, searchData }, () => {
    this.getConnectDefinitions();
    const { clusterId, connectId } = this.state;
    this.props.history.push({
      pathname: `/ui/${clusterId}/connect/${connectId}`,
      search: `search=${searchData.search}`
    });
  });
};

// Clamp a submitted page number into [1, totalPageNumber] and reload the list.
handlePageChangeSubmission = value => {
  const { totalPageNumber } = this.state;
  // Guard against an empty result set (totalPageNumber === 0): the original code
  // would clamp to page 0 and request a nonexistent page. Also coerce the input,
  // which arrives as a string from the pagination field.
  const lastPage = Math.max(totalPageNumber, 1);
  const pageNumber = Math.min(Math.max(Number(value) || 1, 1), lastPage);
  this.setState({ pageNumber }, () => {
    this.getConnectDefinitions();
  });
};

renderTasks = tasks => {
let renderedTasks = [];

Expand Down Expand Up @@ -153,13 +191,33 @@ class ConnectList extends Root {
};

render() {
const { clusterId, connectId, tableData, loading } = this.state;
const { clusterId, connectId, tableData, loading, searchData, pageNumber, totalPageNumber } = this.state;
const roles = this.state.roles || {};
const { history } = this.props;

return (
<div>
<Header title={`Connect: ${connectId}`} history={history} />
<nav
className="navbar navbar-expand-lg navbar-light bg-light mr-auto
khq-data-filter khq-sticky khq-nav"
>
<SearchBar
showSearch={true}
search={searchData.search}
showPagination={true}
pagination={pageNumber}
doSubmit={this.handleSearch}
/>

<Pagination
pageNumber={pageNumber}
totalPageNumber={totalPageNumber}
onChange={this.handlePageChange}
onSubmit={this.handlePageChangeSubmission}
/>
</nav>

<Table
loading={loading}
history={history}
Expand Down
4 changes: 2 additions & 2 deletions client/src/utils/endpoints.js
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,8 @@ export const uriConnects = id => {
return `${apiUrl}/connects${id ? '?clusterId=' + id : ''}`;
};

// Build the connect-definitions endpoint URI, with optional search and pagination.
// encodeURIComponent guards against search terms containing '&', '#', '=' etc.
export const uriConnectDefinitions = (clusterId, connectId, search, pageNumber) => {
  return `${apiUrl}/${clusterId}/connect/${connectId}?search=${encodeURIComponent(search)}&page=${pageNumber}`;
};

export const uriConnectPlugins = (clusterId, connectId) => {
Expand Down
23 changes: 21 additions & 2 deletions src/main/java/org/akhq/controllers/ConnectController.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package org.akhq.controllers;

import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.micronaut.context.annotation.Value;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.HttpResponse;
import io.micronaut.http.annotation.Controller;
Expand All @@ -12,25 +14,42 @@
import org.akhq.models.ConnectDefinition;
import org.akhq.models.ConnectPlugin;
import org.akhq.repositories.ConnectRepository;
import org.akhq.utils.PagedList;
import org.akhq.utils.Pagination;
import org.akhq.utils.ResultPagedList;
import org.codehaus.httpcache4j.uri.URIBuilder;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import javax.inject.Inject;

@Secured(Role.ROLE_CONNECT_READ)
@Controller("/api/{cluster}/connect/{connectId}")
public class ConnectController extends AbstractController {
private final ConnectRepository connectRepository;

    // Page size shared with the schema registry list endpoints (akhq.pagination.page-size).
@Value("${akhq.pagination.page-size}")
private Integer pageSize;

@Inject
public ConnectController(ConnectRepository connectRepository) {
this.connectRepository = connectRepository;
}

@Get
@Operation(tags = {"connect"}, summary = "List all connect definitions")
public List<ConnectDefinition> list(String cluster, String connectId) {
return this.connectRepository.getDefinitions(cluster, connectId);
public ResultPagedList<ConnectDefinition> list(
HttpRequest<?> request, String cluster, String connectId, Optional<String> search, Optional<Integer> page)
throws IOException, RestClientException, ExecutionException, InterruptedException
{
URIBuilder uri = URIBuilder.fromURI(request.getUri());
Pagination pagination = new Pagination(pageSize, uri, page.orElse(1));

return ResultPagedList.of(this.connectRepository.getPaginatedDefinitions(cluster, connectId, pagination, search));
}

@Get("/plugins")
Expand Down
1 change: 1 addition & 0 deletions src/main/java/org/akhq/controllers/TopicController.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import javax.inject.Inject;
import org.akhq.models.Record;

@Slf4j
@Secured(Role.ROLE_TOPIC_READ)
Expand Down
30 changes: 26 additions & 4 deletions src/main/java/org/akhq/repositories/ConnectRepository.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.reflect.TypeToken;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.micronaut.context.ApplicationContext;
import io.micronaut.retry.annotation.Retryable;
import io.micronaut.security.authentication.Authentication;
Expand All @@ -13,6 +14,8 @@
import org.akhq.models.ConnectDefinition;
import org.akhq.models.ConnectPlugin;
import org.akhq.modules.KafkaModule;
import org.akhq.utils.PagedList;
import org.akhq.utils.Pagination;
import org.akhq.utils.UserGroupUtils;
import org.sourcelab.kafka.connect.apiclient.request.dto.*;
import org.sourcelab.kafka.connect.apiclient.rest.exceptions.ConcurrentConfigModificationException;
Expand All @@ -21,7 +24,9 @@

import javax.inject.Inject;
import javax.inject.Singleton;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.regex.Pattern;

Expand Down Expand Up @@ -62,17 +67,34 @@ public ConnectDefinition getDefinition(String clusterId, String connectId, Strin
}

@Retryable(includes = {
ConcurrentConfigModificationException.class,
ResourceNotFoundException.class
ConcurrentConfigModificationException.class,
ResourceNotFoundException.class
}, delay = "3s", attempts = "5")
public List<ConnectDefinition> getDefinitions(String clusterId, String connectId) {
public PagedList<ConnectDefinition> getPaginatedDefinitions (String clusterId, String connectId, Pagination pagination, Optional<String> search)
throws IOException, RestClientException, ExecutionException, InterruptedException{
List<ConnectDefinition> definitions = getDefinitions(clusterId, connectId, search);

// I'm not sure of how to use the last parameter in this case
// I look at the implementation for the Schema Registry part, but I don't see how make a similar thing here
return PagedList.of(definitions, pagination, list -> list);
}

public List<ConnectDefinition> getDefinitions(String clusterId, String connectId, Optional<String> search
)
{
ConnectorsWithExpandedMetadata unfiltered = this.kafkaModule
.getConnectRestClient(clusterId)
.get(connectId)
.getConnectorsWithAllExpandedMetadata();

Collection<ConnectorDefinition> definitions = unfiltered.getAllDefinitions();

Collection<ConnectorDefinition> connectorsFilteredBySearch = search.map(
query -> definitions.stream().filter(connector -> connector.getName().contains(query))
).orElse(definitions.stream()).collect(Collectors.toList());

ArrayList<ConnectDefinition> filtered = new ArrayList<>();
for (ConnectorDefinition item : unfiltered.getAllDefinitions()) {
for (ConnectorDefinition item : connectorsFilteredBySearch) {
if (isMatchRegex(getConnectFilterRegex(), item.getName())) {
filtered.add(new ConnectDefinition(
item,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import org.akhq.utils.ResultPagedList;
import org.apache.kafka.common.config.TopicConfig;
import org.junit.jupiter.api.*;

import org.akhq.models.Record;
import java.util.Base64;
import java.util.List;
import java.util.Random;
Expand Down
65 changes: 54 additions & 11 deletions src/test/java/org/akhq/repositories/ConnectRepositoryTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,16 @@
import org.akhq.KafkaTestCluster;
import org.akhq.models.ConnectDefinition;
import org.akhq.models.ConnectPlugin;
import org.akhq.utils.Pagination;
import org.codehaus.httpcache4j.uri.URIBuilder;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import scala.None;

import javax.inject.Inject;
import java.util.*;
Expand Down Expand Up @@ -98,10 +101,10 @@ public void create() {
);


List<ConnectDefinition> all1 = repository.getDefinitions(KafkaTestCluster.CLUSTER_ID, "connect-1");
List<ConnectDefinition> all1 = repository.getDefinitions(KafkaTestCluster.CLUSTER_ID, "connect-1", Optional.empty());
assertEquals(1, all1.size());

List<ConnectDefinition> all2 = repository.getDefinitions(KafkaTestCluster.CLUSTER_ID, "connect-2");
List<ConnectDefinition> all2 = repository.getDefinitions(KafkaTestCluster.CLUSTER_ID, "connect-2", Optional.empty());
assertEquals(1, all2.size());

assertEquals(path1, repository.getDefinition(
Expand Down Expand Up @@ -152,8 +155,16 @@ public void create() {

repository.delete(KafkaTestCluster.CLUSTER_ID, "connect-1","ConnectRepositoryTest1");
repository.delete(KafkaTestCluster.CLUSTER_ID, "connect-2","ConnectRepositoryTest2");
assertEquals(0, repository.getDefinitions(KafkaTestCluster.CLUSTER_ID, "connect-1").size());
assertEquals(0, repository.getDefinitions(KafkaTestCluster.CLUSTER_ID, "connect-2").size());
assertEquals(0, repository.getDefinitions(KafkaTestCluster.CLUSTER_ID, "connect-1", Optional.empty()).size());
assertEquals(0, repository.getDefinitions(KafkaTestCluster.CLUSTER_ID, "connect-2", Optional.empty()).size());
}

// Mock the security context so that getDefinitions() applies the
// "^prefixed.*$" connect filter regexp for the authenticated user.
private void mockApplicationContext() {
    List<String> regexps = new ArrayList<>(Arrays.asList("^prefixed.*$"));
    Authentication auth = new DefaultAuthentication("test", Collections.singletonMap("connectsFilterRegexp", regexps));

    DefaultSecurityService securityService = Mockito.mock(DefaultSecurityService.class);
    when(securityService.getAuthentication()).thenReturn(Optional.of(auth));

    when(applicationContext.containsBean(SecurityService.class)).thenReturn(true);
    when(applicationContext.getBean(SecurityService.class)).thenReturn(securityService);
}

@Test
Expand Down Expand Up @@ -194,18 +205,50 @@ public void getFilteredList() {

mockApplicationContext();

List<ConnectDefinition> filtered = repository.getDefinitions(KafkaTestCluster.CLUSTER_ID, "connect-1");
List<ConnectDefinition> filtered = repository.getDefinitions(KafkaTestCluster.CLUSTER_ID, "connect-1", Optional.empty());
assertEquals(2, filtered.size());
repository.delete(KafkaTestCluster.CLUSTER_ID, "connect-1","prefixed.Matching1");
repository.delete(KafkaTestCluster.CLUSTER_ID, "connect-1","prefixed.Matching2");
repository.delete(KafkaTestCluster.CLUSTER_ID, "connect-1","not.Matching3");
}
private void mockApplicationContext() {
Authentication auth = new DefaultAuthentication("test", Collections.singletonMap("connectsFilterRegexp", new ArrayList<>(Arrays.asList("^prefixed.*$"))));
DefaultSecurityService securityService = Mockito.mock(DefaultSecurityService.class);
when(securityService.getAuthentication()).thenReturn(Optional.of(auth));
when(applicationContext.containsBean(SecurityService.class)).thenReturn(true);
when(applicationContext.getBean(SecurityService.class)).thenReturn(securityService);


@Test
public void getFilteredBySearchList() {

    // Two connectors whose names match the mocked "^prefixed.*$" filter regexp.
    repository.create(
        KafkaTestCluster.CLUSTER_ID,
        "connect-1",
        "prefixed.Matching1",
        ImmutableMap.of(
            "connector.class", "FileStreamSinkConnector",
            "file", "/tmp/test.txt",
            "topics", KafkaTestCluster.TOPIC_CONNECT
        )
    );

    repository.create(
        KafkaTestCluster.CLUSTER_ID,
        "connect-1",
        "prefixed.Matching2",
        ImmutableMap.of(
            "connector.class", "FileStreamSinkConnector",
            "file", "/tmp/test.txt",
            "topics", KafkaTestCluster.TOPIC_CONNECT
        )
    );

    mockApplicationContext();

    // No search term: both connectors are returned.
    List<ConnectDefinition> all = repository.getDefinitions(KafkaTestCluster.CLUSTER_ID, "connect-1", Optional.empty());
    assertEquals(2, all.size());

    // Full name: exactly one match.
    List<ConnectDefinition> exactMatch = repository.getDefinitions(KafkaTestCluster.CLUSTER_ID, "connect-1", Optional.of("prefixed.Matching1"));
    assertEquals(1, exactMatch.size());

    // Shared prefix: substring search matches both.
    List<ConnectDefinition> prefixMatches = repository.getDefinitions(KafkaTestCluster.CLUSTER_ID, "connect-1", Optional.of("prefixed.Matching"));
    assertEquals(2, prefixMatches.size());

    repository.delete(KafkaTestCluster.CLUSTER_ID, "connect-1","prefixed.Matching1");
    repository.delete(KafkaTestCluster.CLUSTER_ID, "connect-1","prefixed.Matching2");
}

}

0 comments on commit a5efab1

Please sign in to comment.