Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,3 +1,20 @@
/*
* Inkless
* Copyright (C) 2024 - 2025 Aiven OY
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package io.aiven.inkless.cache;

import org.apache.kafka.server.metrics.KafkaMetricsGroup;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,24 @@
/*
* Inkless
* Copyright (C) 2024 - 2025 Aiven OY
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package io.aiven.inkless.cache;

import org.apache.kafka.common.metrics.Metrics;

import com.github.benmanes.caffeine.cache.AsyncCache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.stats.CacheStats;
Expand All @@ -21,32 +40,33 @@ public final class CaffeineCache implements ObjectCache {
*/
private final AsyncCache<CacheKey, FileExtent> cache;

private final CaffeineCacheMetrics metrics;

public CaffeineCache(
final long maxCacheSize,
final long lifespanSeconds,
final int maxIdleSeconds) {
cache = Caffeine.newBuilder()
.maximumSize(maxCacheSize)
.expireAfterWrite(Duration.ofSeconds(lifespanSeconds))
.expireAfterAccess(Duration.ofSeconds(maxIdleSeconds != -1 ? maxIdleSeconds: 180))
.recordStats()
.buildAsync();
metrics = new CaffeineCacheMetrics(cache.synchronous());
final int maxIdleSeconds,
final Metrics storageMetrics
) {
final Caffeine<Object, Object> builder = Caffeine.newBuilder();
// size and weight limits
builder.maximumSize(maxCacheSize);
// expiration policies
builder.expireAfterWrite(Duration.ofSeconds(lifespanSeconds));
builder.expireAfterAccess(Duration.ofSeconds(maxIdleSeconds != -1 ? maxIdleSeconds : 180));
// enable metrics
builder.recordStats();
cache = builder.buildAsync();
new CaffeineCacheMetrics(storageMetrics, cache.synchronous());
}

@Override
public void close() throws IOException {
metrics.close();
// no resources to close
}

