End of day
twobeeb committed Feb 26, 2021
1 parent 5b094b4 commit 138ce90
Showing 10 changed files with 161 additions and 39 deletions.
2 changes: 1 addition & 1 deletion settings.gradle
@@ -1 +1 @@
rootProject.name="kafka-ns"
rootProject.name="ns4kafka"
src/main/java/com/michelin/ns4kafka/authentication/gitlab/GitlabTokenValidator.java
@@ -1,17 +1,14 @@
package com.michelin.ns4kafka.authentication.gitlab;

import io.micronaut.context.annotation.Requires;
import io.micronaut.context.annotation.Value;
import io.micronaut.core.util.StringUtils;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.HttpStatus;
import io.micronaut.http.client.exceptions.HttpClientResponseException;
import io.micronaut.security.authentication.Authentication;
import io.micronaut.security.authentication.AuthenticationException;
import io.micronaut.security.authentication.DefaultAuthentication;
import io.micronaut.security.token.validator.TokenValidator;
import io.reactivex.Flowable;
import io.reactivex.Maybe;
import io.reactivex.Single;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -21,15 +18,18 @@
import javax.inject.Singleton;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

@Singleton
public class GitlabTokenValidator implements TokenValidator {
private static final Logger LOG = LoggerFactory.getLogger(GitlabTokenValidator.class);

public static final String IS_ADMIN = "isAdmin()";
@Inject
GitlabAuthorizationService gitlabAuthorizationService;

@Value("${ns4kafka.admin.group:_}")
private String adminGroup;

@Deprecated
@Override
public Publisher<Authentication> validateToken(String token) {
@@ -47,12 +47,23 @@ public Publisher<Authentication> validateToken(String token, @Nullable HttpReque
.flatMapPublisher(username -> gitlabAuthorizationService
.findAllGroups(token)
.toList()
.map(groups -> new DefaultAuthentication(username, Map.of("roles", groups, "email", username)))
.map(groups -> new DefaultAuthentication(username, Map.of(
"groups", groups,
"email", username,
"roles", computeRolesFromProperties(groups)
)))
.toFlowable()
)
: Flowable.empty();
}

private List<String> computeRolesFromProperties(List<String> groups) {
if(groups.contains(adminGroup))
return List.of(IS_ADMIN);
//TODO other specific API groups ? auditor ?
return List.of();
}

@Override
public int getOrder() {
return 0;
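For illustration (not part of this commit): a minimal sketch of the authentication object GitlabTokenValidator now builds for a member of the configured admin group. The user, group names, and admin group value are invented; the claim keys come from the diff above.

import io.micronaut.security.authentication.DefaultAuthentication;

import java.util.List;
import java.util.Map;

public class ClaimsSketch {
    public static void main(String[] args) {
        // Assuming ns4kafka.admin.group=ns4kafka-admins and a Gitlab user belonging to it
        DefaultAuthentication auth = new DefaultAuthentication("jdoe@example.com", Map.of(
                "groups", List.of("ns4kafka-admins", "team-a"), // raw Gitlab groups, later matched against RoleBindings
                "email", "jdoe@example.com",
                "roles", List.of("isAdmin()")                   // added by computeRolesFromProperties when groups contain the admin group
        ));
        System.out.println(auth.getAttributes());
    }
}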
src/main/java/com/michelin/ns4kafka/controllers/AdminController.java
@@ -1,8 +1,97 @@
package com.michelin.ns4kafka.controllers;

import com.michelin.ns4kafka.models.AccessControlEntry;
import com.michelin.ns4kafka.models.Namespace;
import com.michelin.ns4kafka.repositories.AccessControlEntryRepository;
import com.michelin.ns4kafka.repositories.NamespaceRepository;
import com.michelin.ns4kafka.services.KafkaAsyncExecutorConfig;
import com.michelin.ns4kafka.validation.ResourceValidationException;
import io.micronaut.core.annotation.Introspected;
import io.micronaut.http.annotation.Body;
import io.micronaut.http.annotation.Controller;
import io.micronaut.http.annotation.Post;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

import javax.annotation.security.RolesAllowed;
import javax.inject.Inject;
import javax.validation.Valid;
import javax.validation.constraints.NotBlank;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

@Controller("/api/admin")
public class AdminController {

@Inject
NamespaceRepository namespaceRepository;
@Inject
AccessControlEntryRepository accessControlEntryRepository;
@Inject
List<KafkaAsyncExecutorConfig> kafkaAsyncExecutorConfigList;

@RolesAllowed("isAdmin()")
@Post("/namespace")
public Namespace createNamespace(@Valid @Body NamespaceCreationRequest namespaceCreationRequest){

// Validation steps:
// - namespace must not already exist
// - cluster must exist
// - kafkaUser must not exist within the namespaces linked to this cluster
// - prefix ? prefix overlap ? "seb" currently exists and we try to create "se" or "seb_a"
// current new check
// seb seb_a new.startswith(current)
// seb se current.startswith(new)
List<String> validationErrors = new ArrayList<>();
if(namespaceRepository.findByName(namespaceCreationRequest.getName()).isPresent()) {
validationErrors.add("Namespace already exist");
}

if(kafkaAsyncExecutorConfigList.stream()
.noneMatch(config -> config.getName().equals(namespaceCreationRequest.getCluster()))) {
validationErrors.add("Cluster doesn't exist");
}
if(namespaceRepository.findAllForCluster(namespaceCreationRequest.getCluster()).stream()
.anyMatch(namespace -> namespace.getDefaulKafkatUser().equals(namespaceCreationRequest.getKafkaUser()))){
validationErrors.add("KafkaUser already exist");
}
List<AccessControlEntry> prefixInUse = accessControlEntryRepository.findAllForCluster(namespaceCreationRequest.getCluster()).stream()
.filter(ace -> ace.getSpec().getResourcePatternType() == AccessControlEntry.ResourcePatternType.PREFIXED)
.filter(ace -> ace.getSpec().getResourceType() == AccessControlEntry.ResourceType.TOPIC)
.filter(ace -> ace.getSpec().getResource().startsWith(namespaceCreationRequest.getPrefix())
|| namespaceCreationRequest.getPrefix().startsWith(ace.getSpec().getResource()))
.collect(Collectors.toList());
if(prefixInUse.size()>0) {
validationErrors.add(String.format("Prefix overlaps with namespace %s: [%s]"
, prefixInUse.get(0).getSpec().getGrantedTo()
, prefixInUse.get(0).getSpec().getResource()));
}


if(validationErrors.size()>0){
throw new ResourceValidationException(validationErrors);
}
//TODO this
Namespace toCreate = Namespace.builder().build();
return toCreate;
}


@Introspected
@Getter
@Setter
@NoArgsConstructor
public static class NamespaceCreationRequest{
@NotBlank
String name;
@NotBlank
String cluster;
@NotBlank
String kafkaUser;
@NotBlank
String prefix;

}
}
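For illustration (not part of this commit): a hedged sketch of calling the new admin endpoint with Micronaut's RxHttpClient, assuming the default bearer-token reader and a Gitlab token that resolves to the isAdmin() role. Host, token, and field values are invented; the route and body fields come from @Controller("/api/admin"), @Post("/namespace") and NamespaceCreationRequest above.

import io.micronaut.http.HttpRequest;
import io.micronaut.http.client.RxHttpClient;

import java.net.URL;
import java.util.Map;

public class CreateNamespaceSketch {
    public static void main(String[] args) throws Exception {
        RxHttpClient client = RxHttpClient.create(new URL("http://localhost:8080"));
        Map<String, String> body = Map.of(
                "name", "team-a",
                "cluster", "cluster-1",
                "kafkaUser", "u_team_a",
                "prefix", "team_a_");
        // Server-side, the controller throws ResourceValidationException when any of the validation steps above fails
        String created = client.toBlocking().retrieve(
                HttpRequest.POST("/api/admin/namespace", body).bearerAuth("<gitlab-token>"));
        System.out.println(created);
        client.close();
    }
}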
@@ -9,6 +9,7 @@
import com.michelin.ns4kafka.validation.ResourceValidationException;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.HttpResponse;
import io.micronaut.http.HttpStatus;
import io.micronaut.http.annotation.Error;
import io.micronaut.http.annotation.*;
import io.micronaut.http.hateoas.JsonError;
@@ -52,12 +53,13 @@ public Maybe<Connector> getConnector(String namespace, String connector){
.firstElement();
}

@Status(HttpStatus.NO_CONTENT)
@Delete("/{connector}")
public Flowable<HttpResponse<String>> deleteConnector(String namespace, String connector){
if(isNamespaceOwnerOfConnect(namespace,connector)) {
return connectRepository.delete(namespace,connector)
.onErrorResumeNext((Function<? super Throwable, ? extends Publisher<? extends HttpResponse<String>>>) throwable ->{
//TODO better error handling plz
//TODO better error handling plz, handle 404
return Flowable.error(new ConnectCreationException(throwable));
} );
}else {
1 change: 0 additions & 1 deletion src/main/java/com/michelin/ns4kafka/models/Connector.java
@@ -7,7 +7,6 @@

import javax.validation.Valid;
import javax.validation.constraints.NotNull;
import java.time.Instant;
import java.util.Date;
import java.util.List;
import java.util.Map;
src/main/java/com/michelin/ns4kafka/security/RessourceBasedSecurityRule.java
@@ -1,10 +1,8 @@
package com.michelin.ns4kafka.security;

import com.michelin.ns4kafka.models.Namespace;
import com.michelin.ns4kafka.models.RoleBinding;
import com.michelin.ns4kafka.repositories.NamespaceRepository;
import com.michelin.ns4kafka.repositories.RoleBindingRepository;
import io.micronaut.context.annotation.Value;
import io.micronaut.http.HttpRequest;
import io.micronaut.security.rules.SecurityRule;
import io.micronaut.security.rules.SecurityRuleResult;
@@ -30,27 +28,20 @@ public class RessourceBasedSecurityRule implements SecurityRule {
@Inject
NamespaceRepository namespaceRepository;

@Value("${ns4kafka.admin.group:_}")
private String adminGroup;

Pattern namespacedResourcePattern = Pattern.compile("^\\/api\\/namespaces\\/(?<namespace>[a-zA-Z0-9_-]+)\\/(?<resourceType>[a-z]+)(\\/([a-zA-Z0-9_-]+)(\\/(?<resourceSubtype>[a-z]+))?)?");

@Override
public SecurityRuleResult check(HttpRequest<?> request, @Nullable RouteMatch<?> routeMatch, @Nullable Map<String, Object> claims) {
//If the request corresponds to a Controller entry
if(routeMatch != null && claims != null && claims.containsKey("roles") && claims.containsKey("email")){
if(routeMatch != null && claims != null && claims.containsKey("groups") && claims.containsKey("email")){
LOG.info("API call from "+claims.get("email")+ " on resource "+routeMatch.toString());
List<String> roles = (List<String>)claims.get("roles");
List<String> groups = (List<String>)claims.get("groups");

if (roles.contains(adminGroup)) {
LOG.debug("Authorized user "+claims.get("email")+" : Member of Admin Group ["+adminGroup+"]");
return SecurityRuleResult.ALLOWED;
}
// Not using routeMatch to get the resourceType and resourceSubtype values
Matcher matcher = namespacedResourcePattern.matcher(request.getPath());
while (matcher.find()){
//namespaced resource handling
Collection<RoleBinding> roleBindings = roleBindingRepository.findAllForGroups(roles);
Collection<RoleBinding> roleBindings = roleBindingRepository.findAllForGroups(groups);
//TODO users + groups
// roleBindings.addAll(roleBindingRepository.findAllForUser(request.getUserPrincipal().get().getName()))

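For illustration (not part of this commit): what namespacedResourcePattern above extracts from a typical request path. The path itself is invented; the named groups are the ones the rule relies on.

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class RoutePatternSketch {
    public static void main(String[] args) {
        Pattern p = Pattern.compile("^\\/api\\/namespaces\\/(?<namespace>[a-zA-Z0-9_-]+)\\/(?<resourceType>[a-z]+)(\\/([a-zA-Z0-9_-]+)(\\/(?<resourceSubtype>[a-z]+))?)?");
        Matcher m = p.matcher("/api/namespaces/team-a/connectors/my-sink/restart");
        if (m.find()) {
            System.out.println(m.group("namespace"));       // team-a
            System.out.println(m.group("resourceType"));    // connectors
            System.out.println(m.group("resourceSubtype")); // restart
        }
    }
}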
src/main/java/com/michelin/ns4kafka/services/ConnectRestService.java
@@ -4,8 +4,10 @@
import com.michelin.ns4kafka.models.Connector;
import io.micronaut.context.annotation.EachBean;
import io.micronaut.core.type.Argument;
import io.micronaut.core.util.StringUtils;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.HttpResponse;
import io.micronaut.http.MutableHttpRequest;
import io.micronaut.http.client.RxHttpClient;
import io.micronaut.http.client.exceptions.HttpClientResponseException;
import io.micronaut.retry.annotation.DefaultRetryPredicate;
@@ -30,11 +32,13 @@
public class ConnectRestService {

RxHttpClient httpClient;
KafkaAsyncExecutorConfig.ConnectConfig connectConfig;

public ConnectRestService(KafkaAsyncExecutorConfig kafkaAsyncExecutorConfig) {
try {
if(kafkaAsyncExecutorConfig.getConnect()!=null) {
httpClient = RxHttpClient.create(new URL(kafkaAsyncExecutorConfig.getConnect().getUrl()));
this.connectConfig = kafkaAsyncExecutorConfig.getConnect();
this.httpClient = RxHttpClient.create(new URL(kafkaAsyncExecutorConfig.getConnect().getUrl()));
}
}catch (MalformedURLException e){

@@ -45,34 +49,50 @@ public void cleanup(){
httpClient.close();
}

public <I, O> Flowable<O> retrieve(MutableHttpRequest<I> request, Argument<O> bodyType) {
if(StringUtils.isNotEmpty(connectConfig.getBasicAuthUsername())){
request = request.basicAuth(connectConfig.getBasicAuthUsername(),connectConfig.getBasicAuthPassword());
}
return httpClient.retrieve(request, bodyType)
.subscribeOn(Schedulers.io());
}
public <I, O> Flowable<HttpResponse<O>> exchange(MutableHttpRequest<I> request, Argument<O> bodyType) {
if(StringUtils.isNotEmpty(connectConfig.getBasicAuthUsername())){
request = request.basicAuth(connectConfig.getBasicAuthUsername(),connectConfig.getBasicAuthPassword());
}
return httpClient.exchange(request, bodyType)
.subscribeOn(Schedulers.io());
}

//TODO
// Caching (10-20seconds or something)
// https://guides.micronaut.io/micronaut-cache/guide/index.html
public Maybe<Map<String,ConnectItem>> list() {
return httpClient.retrieve(HttpRequest.GET("connectors?expand=info&expand=status"),
return retrieve(HttpRequest.GET("connectors?expand=info&expand=status"),
Argument.mapOf(String.class, ConnectItem.class))
.firstElement()
.subscribeOn(Schedulers.io());
.firstElement();
}

public Flowable<ConnectValidationResult> validate(Map<String,String> spec){
return httpClient.retrieve(
return retrieve(
HttpRequest.PUT("connector-plugins/"+spec.get("connector.class")+"/config/validate", spec),
ConnectValidationResult.class)
.subscribeOn(Schedulers.io());
Argument.of(ConnectValidationResult.class)
);
}

@Retryable(predicate = RebalanceRetryPredicate.class)
public Single<ConnectInfo> createOrUpdate(Connector connector){
return httpClient.retrieve(
return retrieve(
HttpRequest.PUT("connectors/"+connector.getMetadata().getName()+"/config",connector.getSpec()),
ConnectInfo.class)
.subscribeOn(Schedulers.io())
Argument.of(ConnectInfo.class))
.singleOrError();
}

public Flowable<HttpResponse<String>> delete(String connector){
return httpClient.exchange(HttpRequest.DELETE("connectors/"+connector), String.class);
return exchange(
HttpRequest.DELETE("connectors/"+connector),
Argument.STRING);

}

public static class RebalanceRetryPredicate extends DefaultRetryPredicate{
src/main/java/com/michelin/ns4kafka/services/KafkaAsyncExecutorScheduler.java
@@ -1,31 +1,26 @@
package com.michelin.ns4kafka.services;

import io.micronaut.context.ApplicationContext;
import io.micronaut.inject.qualifiers.Qualifiers;
import io.micronaut.scheduling.annotation.Scheduled;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.inject.Inject;
import javax.inject.Singleton;
import java.util.Collection;
import java.util.List;


@Singleton
public class KafkaAsyncExecutorScheduler {
private static final Logger LOG = LoggerFactory.getLogger(KafkaAsyncExecutorScheduler.class);

@Inject
ApplicationContext applicationContext;
@Inject
List<ConnectRestService> connectRestServices;
@Inject
List<KafkaAsyncExecutor> kafkaAsyncExecutors;

//TODO urgent : start the scheduler only when Application is started (ServerStartupEvent)
@Scheduled(initialDelay = "12s", fixedDelay = "20s")
void schedule(){

//TODO sequential forEach with exception handling (to let next clusters sync)
kafkaAsyncExecutors.forEach(KafkaAsyncExecutor::run);
}
}
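One possible shape for the "start the scheduler only when Application is started" TODO above (an assumption, not part of this commit): gate the scheduled sync on Micronaut's ServerStartupEvent instead of relying only on the fixed initial delay.

import io.micronaut.context.event.ApplicationEventListener;
import io.micronaut.runtime.server.event.ServerStartupEvent;
import io.micronaut.scheduling.annotation.Scheduled;

import javax.inject.Singleton;
import java.util.concurrent.atomic.AtomicBoolean;

@Singleton
public class StartupGatedScheduler implements ApplicationEventListener<ServerStartupEvent> {
    private final AtomicBoolean ready = new AtomicBoolean(false);

    @Override
    public void onApplicationEvent(ServerStartupEvent event) {
        ready.set(true); // the HTTP server is up; syncing may begin
    }

    @Scheduled(initialDelay = "12s", fixedDelay = "20s")
    void schedule() {
        if (!ready.get()) {
            return; // skip runs triggered before startup completes
        }
        // kafkaAsyncExecutors.forEach(KafkaAsyncExecutor::run);  // as in KafkaAsyncExecutorScheduler above
    }
}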
src/main/java/com/michelin/ns4kafka/validation/ConnectValidator.java
@@ -92,6 +92,7 @@ public List<String> validate(Connector connector, String connectorType){
return validationErrors;
}

//TODO makeDefault from conf or template namespace ?
public static ConnectValidator makeDefault(){
return ConnectValidator.builder()
.validationConstraints(Map.of(
14 changes: 14 additions & 0 deletions src/main/java/com/michelin/ns4kafka/validation/TopicValidator.java
@@ -74,4 +74,18 @@ public List<String> validate(Topic topic, Namespace namespace){
return validationErrors;
}

//TODO makeDefault from config or template ?
public static TopicValidator makeDefault(){
return TopicValidator.builder()
.validationConstraints(
Map.of( "replication.factor", ResourceValidator.Range.between(3,3),
"partitions", ResourceValidator.Range.between(3,6),
"cleanup.policy", ResourceValidator.ValidList.in("delete","compact"),
"min.insync.replicas", ResourceValidator.Range.between(2,2),
"retention.ms", ResourceValidator.Range.between(60000,604800000)
)
)
.build();
}

}
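Reading the default topic constraints above in plain terms: replication.factor must be exactly 3, partitions between 3 and 6, cleanup.policy either delete or compact, min.insync.replicas exactly 2, and retention.ms between 60000 ms (one minute) and 604800000 ms (seven days). A hypothetical topic configuration that satisfies all of them (illustrative values only, not part of this commit):

import java.util.Map;

public class DefaultTopicConfigSketch {
    public static void main(String[] args) {
        int replicationFactor = 3;            // Range.between(3,3): exactly 3
        int partitions = 6;                   // Range.between(3,6)
        Map<String, String> config = Map.of(
                "cleanup.policy", "compact",  // ValidList.in("delete","compact")
                "min.insync.replicas", "2",   // Range.between(2,2): exactly 2
                "retention.ms", "604800000"); // Range.between(60000,604800000): 1 minute to 7 days
        System.out.println(replicationFactor + " / " + partitions + " / " + config);
    }
}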
