Skip to content

feat: limit verdict cache size #239

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,5 @@ docs/_build
build/
rpm/
rpmbuild/
bin
.vscode
9 changes: 9 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,17 @@ dependencies {
implementation "com.google.guava:guava:33.4.0-jre"
implementation "com.google.code.gson:gson:2.11.0"

implementation "com.github.ben-manes.caffeine:caffeine:3.2.0"

compileOnly "org.apache.kafka:kafka_2.12:${kafkaVersion}"
compileOnly "org.slf4j:slf4j-api:1.7.36"

testImplementation "org.apache.kafka:kafka_2.12:${kafkaVersion}"

testImplementation "org.slf4j:slf4j-log4j12:1.7.36"

testImplementation 'org.openjdk.jol:jol-cli:0.17'

testImplementation "org.junit.jupiter:junit-jupiter-api:5.11.4"
testImplementation "org.junit.jupiter:junit-jupiter:5.11.4"
testImplementation "org.junit.jupiter:junit-jupiter-params:5.11.4"
Expand Down Expand Up @@ -125,6 +129,11 @@ jacoco {

test {
useJUnitPlatform()
testLogging {
events 'failed', 'skipped'
exceptionFormat 'full'
}
jvmArgs '-Djol.magicFieldOffset=true'
}

jar {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.aiven.kafka.auth.audit.NoAuditor;

import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
import static org.apache.kafka.common.config.ConfigDef.Range.between;
import static org.apache.kafka.common.config.ConfigDef.ValidString.in;

public final class AivenAclAuthorizerConfig extends AbstractConfig {
Expand All @@ -37,7 +38,13 @@ public final class AivenAclAuthorizerConfig extends AbstractConfig {
private static final String LOG_DENIALS_CONF = PREFIX + "log.denials";
private static final String CONFIG_REFRESH_CONF = PREFIX + "config.refresh.interval";
private static final String LIST_ACLS_ENABLED_CONF = PREFIX + "list.acls.enabled";


private static final String CACHE_MAX_SIZE_PERCENTAGE_CONF = PREFIX + "cache.max.size.percentage";
private static final String CACHE_MAX_SIZE_PERCENTAGE_DOC =
"The maximum (estimated) size of the cache as a percentage of the heap size. The default value is 25%.";
private static final String CACHE_EXIRE_AFTER_ACCESS_MINUTES_CONF = PREFIX + "cache.expire.after.access.minutes";
private static final String CACHE_EXPIRE_AFTER_ACCESS_MINUTES_DOC =
"The time after which the cache entries expire after last access in minutes. The default value is 60 minutes.";

public static final String METRICS_NUM_SAMPLES_CONFIG = PREFIX
+ CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG;
Expand Down Expand Up @@ -118,6 +125,21 @@ public static ConfigDef configDef() {
Sensor.RecordingLevel.TRACE.toString()),
ConfigDef.Importance.LOW,
METRICS_RECORDING_LEVEL_DOC
)
.define(
CACHE_MAX_SIZE_PERCENTAGE_CONF,
ConfigDef.Type.INT,
25,
between(5, 50),
ConfigDef.Importance.LOW,
CACHE_MAX_SIZE_PERCENTAGE_DOC
)
.define(
CACHE_EXIRE_AFTER_ACCESS_MINUTES_CONF,
ConfigDef.Type.INT,
60,
ConfigDef.Importance.LOW,
CACHE_EXPIRE_AFTER_ACCESS_MINUTES_DOC
);
}

Expand All @@ -140,4 +162,12 @@ public int configRefreshInterval() {
public boolean listAclsEnabled() {
return getBoolean(LIST_ACLS_ENABLED_CONF);
}

public int getCacheMaxSizePercentage() {
return getInt(CACHE_MAX_SIZE_PERCENTAGE_CONF);
}

public int getCacheExpireAfterAccess() {
return getInt(CACHE_EXIRE_AFTER_ACCESS_MINUTES_CONF);
}
}
6 changes: 4 additions & 2 deletions src/main/java/io/aiven/kafka/auth/AivenAclAuthorizerV2.java
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,8 @@ public void configure(final java.util.Map<String, ?> configs) {

configFile = config.getConfigFile();
final AclJsonReader jsonReader = new AclJsonReader(configFile.toPath());
cacheReference.set(VerdictCache.create(loadAcls(jsonReader)));
cacheReference.set(VerdictCache.create(loadAcls(jsonReader), config.getCacheMaxSizePercentage(),
config.getCacheExpireAfterAccess()));
final AtomicReference<WatchKey> watchKeyReference = new AtomicReference<>(subscribeToAclChanges(configFile));
scheduledExecutorService.scheduleWithFixedDelay(() -> {
final WatchKey watchKey = watchKeyReference.get();
Expand All @@ -130,7 +131,8 @@ public void configure(final java.util.Map<String, ?> configs) {
}).findFirst().ifPresent(watchEvent -> {
LOGGER.info("{}: {}, Modified: {}",
watchEvent.kind(), watchEvent.context(), configFile.lastModified());
cacheReference.set(VerdictCache.create(loadAcls(jsonReader)));
cacheReference.set(VerdictCache.create(loadAcls(jsonReader), config.getCacheMaxSizePercentage(),
config.getCacheExpireAfterAccess()));
});
if (!watchKey.reset()) {
watchKeyReference.compareAndSet(watchKey, subscribeToAclChanges(configFile));
Expand Down
46 changes: 38 additions & 8 deletions src/main/java/io/aiven/kafka/auth/VerdictCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand All @@ -31,15 +30,43 @@

import io.aiven.kafka.auth.json.AclPermissionType;
import io.aiven.kafka.auth.json.AivenAcl;
import io.aiven.kafka.auth.utils.ObjectSizeEstimator;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;

public class VerdictCache {
private final List<AivenAcl> allowAclEntries;
private final List<AivenAcl> denyAclEntries;
private final Map<String, Boolean> cache = new ConcurrentHashMap<>();
private final Cache<String, Boolean> cache;


private VerdictCache(@Nonnull final List<AivenAcl> denyAclEntries, @Nonnull final List<AivenAcl> allowAclEntries) {
private VerdictCache(@Nonnull final List<AivenAcl> denyAclEntries, @Nonnull final List<AivenAcl> allowAclEntries,
final double maxSizePercentage, final int expireAfterAccessMinutes) {
this.denyAclEntries = denyAclEntries;
this.allowAclEntries = allowAclEntries;

final long maxHeapSize = Runtime.getRuntime().maxMemory();
final long maxSize = (long) ((maxHeapSize / 100) * maxSizePercentage);

cache = Caffeine.newBuilder()
.expireAfterAccess(expireAfterAccessMinutes, java.util.concurrent.TimeUnit.MINUTES)
.maximumWeight(maxSize)
.weigher((String key, Boolean value) -> {
final int keySize = ObjectSizeEstimator.estimateStringSize(key);
final int valueSize = ObjectSizeEstimator.estimateBooleanSize(value);
final int entrySize = keySize + valueSize + ObjectSizeEstimator.estimateEntryOverhead();
// 1.5x overhead for cache metadata, lazy initialization etc.
final int totalSize = (int) (entrySize * 1.5);
return totalSize;
})
.build();
}

public long getEstimatedSizeBytes() {
final var eviction = cache.policy().eviction().orElseThrow();
final long currentWeight = eviction.weightedSize().orElseThrow();
return currentWeight;
}

public boolean get(
Expand All @@ -55,7 +82,7 @@ public boolean get(
+ "|" + principal.getName()
+ "|" + principalType;

return cache.computeIfAbsent(cacheKey, key -> {
return cache.get(cacheKey, key -> {
final Predicate<AivenAcl> matcher = acl ->
acl.match(principalType, principal.getName(), host, operation, resource);
if (denyAclEntries.stream().anyMatch(matcher)) {
Expand All @@ -70,13 +97,16 @@ public Stream<AivenAcl> aclEntries() {
return Stream.concat(denyAclEntries.stream(), allowAclEntries.stream());
}

public static VerdictCache create(final List<AivenAcl> aclEntries) {
public static VerdictCache create(final List<AivenAcl> aclEntries, final double maxSizePercentage,
final int expireAfterAccessMinutes) {
if (aclEntries == null || aclEntries.isEmpty()) {
return new VerdictCache(Collections.emptyList(), Collections.emptyList());
return new VerdictCache(Collections.emptyList(), Collections.emptyList(), maxSizePercentage,
expireAfterAccessMinutes);
}

final Map<Boolean, List<AivenAcl>> partitionedEntries = aclEntries.stream()
.collect(Collectors.partitioningBy(x -> x.getPermissionType() == AclPermissionType.DENY));
return new VerdictCache(partitionedEntries.get(true), partitionedEntries.get(false));
.collect(Collectors.partitioningBy(x -> x.getPermissionType() == AclPermissionType.DENY));
return new VerdictCache(partitionedEntries.get(true), partitionedEntries.get(false), maxSizePercentage,
expireAfterAccessMinutes);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Copyright 2025 Aiven Oy https://aiven.io
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.auth.utils;

public class ObjectSizeEstimator {

private ObjectSizeEstimator() {
// Prevent instantiation
}

/**
* Estimates the size of a String object in Java.
* This is a rough estimate. The real size may vary based on JVM implementation and settings.
*
* @param s the String to estimate the size of
* @return estimated size in bytes
*/
public static int estimateStringSize(final String s) {
/*
Example: "Hello, World!" (13 chars)

String object:
- Header: 12 bytes
- Coder: 1 byte
- Hash: 4 bytes
- value reference: 4 bytes (Compressed OOPs)
- Padding: 3 bytes (for alignment)
------------------------------------------------
24 bytes

byte[] array:
- Header: 12 bytes
- Length field: 4 bytes
- Data: 13 bytes (1 byte per char, since Java 9 for Latin-1)
- Padding: 3 bytes (for 8-byte alignment)
------------------------------------------------
32 bytes

Total: 56 bytes
*/
if (s == null) {
return 0;
}
final int stringObjectSize = 24; // Object header + coder + hash + ref + alignment
int charArraySize = s.length() + 16; // 1 byte per char + header + length
charArraySize += (charArraySize % 8 == 0) ? 0 : (8 - (charArraySize % 8)); // Padding for alignment
return stringObjectSize + charArraySize;
}

/**
* Estimates the size of a Boolean object in Java.
* This is a rough estimate. The real size may vary based on JVM implementation and settings.
*
* @param b the Boolean to estimate the size of
* @return estimated size in bytes
*/
public static int estimateBooleanSize(final Boolean b) {
if (b == null) {
return 0;
}
return 12 + 4; // Object header + ref
}

/**
* Estimates the overhead of a cache entry in Java.
* This is a rough estimate. The real size may vary based on JVM implementation and settings.
*
* @return estimated size in bytes
*/
public static int estimateEntryOverhead() {
return 12 + 4 + 4 + 4; // Object header + key ref + value ref + hash
}

}
Loading
Loading