From 138ce90a0f4ee059d79260a595a9d6ea1d710c85 Mon Sep 17 00:00:00 2001 From: twobeeb Date: Fri, 26 Feb 2021 17:07:02 +0100 Subject: [PATCH] End of day --- settings.gradle | 2 +- .../gitlab/GitlabTokenValidator.java | 23 +++-- .../ns4kafka/controllers/AdminController.java | 89 +++++++++++++++++++ .../controllers/ConnectController.java | 4 +- .../michelin/ns4kafka/models/Connector.java | 1 - .../security/RessourceBasedSecurityRule.java | 15 +--- .../ns4kafka/services/ConnectRestService.java | 42 ++++++--- .../services/KafkaAsyncExecutorScheduler.java | 9 +- .../ns4kafka/validation/ConnectValidator.java | 1 + .../ns4kafka/validation/TopicValidator.java | 14 +++ 10 files changed, 161 insertions(+), 39 deletions(-) diff --git a/settings.gradle b/settings.gradle index b6e1daf8..deef0d0a 100644 --- a/settings.gradle +++ b/settings.gradle @@ -1 +1 @@ -rootProject.name="kafka-ns" +rootProject.name="ns4kafka" diff --git a/src/main/java/com/michelin/ns4kafka/authentication/gitlab/GitlabTokenValidator.java b/src/main/java/com/michelin/ns4kafka/authentication/gitlab/GitlabTokenValidator.java index e1cda7d5..3dd8221b 100644 --- a/src/main/java/com/michelin/ns4kafka/authentication/gitlab/GitlabTokenValidator.java +++ b/src/main/java/com/michelin/ns4kafka/authentication/gitlab/GitlabTokenValidator.java @@ -1,17 +1,14 @@ package com.michelin.ns4kafka.authentication.gitlab; -import io.micronaut.context.annotation.Requires; +import io.micronaut.context.annotation.Value; import io.micronaut.core.util.StringUtils; import io.micronaut.http.HttpRequest; -import io.micronaut.http.HttpStatus; -import io.micronaut.http.client.exceptions.HttpClientResponseException; import io.micronaut.security.authentication.Authentication; import io.micronaut.security.authentication.AuthenticationException; import io.micronaut.security.authentication.DefaultAuthentication; import io.micronaut.security.token.validator.TokenValidator; import io.reactivex.Flowable; import io.reactivex.Maybe; -import io.reactivex.Single; import org.reactivestreams.Publisher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -21,15 +18,18 @@ import javax.inject.Singleton; import java.util.List; import java.util.Map; -import java.util.stream.Collectors; @Singleton public class GitlabTokenValidator implements TokenValidator { private static final Logger LOG = LoggerFactory.getLogger(GitlabTokenValidator.class); + public static final String IS_ADMIN = "isAdmin()"; @Inject GitlabAuthorizationService gitlabAuthorizationService; + @Value("${ns4kafka.admin.group:_}") + private String adminGroup; + @Deprecated @Override public Publisher validateToken(String token) { @@ -47,12 +47,23 @@ public Publisher validateToken(String token, @Nullable HttpReque .flatMapPublisher(username -> gitlabAuthorizationService .findAllGroups(token) .toList() - .map(groups -> new DefaultAuthentication(username, Map.of("roles", groups, "email", username))) + .map(groups -> new DefaultAuthentication(username, Map.of( + "groups", groups, + "email", username, + "roles", computeRolesFromProperties(groups) + ))) .toFlowable() ) : Flowable.empty(); } + private List computeRolesFromProperties(List groups) { + if(groups.contains(adminGroup)) + return List.of(IS_ADMIN); + //TODO other specific API groups ? auditor ? + return List.of(); + } + @Override public int getOrder() { return 0; diff --git a/src/main/java/com/michelin/ns4kafka/controllers/AdminController.java b/src/main/java/com/michelin/ns4kafka/controllers/AdminController.java index 18df7ffd..70f89e7f 100644 --- a/src/main/java/com/michelin/ns4kafka/controllers/AdminController.java +++ b/src/main/java/com/michelin/ns4kafka/controllers/AdminController.java @@ -1,8 +1,97 @@ package com.michelin.ns4kafka.controllers; +import com.michelin.ns4kafka.models.AccessControlEntry; +import com.michelin.ns4kafka.models.Namespace; +import com.michelin.ns4kafka.repositories.AccessControlEntryRepository; +import com.michelin.ns4kafka.repositories.NamespaceRepository; +import com.michelin.ns4kafka.services.KafkaAsyncExecutorConfig; +import com.michelin.ns4kafka.validation.ResourceValidationException; +import io.micronaut.core.annotation.Introspected; +import io.micronaut.http.annotation.Body; import io.micronaut.http.annotation.Controller; +import io.micronaut.http.annotation.Post; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; + +import javax.annotation.security.RolesAllowed; +import javax.inject.Inject; +import javax.validation.Valid; +import javax.validation.constraints.NotBlank; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; @Controller("/api/admin") public class AdminController { + @Inject + NamespaceRepository namespaceRepository; + @Inject + AccessControlEntryRepository accessControlEntryRepository; + @Inject + List kafkaAsyncExecutorConfigList; + + @RolesAllowed("isAdmin()") + @Post("/namespace") + public Namespace createNamespace(@Valid @Body NamespaceCreationRequest namespaceCreationRequest){ + + // Validation steps: + // - namespace must not already exist + // - cluster must exist + // - kafkaUser must not exist within the namespaces linked to this cluster + // - prefix ? prefix overlap ? "seb" currently exists and we try to create "se" or "seb_a" + // current new check + // seb seb_a new.startswith(current) + // seb se current.startswith(new) + List validationErrors = new ArrayList<>(); + if(namespaceRepository.findByName(namespaceCreationRequest.getName()).isPresent()) { + validationErrors.add("Namespace already exist"); + } + + if(kafkaAsyncExecutorConfigList.stream() + .noneMatch(config -> config.getName().equals(namespaceCreationRequest.getCluster()))) { + validationErrors.add("Cluster doesn't exist"); + } + if(namespaceRepository.findAllForCluster(namespaceCreationRequest.getCluster()).stream() + .anyMatch(namespace -> namespace.getDefaulKafkatUser().equals(namespaceCreationRequest.getKafkaUser()))){ + validationErrors.add("KafkaUser already exist"); + } + List prefixInUse = accessControlEntryRepository.findAllForCluster(namespaceCreationRequest.getCluster()).stream() + .filter(ace -> ace.getSpec().getResourcePatternType() == AccessControlEntry.ResourcePatternType.PREFIXED) + .filter(ace -> ace.getSpec().getResourceType() == AccessControlEntry.ResourceType.TOPIC) + .filter(ace -> ace.getSpec().getResource().startsWith(namespaceCreationRequest.getPrefix()) + || namespaceCreationRequest.getPrefix().startsWith(ace.getSpec().getResource())) + .collect(Collectors.toList()); + if(prefixInUse.size()>0) { + validationErrors.add(String.format("Prefix overlaps with namespace %s: [%s]" + , prefixInUse.get(0).getSpec().getGrantedTo() + , prefixInUse.get(0).getSpec().getResource())); + } + + + if(validationErrors.size()>0){ + throw new ResourceValidationException(validationErrors); + } + //TODO this + Namespace toCreate = Namespace.builder().build(); + return toCreate; + } + + + @Introspected + @Getter + @Setter + @NoArgsConstructor + public static class NamespaceCreationRequest{ + @NotBlank + String name; + @NotBlank + String cluster; + @NotBlank + String kafkaUser; + @NotBlank + String prefix; + + } } diff --git a/src/main/java/com/michelin/ns4kafka/controllers/ConnectController.java b/src/main/java/com/michelin/ns4kafka/controllers/ConnectController.java index d78f8e27..739ad6a3 100644 --- a/src/main/java/com/michelin/ns4kafka/controllers/ConnectController.java +++ b/src/main/java/com/michelin/ns4kafka/controllers/ConnectController.java @@ -9,6 +9,7 @@ import com.michelin.ns4kafka.validation.ResourceValidationException; import io.micronaut.http.HttpRequest; import io.micronaut.http.HttpResponse; +import io.micronaut.http.HttpStatus; import io.micronaut.http.annotation.Error; import io.micronaut.http.annotation.*; import io.micronaut.http.hateoas.JsonError; @@ -52,12 +53,13 @@ public Maybe getConnector(String namespace, String connector){ .firstElement(); } + @Status(HttpStatus.NO_CONTENT) @Delete("/{connector}") public Flowable> deleteConnector(String namespace, String connector){ if(isNamespaceOwnerOfConnect(namespace,connector)) { return connectRepository.delete(namespace,connector) .onErrorResumeNext((Function>>) throwable ->{ - //TODO better error handling plz + //TODO better error handling plz, handle 404 return Flowable.error(new ConnectCreationException(throwable)); } ); }else { diff --git a/src/main/java/com/michelin/ns4kafka/models/Connector.java b/src/main/java/com/michelin/ns4kafka/models/Connector.java index 0a9665a3..c7418adc 100644 --- a/src/main/java/com/michelin/ns4kafka/models/Connector.java +++ b/src/main/java/com/michelin/ns4kafka/models/Connector.java @@ -7,7 +7,6 @@ import javax.validation.Valid; import javax.validation.constraints.NotNull; -import java.time.Instant; import java.util.Date; import java.util.List; import java.util.Map; diff --git a/src/main/java/com/michelin/ns4kafka/security/RessourceBasedSecurityRule.java b/src/main/java/com/michelin/ns4kafka/security/RessourceBasedSecurityRule.java index 38ca9b3c..eb4a9b23 100644 --- a/src/main/java/com/michelin/ns4kafka/security/RessourceBasedSecurityRule.java +++ b/src/main/java/com/michelin/ns4kafka/security/RessourceBasedSecurityRule.java @@ -1,10 +1,8 @@ package com.michelin.ns4kafka.security; -import com.michelin.ns4kafka.models.Namespace; import com.michelin.ns4kafka.models.RoleBinding; import com.michelin.ns4kafka.repositories.NamespaceRepository; import com.michelin.ns4kafka.repositories.RoleBindingRepository; -import io.micronaut.context.annotation.Value; import io.micronaut.http.HttpRequest; import io.micronaut.security.rules.SecurityRule; import io.micronaut.security.rules.SecurityRuleResult; @@ -30,27 +28,20 @@ public class RessourceBasedSecurityRule implements SecurityRule { @Inject NamespaceRepository namespaceRepository; - @Value("${ns4kafka.admin.group:_}") - private String adminGroup; - Pattern namespacedResourcePattern = Pattern.compile("^\\/api\\/namespaces\\/(?[a-zA-Z0-9_-]+)\\/(?[a-z]+)(\\/([a-zA-Z0-9_-]+)(\\/(?[a-z]+))?)?"); @Override public SecurityRuleResult check(HttpRequest request, @Nullable RouteMatch routeMatch, @Nullable Map claims) { //If the request corresponds to a Controller entry - if(routeMatch != null && claims != null && claims.containsKey("roles") && claims.containsKey("email")){ + if(routeMatch != null && claims != null && claims.containsKey("groups") && claims.containsKey("email")){ LOG.info("API call from "+claims.get("email")+ " on resource "+routeMatch.toString()); - List roles = (List)claims.get("roles"); + List groups = (List)claims.get("groups"); - if (roles.contains(adminGroup)) { - LOG.debug("Authorized user "+claims.get("email")+" : Member of Admin Group ["+adminGroup+"]"); - return SecurityRuleResult.ALLOWED; - } // Not using routeMatch to get the resourceType and resourceSubtype values Matcher matcher = namespacedResourcePattern.matcher(request.getPath()); while (matcher.find()){ //namespaced resource handling - Collection roleBindings = roleBindingRepository.findAllForGroups(roles); + Collection roleBindings = roleBindingRepository.findAllForGroups(groups); //TODO users + groups // roleBindings.addAll(roleBindingRepository.findAllForUser(request.getUserPrincipal().get().getName())) diff --git a/src/main/java/com/michelin/ns4kafka/services/ConnectRestService.java b/src/main/java/com/michelin/ns4kafka/services/ConnectRestService.java index e8247b3e..fc3346e3 100644 --- a/src/main/java/com/michelin/ns4kafka/services/ConnectRestService.java +++ b/src/main/java/com/michelin/ns4kafka/services/ConnectRestService.java @@ -4,8 +4,10 @@ import com.michelin.ns4kafka.models.Connector; import io.micronaut.context.annotation.EachBean; import io.micronaut.core.type.Argument; +import io.micronaut.core.util.StringUtils; import io.micronaut.http.HttpRequest; import io.micronaut.http.HttpResponse; +import io.micronaut.http.MutableHttpRequest; import io.micronaut.http.client.RxHttpClient; import io.micronaut.http.client.exceptions.HttpClientResponseException; import io.micronaut.retry.annotation.DefaultRetryPredicate; @@ -30,11 +32,13 @@ public class ConnectRestService { RxHttpClient httpClient; + KafkaAsyncExecutorConfig.ConnectConfig connectConfig; public ConnectRestService(KafkaAsyncExecutorConfig kafkaAsyncExecutorConfig) { try { if(kafkaAsyncExecutorConfig.getConnect()!=null) { - httpClient = RxHttpClient.create(new URL(kafkaAsyncExecutorConfig.getConnect().getUrl())); + this.connectConfig = kafkaAsyncExecutorConfig.getConnect(); + this.httpClient = RxHttpClient.create(new URL(kafkaAsyncExecutorConfig.getConnect().getUrl())); } }catch (MalformedURLException e){ @@ -45,34 +49,50 @@ public void cleanup(){ httpClient.close(); } + public Flowable retrieve(MutableHttpRequest request, Argument bodyType) { + if(StringUtils.isNotEmpty(connectConfig.getBasicAuthUsername())){ + request = request.basicAuth(connectConfig.getBasicAuthUsername(),connectConfig.getBasicAuthPassword()); + } + return httpClient.retrieve(request, bodyType) + .subscribeOn(Schedulers.io()); + } + public Flowable> exchange(MutableHttpRequest request, Argument bodyType) { + if(StringUtils.isNotEmpty(connectConfig.getBasicAuthUsername())){ + request = request.basicAuth(connectConfig.getBasicAuthUsername(),connectConfig.getBasicAuthPassword()); + } + return httpClient.exchange(request, bodyType) + .subscribeOn(Schedulers.io()); + } + //TODO // Caching (10-20seconds or something) // https://guides.micronaut.io/micronaut-cache/guide/index.html public Maybe> list() { - return httpClient.retrieve(HttpRequest.GET("connectors?expand=info&expand=status"), + return retrieve(HttpRequest.GET("connectors?expand=info&expand=status"), Argument.mapOf(String.class, ConnectItem.class)) - .firstElement() - .subscribeOn(Schedulers.io()); + .firstElement(); } public Flowable validate(Map spec){ - return httpClient.retrieve( + return retrieve( HttpRequest.PUT("connector-plugins/"+spec.get("connector.class")+"/config/validate", spec), - ConnectValidationResult.class) - .subscribeOn(Schedulers.io()); + Argument.of(ConnectValidationResult.class) + ); } @Retryable(predicate = RebalanceRetryPredicate.class) public Single createOrUpdate(Connector connector){ - return httpClient.retrieve( + return retrieve( HttpRequest.PUT("connectors/"+connector.getMetadata().getName()+"/config",connector.getSpec()), - ConnectInfo.class) - .subscribeOn(Schedulers.io()) + Argument.of(ConnectInfo.class)) .singleOrError(); } public Flowable> delete(String connector){ - return httpClient.exchange(HttpRequest.DELETE("connectors/"+connector), String.class); + return exchange( + HttpRequest.DELETE("connectors/"+connector), + Argument.STRING); + } public static class RebalanceRetryPredicate extends DefaultRetryPredicate{ diff --git a/src/main/java/com/michelin/ns4kafka/services/KafkaAsyncExecutorScheduler.java b/src/main/java/com/michelin/ns4kafka/services/KafkaAsyncExecutorScheduler.java index ce0f8ba0..a3bfe116 100644 --- a/src/main/java/com/michelin/ns4kafka/services/KafkaAsyncExecutorScheduler.java +++ b/src/main/java/com/michelin/ns4kafka/services/KafkaAsyncExecutorScheduler.java @@ -1,14 +1,11 @@ package com.michelin.ns4kafka.services; -import io.micronaut.context.ApplicationContext; -import io.micronaut.inject.qualifiers.Qualifiers; import io.micronaut.scheduling.annotation.Scheduled; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.inject.Inject; import javax.inject.Singleton; -import java.util.Collection; import java.util.List; @@ -16,16 +13,14 @@ public class KafkaAsyncExecutorScheduler { private static final Logger LOG = LoggerFactory.getLogger(KafkaAsyncExecutorScheduler.class); - @Inject - ApplicationContext applicationContext; - @Inject - List connectRestServices; @Inject List kafkaAsyncExecutors; + //TODO urgent : start the schedulder only when Application is started (ServerStartupEvent) @Scheduled(initialDelay = "12s", fixedDelay = "20s") void schedule(){ + //TODO sequential forEach with exception handling (to let next clusters sync) kafkaAsyncExecutors.forEach(KafkaAsyncExecutor::run); } } diff --git a/src/main/java/com/michelin/ns4kafka/validation/ConnectValidator.java b/src/main/java/com/michelin/ns4kafka/validation/ConnectValidator.java index 271537d2..c3f9b11e 100644 --- a/src/main/java/com/michelin/ns4kafka/validation/ConnectValidator.java +++ b/src/main/java/com/michelin/ns4kafka/validation/ConnectValidator.java @@ -92,6 +92,7 @@ public List validate(Connector connector, String connectorType){ return validationErrors; } + //TODO makeDefault from conf or template namespace ? public static ConnectValidator makeDefault(){ return ConnectValidator.builder() .validationConstraints(Map.of( diff --git a/src/main/java/com/michelin/ns4kafka/validation/TopicValidator.java b/src/main/java/com/michelin/ns4kafka/validation/TopicValidator.java index 65703a4f..b72e14ce 100644 --- a/src/main/java/com/michelin/ns4kafka/validation/TopicValidator.java +++ b/src/main/java/com/michelin/ns4kafka/validation/TopicValidator.java @@ -74,4 +74,18 @@ public List validate(Topic topic, Namespace namespace){ return validationErrors; } + //TODO makeDefault from config or template ? + public static TopicValidator makeDefault(){ + return TopicValidator.builder() + .validationConstraints( + Map.of( "replication.factor", ResourceValidator.Range.between(3,3), + "partitions", ResourceValidator.Range.between(3,6), + "cleanup.policy", ResourceValidator.ValidList.in("delete","compact"), + "min.insync.replicas", ResourceValidator.Range.between(2,2), + "retention.ms", ResourceValidator.Range.between(60000,604800000) + ) + ) + .build(); + } + }