Skip to content

Commit 715be14

Browse files
author
Alex Borodin
committed
Use LRUCache in TableMetadataCache
We recently discovered that LRUCache, based on LinkedHashMap, performs almost twice as fast as the Caffein max size cache. Let's replace the caffeine cache to optimise the performance.
1 parent 7862e1c commit 715be14

File tree

3 files changed

+15
-19
lines changed

3 files changed

+15
-19
lines changed

flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableMetadataCache.java

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,6 @@
1818
*/
1919
package org.apache.iceberg.flink.sink.dynamic;
2020

21-
import com.github.benmanes.caffeine.cache.Cache;
22-
import com.github.benmanes.caffeine.cache.Caffeine;
2321
import java.util.Map;
2422
import java.util.Set;
2523
import org.apache.flink.annotation.Internal;
@@ -53,18 +51,18 @@ class TableMetadataCache {
5351
private final Catalog catalog;
5452
private final long refreshMs;
5553
private final int inputSchemasPerTableCacheMaximumSize;
56-
private final Cache<TableIdentifier, CacheItem> cache;
54+
private final LRUCache<TableIdentifier, CacheItem> tableCache;
5755

5856
TableMetadataCache(
5957
Catalog catalog, int maximumSize, long refreshMs, int inputSchemasPerTableCacheMaximumSize) {
6058
this.catalog = catalog;
6159
this.refreshMs = refreshMs;
6260
this.inputSchemasPerTableCacheMaximumSize = inputSchemasPerTableCacheMaximumSize;
63-
this.cache = Caffeine.newBuilder().maximumSize(maximumSize).build();
61+
this.tableCache = new LRUCache<>(maximumSize, ignored -> {});
6462
}
6563

6664
Tuple2<Boolean, Exception> exists(TableIdentifier identifier) {
67-
CacheItem cached = cache.getIfPresent(identifier);
65+
CacheItem cached = tableCache.get(identifier);
6866
if (cached != null && Boolean.TRUE.equals(cached.tableExists)) {
6967
return EXISTS;
7068
} else if (needsRefresh(cached, true)) {
@@ -87,7 +85,7 @@ PartitionSpec spec(TableIdentifier identifier, PartitionSpec spec) {
8785
}
8886

8987
void update(TableIdentifier identifier, Table table) {
90-
cache.put(
88+
tableCache.put(
9189
identifier,
9290
new CacheItem(
9391
true,
@@ -98,7 +96,7 @@ void update(TableIdentifier identifier, Table table) {
9896
}
9997

10098
private String branch(TableIdentifier identifier, String branch, boolean allowRefresh) {
101-
CacheItem cached = cache.getIfPresent(identifier);
99+
CacheItem cached = tableCache.get(identifier);
102100
if (cached != null && cached.tableExists && cached.branches.contains(branch)) {
103101
return branch;
104102
}
@@ -113,7 +111,7 @@ private String branch(TableIdentifier identifier, String branch, boolean allowRe
113111

114112
private ResolvedSchemaInfo schema(
115113
TableIdentifier identifier, Schema input, boolean allowRefresh) {
116-
CacheItem cached = cache.getIfPresent(identifier);
114+
CacheItem cached = tableCache.get(identifier);
117115
Schema compatible = null;
118116
if (cached != null && cached.tableExists) {
119117
// This only works if the {@link Schema#equals(Object)} returns true for the old schema
@@ -164,7 +162,7 @@ private ResolvedSchemaInfo schema(
164162
}
165163

166164
private PartitionSpec spec(TableIdentifier identifier, PartitionSpec spec, boolean allowRefresh) {
167-
CacheItem cached = cache.getIfPresent(identifier);
165+
CacheItem cached = tableCache.get(identifier);
168166
if (cached != null && cached.tableExists) {
169167
for (PartitionSpec tableSpec : cached.specs.values()) {
170168
if (PartitionSpecEvolution.checkCompatibility(tableSpec, spec)) {
@@ -188,7 +186,7 @@ private Tuple2<Boolean, Exception> refreshTable(TableIdentifier identifier) {
188186
return EXISTS;
189187
} catch (NoSuchTableException e) {
190188
LOG.debug("Table doesn't exist {}", identifier, e);
191-
cache.put(identifier, new CacheItem(false, null, null, null, 1));
189+
tableCache.put(identifier, new CacheItem(false, null, null, null, 1));
192190
return Tuple2.of(false, e);
193191
}
194192
}
@@ -199,7 +197,7 @@ private boolean needsRefresh(CacheItem cacheItem, boolean allowRefresh) {
199197
}
200198

201199
public void invalidate(TableIdentifier identifier) {
202-
cache.invalidate(identifier);
200+
tableCache.remove(identifier);
203201
}
204202

205203
/** Handles timeout for missing items only. Caffeine performance causes noticeable delays. */
@@ -268,7 +266,7 @@ DataConverter recordConverter() {
268266
}
269267

270268
@VisibleForTesting
271-
Cache<TableIdentifier, CacheItem> getInternalCache() {
272-
return cache;
269+
LRUCache<TableIdentifier, CacheItem> getInternalCache() {
270+
return tableCache;
273271
}
274272
}

flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableMetadataCache.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,6 @@ void testCachingDisabled() {
9090
TableMetadataCache cache = new TableMetadataCache(catalog, 0, Long.MAX_VALUE, 10);
9191

9292
// Cleanup routine doesn't run after every write
93-
cache.getInternalCache().cleanUp();
94-
assertThat(cache.getInternalCache().estimatedSize()).isEqualTo(0);
93+
assertThat(cache.getInternalCache().size()).isEqualTo(0);
9594
}
9695
}

flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableUpdater.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -85,11 +85,11 @@ void testBranchCreationAndCaching() {
8585

8686
catalog.createTable(tableIdentifier, SCHEMA);
8787
tableUpdater.update(tableIdentifier, "myBranch", SCHEMA, PartitionSpec.unpartitioned());
88-
TableMetadataCache.CacheItem cacheItem = cache.getInternalCache().getIfPresent(tableIdentifier);
88+
TableMetadataCache.CacheItem cacheItem = cache.getInternalCache().get(tableIdentifier);
8989
assertThat(cacheItem).isNotNull();
9090

9191
tableUpdater.update(tableIdentifier, "myBranch", SCHEMA, PartitionSpec.unpartitioned());
92-
assertThat(cache.getInternalCache().getIfPresent(tableIdentifier)).isEqualTo(cacheItem);
92+
assertThat(cache.getInternalCache().get(tableIdentifier)).isEqualTo(cacheItem);
9393
}
9494

9595
@Test
@@ -153,7 +153,6 @@ void testLastResultInvalidation() {
153153
.isEqualTo(CompareSchemasVisitor.Result.SAME);
154154

155155
// Last result cache should be cleared
156-
assertThat(cache.getInternalCache().getIfPresent(tableIdentifier).inputSchemas().get(SCHEMA2))
157-
.isNull();
156+
assertThat(cache.getInternalCache().get(tableIdentifier).inputSchemas().get(SCHEMA2)).isNull();
158157
}
159158
}

0 commit comments

Comments
 (0)