@Override
public FileExtent computeIfAbsent(final CacheKey key, final Function<CacheKey, FileExtent> mappingFunction) {
final CompletableFuture<FileExtent> future = new CompletableFuture<>();
final CompletableFuture<FileExtent> existingFuture = cache.asMap().computeIfAbsent(key, (cacheKey) -> {
return future;
});
final CompletableFuture<FileExtent> existingFuture = cache.asMap().computeIfAbsent(key, (cacheKey) -> future);
// If existing future is not the same object as created in this function
// there was a pending cache load and this call is required to join the existing future
// and discard the created one.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,27 +1,34 @@
/*
* Inkless
* Copyright (C) 2024 - 2025 Aiven OY
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package io.aiven.inkless.cache;

import org.apache.kafka.common.MetricNameTemplate;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.KafkaMetricsContext;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.Time;

import com.github.benmanes.caffeine.cache.Cache;

import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.function.Supplier;

import io.aiven.inkless.common.metrics.MeasurableValue;
import io.aiven.inkless.common.metrics.SensorProvider;

public final class CaffeineCacheMetrics implements Closeable {

private final Metrics metrics;

public final class CaffeineCacheMetrics {
private final Sensor cacheSizeSensor;
private final Sensor cacheHitCountSensor;
private final Sensor cacheHitRateSensor;
Expand All @@ -30,13 +37,7 @@ public final class CaffeineCacheMetrics implements Closeable {
private final Sensor cacheAvgLoadPenaltySensor;
private final Sensor cacheEvictionsSensor;

public CaffeineCacheMetrics(final Cache<?, ?> cache) {
final JmxReporter reporter = new JmxReporter();
this.metrics = new Metrics(
new MetricConfig(), List.of(reporter), Time.SYSTEM,
new KafkaMetricsContext(CaffeineCacheMetricsRegistry.METRIC_CONTEXT)
);

public CaffeineCacheMetrics(final Metrics metrics, final Cache<?, ?> cache) {
final CaffeineCacheMetricsRegistry metricsRegistry = new CaffeineCacheMetricsRegistry();
cacheSizeSensor = registerLongSensor(metrics, metricsRegistry.cacheSizeMetricName, CaffeineCacheMetricsRegistry.CACHE_SIZE, cache::estimatedSize);
cacheHitCountSensor = registerLongSensor(metrics, metricsRegistry.cacheHitCountMetricName, CaffeineCacheMetricsRegistry.CACHE_HIT_COUNT, () -> cache.stats().hitCount());
Expand All @@ -62,8 +63,7 @@ static Sensor registerLongSensor(final Metrics metrics, final MetricNameTemplate
@Override
public String toString() {
return "CaffeineCacheMetrics{" +
"metrics=" + metrics +
", cacheSizeSensor=" + cacheSizeSensor +
"cacheSizeSensor=" + cacheSizeSensor +
", cacheHitCountSensor=" + cacheHitCountSensor +
", cacheHitRateSensor=" + cacheHitRateSensor +
", cacheMissCountSensor=" + cacheMissCountSensor +
Expand All @@ -72,9 +72,4 @@ public String toString() {
", cacheEvictionsSensor=" + cacheEvictionsSensor +
'}';
}

@Override
public void close() throws IOException {
metrics.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.apache.kafka.common.MetricNameTemplate;

public class CaffeineCacheMetricsRegistry {
public static final String METRIC_CONTEXT = "io.aiven.inkless.cache.caffeine";
public static final String METRIC_GROUP = "wal-segment-cache";

public static final String CACHE_SIZE = "size";
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,20 @@
/*
* Inkless
* Copyright (C) 2024 - 2025 Aiven OY
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package io.aiven.inkless.cache;

public class IncreasedLogStartOffsetException extends StaleLogFragmentException {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,20 @@
/*
* Inkless
* Copyright (C) 2024 - 2025 Aiven OY
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package io.aiven.inkless.cache;

import org.apache.kafka.common.TopicIdPartition;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,20 @@
/*
* Inkless
* Copyright (C) 2024 - 2025 Aiven OY
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package io.aiven.inkless.cache;

/* When this Exception is raised by a LogFragment, it means that the data contained in the LogFragment
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ public final class SharedState implements Closeable {
private final BatchCoordinateCache batchCoordinateCache;
private final BrokerTopicStats brokerTopicStats;
private final Supplier<LogConfig> defaultTopicConfigs;
private final Metrics storageMetrics;
// Accessible for testing
final Metrics storageMetrics;

public SharedState(
final Time time,
Expand All @@ -71,7 +72,8 @@ public SharedState(
final ObjectCache cache,
final BatchCoordinateCache batchCoordinateCache,
final BrokerTopicStats brokerTopicStats,
final Supplier<LogConfig> defaultTopicConfigs
final Supplier<LogConfig> defaultTopicConfigs,
final Metrics storageMetrics
) {
this.time = time;
this.brokerId = brokerId;
Expand All @@ -84,12 +86,7 @@ public SharedState(
this.batchCoordinateCache = batchCoordinateCache;
this.brokerTopicStats = brokerTopicStats;
this.defaultTopicConfigs = defaultTopicConfigs;

final MetricsReporter reporter = new JmxReporter();
this.storageMetrics = new Metrics(
new MetricConfig(), List.of(reporter), Time.SYSTEM,
new KafkaMetricsContext(STORAGE_METRIC_CONTEXT)
);
this.storageMetrics = storageMetrics;
}

public static SharedState initialize(
Expand All @@ -107,6 +104,11 @@ public static SharedState initialize(
"Value of consume.batch.coordinate.cache.ttl.ms exceeds file.cleaner.retention.period.ms / 2"
);
}
final MetricsReporter reporter = new JmxReporter();
final Metrics storageMetrics = new Metrics(
new MetricConfig(), List.of(reporter), Time.SYSTEM,
new KafkaMetricsContext(STORAGE_METRIC_CONTEXT)
);
return new SharedState(
time,
brokerId,
Expand All @@ -118,11 +120,13 @@ public static SharedState initialize(
new CaffeineCache(
config.cacheMaxCount(),
config.cacheExpirationLifespanSec(),
config.cacheExpirationMaxIdleSec()
config.cacheExpirationMaxIdleSec(),
storageMetrics
),
config.isBatchCoordinateCacheEnabled() ? new CaffeineBatchCoordinateCache(config.batchCoordinateCacheTtl()) : new NullBatchCoordinateCache(),
brokerTopicStats,
defaultTopicConfigs
defaultTopicConfigs,
storageMetrics
);
}

Expand Down
Loading