Skip to content

Commit e3ba25f

Browse files
BE: Implement app restart on dynamic config change (#1151)
Co-authored-by: German Osin <german.osin@gmail.com>
1 parent 4e7767b commit e3ba25f

File tree

5 files changed

+243
-6
lines changed

5 files changed

+243
-6
lines changed

api/src/main/java/io/kafbat/ui/config/auth/RoleBasedAccessControlProperties.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,14 @@
66
import jakarta.annotation.PostConstruct;
77
import java.util.ArrayList;
88
import java.util.List;
9+
import lombok.Getter;
910
import org.springframework.boot.context.properties.ConfigurationProperties;
1011

12+
@Getter
1113
@ConfigurationProperties("rbac")
1214
public class RoleBasedAccessControlProperties {
1315

14-
private final List<Role> roles = new ArrayList<>();
16+
private volatile List<Role> roles = new ArrayList<>();
1517

1618
private DefaultRole defaultRole;
1719

@@ -23,8 +25,9 @@ public void init() {
2325
}
2426
}
2527

26-
public List<Role> getRoles() {
27-
return roles;
28+
public void setRoles(List<Role> roles) {
29+
this.roles = roles;
30+
init();
2831
}
2932

3033
public void setDefaultRole(DefaultRole defaultRole) {
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
package io.kafbat.ui.service.app;
2+
3+
import io.kafbat.ui.config.auth.RoleBasedAccessControlProperties;
4+
import io.kafbat.ui.util.MultiFileWatcher;
5+
import jakarta.annotation.PostConstruct;
6+
import jakarta.annotation.PreDestroy;
7+
import java.io.IOException;
8+
import java.nio.file.Path;
9+
import java.nio.file.Paths;
10+
import java.util.LinkedHashSet;
11+
import java.util.List;
12+
import java.util.Objects;
13+
import java.util.stream.Collectors;
14+
import java.util.stream.Stream;
15+
import java.util.stream.StreamSupport;
16+
import lombok.RequiredArgsConstructor;
17+
import lombok.extern.slf4j.Slf4j;
18+
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
19+
import org.springframework.boot.context.properties.bind.Binder;
20+
import org.springframework.boot.env.OriginTrackedMapPropertySource;
21+
import org.springframework.boot.env.YamlPropertySourceLoader;
22+
import org.springframework.boot.origin.Origin;
23+
import org.springframework.boot.origin.OriginTrackedValue;
24+
import org.springframework.boot.origin.TextResourceOrigin;
25+
import org.springframework.core.env.ConfigurableEnvironment;
26+
import org.springframework.core.env.PropertySource;
27+
import org.springframework.core.io.FileSystemResource;
28+
import org.springframework.core.io.Resource;
29+
import org.springframework.stereotype.Service;
30+
31+
@Service
32+
@RequiredArgsConstructor
33+
@Slf4j
34+
@ConditionalOnProperty(value = "config.autoreload", havingValue = "true")
35+
public class ConfigReloadService {
36+
37+
private static final String THREAD_NAME = "config-watcher-thread";
38+
39+
private final ConfigurableEnvironment environment;
40+
private final RoleBasedAccessControlProperties rbacProperties;
41+
private final YamlPropertySourceLoader yamlLoader = new YamlPropertySourceLoader();
42+
43+
private Thread watcherThread;
44+
private MultiFileWatcher multiFileWatcher;
45+
46+
@PostConstruct
47+
public void init() {
48+
var propertySourcePaths = StreamSupport.stream(environment.getPropertySources().spliterator(), false)
49+
.filter(OriginTrackedMapPropertySource.class::isInstance)
50+
.map(OriginTrackedMapPropertySource.class::cast)
51+
.flatMap(ps -> ps.getSource().values().stream())
52+
.map(v -> (v instanceof OriginTrackedValue otv) ? otv.getOrigin() : null)
53+
.filter(Objects::nonNull)
54+
.flatMap(o -> Stream.iterate(o, Objects::nonNull, Origin::getParent))
55+
.filter(TextResourceOrigin.class::isInstance)
56+
.map(TextResourceOrigin.class::cast)
57+
.map(TextResourceOrigin::getResource)
58+
.filter(Objects::nonNull)
59+
.filter(Resource::exists)
60+
.filter(Resource::isReadable)
61+
.filter(Resource::isFile)
62+
.map(r -> {
63+
try {
64+
return r.getURI();
65+
} catch (IOException e) {
66+
log.error("can't retrieve resource URL", e);
67+
return null;
68+
}
69+
})
70+
.filter(Objects::nonNull)
71+
.map(Paths::get)
72+
.collect(Collectors.toCollection(LinkedHashSet::new));
73+
74+
if (propertySourcePaths.isEmpty()) {
75+
log.debug("No config files found, auto reload is disabled");
76+
return;
77+
}
78+
79+
log.debug("Auto reload is enabled, will watch for config changes");
80+
81+
try {
82+
this.multiFileWatcher = new MultiFileWatcher(propertySourcePaths, this::reloadFile);
83+
this.watcherThread = new Thread(multiFileWatcher::watchLoop, THREAD_NAME);
84+
this.watcherThread.start();
85+
} catch (IOException e) {
86+
log.error("Error while registering watch service", e);
87+
}
88+
}
89+
90+
private void reloadFile(Path path) {
91+
log.info("Reloading file {}", path);
92+
try {
93+
if (!path.toString().endsWith(".yml") && !path.toString().endsWith(".yaml")) {
94+
log.trace("Skipping non-YML file {}", path);
95+
}
96+
97+
String name = String.format("Config resource 'file [%s] via location '%s'",
98+
path.toAbsolutePath(),
99+
path.toAbsolutePath()); // TODO extract an obj reference from env
100+
101+
List<PropertySource<?>> load = yamlLoader.load(path.toString(), new FileSystemResource(path));
102+
environment.getPropertySources().remove(name);
103+
environment.getPropertySources().addFirst(load.getFirst());
104+
Binder binder = Binder.get(environment);
105+
106+
binder.bind("rbac", RoleBasedAccessControlProperties.class)
107+
.ifBound(bound -> rbacProperties.setRoles(bound.getRoles()));
108+
} catch (Throwable e) {
109+
log.error("Error while reloading file {}", path, e);
110+
}
111+
}
112+
113+
@PreDestroy
114+
public void shutdown() {
115+
try {
116+
if (multiFileWatcher != null) {
117+
multiFileWatcher.close();
118+
}
119+
} catch (IOException ignored) {
120+
}
121+
if (watcherThread != null) {
122+
this.watcherThread.interrupt();
123+
}
124+
}
125+
}

api/src/main/java/io/kafbat/ui/service/rbac/AccessControlService.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ public class AccessControlService {
6161
@Getter
6262
private Set<ProviderAuthorityExtractor> oauthExtractors = Collections.emptySet();
6363

64+
6465
@PostConstruct
6566
public void init() {
6667
if (CollectionUtils.isEmpty(properties.getRoles()) && properties.getDefaultRole() == null) {

api/src/main/java/io/kafbat/ui/util/DynamicConfigOperations.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,7 @@
3030
import org.springframework.http.codec.multipart.FilePart;
3131
import org.springframework.stereotype.Component;
3232
import org.yaml.snakeyaml.DumperOptions;
33-
import org.yaml.snakeyaml.LoaderOptions;
3433
import org.yaml.snakeyaml.Yaml;
35-
import org.yaml.snakeyaml.constructor.Constructor;
3634
import org.yaml.snakeyaml.introspector.BeanAccess;
3735
import org.yaml.snakeyaml.introspector.Property;
3836
import org.yaml.snakeyaml.introspector.PropertyUtils;
@@ -79,7 +77,7 @@ public Optional<PropertySource<?>> loadDynamicPropertySource() {
7977
if (dynamicConfigEnabled()) {
8078
Path configPath = dynamicConfigFilePath();
8179
if (!Files.exists(configPath) || !Files.isReadable(configPath)) {
82-
log.warn("Dynamic config file {} doesnt exist or not readable", configPath);
80+
log.warn("Dynamic config file {} doesnt exist or is not readable", configPath);
8381
return Optional.empty();
8482
}
8583
var propertySource = new CompositePropertySource("dynamicProperties");
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
package io.kafbat.ui.util;
2+
3+
import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE;
4+
import static java.nio.file.StandardWatchEventKinds.ENTRY_DELETE;
5+
import static java.nio.file.StandardWatchEventKinds.ENTRY_MODIFY;
6+
7+
import java.io.IOException;
8+
import java.io.UncheckedIOException;
9+
import java.net.URI;
10+
import java.nio.file.ClosedWatchServiceException;
11+
import java.nio.file.FileSystems;
12+
import java.nio.file.Path;
13+
import java.nio.file.WatchEvent;
14+
import java.nio.file.WatchKey;
15+
import java.nio.file.WatchService;
16+
import java.util.Collection;
17+
import java.util.HashMap;
18+
import java.util.Map;
19+
import java.util.Set;
20+
import java.util.concurrent.ConcurrentHashMap;
21+
import java.util.function.Consumer;
22+
import lombok.extern.slf4j.Slf4j;
23+
import org.springframework.util.Assert;
24+
25+
@Slf4j
26+
public final class MultiFileWatcher implements AutoCloseable {
27+
28+
private final WatchService watchService = FileSystems.getDefault().newWatchService();
29+
private final Set<URI> watchedFiles = ConcurrentHashMap.newKeySet();
30+
private final Map<WatchKey, Path> watchDirsByKey = new HashMap<>();
31+
private final Consumer<Path> reloader;
32+
33+
public MultiFileWatcher(Collection<Path> filesToWatch, Consumer<Path> reloader) throws IOException {
34+
Assert.notNull(reloader, "reloader must not be null");
35+
this.reloader = reloader;
36+
37+
if (filesToWatch.isEmpty()) {
38+
log.warn("No files to watch, aborting");
39+
}
40+
41+
42+
watchedFiles.addAll(filesToWatch.stream()
43+
.map(p -> p.toAbsolutePath().normalize())
44+
.map(Path::toUri)
45+
.toList()
46+
);
47+
48+
if (watchedFiles.isEmpty()) {
49+
log.warn("No files to watch resolved, aborting");
50+
return;
51+
}
52+
53+
log.debug("Going to watch {} files", watchedFiles.size());
54+
log.trace("Watching files: {}", watchedFiles.stream().map(URI::toString).toList());
55+
56+
var directories = filesToWatch
57+
.stream()
58+
.map(Path::getParent)
59+
.distinct()
60+
.toList();
61+
62+
directories
63+
.forEach(dir -> {
64+
try {
65+
var key = dir.register(watchService, ENTRY_MODIFY, ENTRY_CREATE, ENTRY_DELETE);
66+
watchDirsByKey.put(key, dir);
67+
} catch (IOException e) {
68+
throw new UncheckedIOException(e);
69+
}
70+
});
71+
72+
log.trace("Watching directories: {}", directories.stream().map(Path::toString).toList());
73+
}
74+
75+
public void watchLoop() {
76+
while (true) {
77+
try {
78+
var key = watchService.take();
79+
Path dir = watchDirsByKey.get(key);
80+
if (dir == null) {
81+
continue;
82+
}
83+
84+
for (WatchEvent<?> event : key.pollEvents()) {
85+
Path relativePath = (Path) event.context();
86+
Path path = dir.resolve(relativePath);
87+
if (watchedFiles.contains(path.toAbsolutePath().normalize().toUri())) {
88+
reloader.accept(path);
89+
}
90+
}
91+
key.reset();
92+
} catch (InterruptedException e) {
93+
Thread.currentThread().interrupt();
94+
break;
95+
} catch (ClosedWatchServiceException e) {
96+
log.trace("Watch service closed, exiting watcher thread");
97+
break;
98+
} catch (Exception e) {
99+
log.error("Error while calling the reloader", e);
100+
break;
101+
}
102+
}
103+
}
104+
105+
@Override
106+
public void close() throws IOException {
107+
watchService.close();
108+
}
109+
}
110+

0 commit comments

Comments
 (0)