Skip to content

Commit

Permalink
Merge branch '7.6.x' into 7.7.x by rayokota
Browse files Browse the repository at this point in the history
  • Loading branch information
ConfluentSemaphore committed Oct 21, 2024
2 parents aa9ff36 + e0b9c97 commit 566d789
Show file tree
Hide file tree
Showing 5 changed files with 181 additions and 4 deletions.
2 changes: 1 addition & 1 deletion checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
files="(.*).java"/>

<suppress checks="NPathComplexity"
files="(AvroData|ConfigResource|DownloadSchemaRegistryMojo|KafkaSchemaRegistry|KafkaStore|KafkaStoreReaderThread|MessageDefinition|Schema|SchemaValue|SchemaDiff|MessageSchemaDiff|AbstractKafkaSchemaSerDe|AbstractKafkaAvroSerializer|AbstractKafkaAvroDeserializer|AbstractKafkaJsonSchemaDeserializer|AbstractKafkaProtobufDeserializer|ProtobufData|ProtobufSchemaUtils|JsonSchemaData|SchemaMessageFormatter|SchemaMessageReader|ContextFilter|QualifiedSubject|SubjectVersionsResource|Rule|WildcardMatcher|JsonSchemaComparator|LocalSchemaRegistryClient|DataEncryptionKeyId|FieldEncryptionExecutor|FieldRuleExecutor|SchemaRegistryCoordinator|SchemaTranslator|SchemaUtils|DekRegistry).java"/>
files="(.*).java"/>

<suppress checks="JavaNCSS"
files="(AbstractKafkaAvroSerializer|AbstractKafkaJsonSchemaSerializer|AbstractKafkaJsonSchemaDeserializer|AbstractKafkaProtobufSerializer|AbstractKafkaProtobufDeserializer|AbstractKafkaSchemaSerDe|AvroData|AvroSchema|AvroSchemaUtils|ProtobufData|SchemaDiff|NumberSchemaDiff|JsonSchema|JsonSchemaData|KafkaSchemaRegistry|KafkaStoreReaderThread|ProtobufSchema|ProtobufSchemaUtils|JsonSchemaComparator|SchemaMessageFormatter|SchemaMessageReader|SchemaTranslator|SubjectVersionsResource).java"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.google.common.base.Ticker;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.ImmutableMap;
import io.confluent.kafka.schemaregistry.client.rest.entities.requests.RegisterSchemaRequest;
import io.confluent.kafka.schemaregistry.client.rest.entities.requests.RegisterSchemaResponse;
import io.confluent.kafka.schemaregistry.utils.QualifiedSubject;
Expand Down Expand Up @@ -72,6 +73,8 @@ public class CachedSchemaRegistryClient implements SchemaRegistryClient {
private final Map<String, Map<Integer, ParsedSchema>> idToSchemaCache;
private final Map<String, Map<ParsedSchema, Integer>> schemaToVersionCache;
private final Map<String, Map<Integer, Schema>> versionToSchemaCache;
private final Cache<String, SchemaMetadata> latestVersionCache;
private final Cache<SubjectAndMetadata, SchemaMetadata> latestWithMetadataCache;
private final Cache<SubjectAndSchema, Long> missingSchemaCache;
private final Cache<SubjectAndInt, Long> missingIdCache;
private final Cache<SubjectAndInt, Long> missingVersionCache;
Expand Down Expand Up @@ -198,6 +201,25 @@ public CachedSchemaRegistryClient(
this.restService = restService;
this.ticker = ticker;

long latestTTL = SchemaRegistryClientConfig.getLatestTTL(configs);

CacheBuilder<Object, Object> latestVersionBuilder = CacheBuilder.newBuilder()
.maximumSize(cacheCapacity)
.ticker(ticker);
if (latestTTL >= 0) {
latestVersionBuilder = latestVersionBuilder.expireAfterWrite(
latestTTL, TimeUnit.SECONDS);
}
this.latestVersionCache = latestVersionBuilder.build();
CacheBuilder<Object, Object> latestWithMetadataBuilder = CacheBuilder.newBuilder()
.maximumSize(cacheCapacity)
.ticker(ticker);
if (latestTTL >= 0) {
latestWithMetadataBuilder = latestWithMetadataBuilder.expireAfterWrite(
latestTTL, TimeUnit.SECONDS);
}
this.latestWithMetadataCache = latestWithMetadataBuilder.build();

long missingIdTTL = SchemaRegistryClientConfig.getMissingIdTTL(configs);
long missingVersionTTL = SchemaRegistryClientConfig.getMissingVersionTTL(configs);
long missingSchemaTTL = SchemaRegistryClientConfig.getMissingSchemaTTL(configs);
Expand Down Expand Up @@ -578,17 +600,32 @@ public SchemaMetadata getSchemaMetadata(String subject, int version, boolean loo
@Override
public SchemaMetadata getLatestSchemaMetadata(String subject)
throws IOException, RestClientException {
SchemaMetadata schema = latestVersionCache.getIfPresent(subject);
if (schema != null) {
return schema;
}

io.confluent.kafka.schemaregistry.client.rest.entities.Schema response
= restService.getLatestVersion(subject);
return new SchemaMetadata(response);
schema = new SchemaMetadata(response);
latestVersionCache.put(subject, schema);
return schema;
}

@Override
public SchemaMetadata getLatestWithMetadata(String subject, Map<String, String> metadata,
boolean lookupDeletedSchema) throws IOException, RestClientException {
SubjectAndMetadata subjectAndMetadata = new SubjectAndMetadata(subject, metadata);
SchemaMetadata schema = latestWithMetadataCache.getIfPresent(subjectAndMetadata);
if (schema != null) {
return schema;
}

io.confluent.kafka.schemaregistry.client.rest.entities.Schema response
= restService.getLatestWithMetadata(subject, metadata, lookupDeletedSchema);
return new SchemaMetadata(response);
schema = new SchemaMetadata(response);
latestWithMetadataCache.put(subjectAndMetadata, schema);
return schema;
}

@Override
Expand Down Expand Up @@ -684,6 +721,8 @@ public synchronized List<Integer> deleteSubject(
idToSchemaCache.remove(subject);
schemaToIdCache.remove(subject);
schemaToResponseCache.remove(subject);
latestVersionCache.invalidate(subject);
latestWithMetadataCache.invalidateAll();
return restService.deleteSubject(requestProperties, subject, isPermanent);
}

Expand All @@ -709,6 +748,8 @@ public synchronized Integer deleteSchemaVersion(
.getOrDefault(subject, Collections.emptyMap())
.remove(Integer.valueOf(version));
}
latestVersionCache.invalidate(subject);
latestWithMetadataCache.invalidateAll();
return restService.deleteSchemaVersion(requestProperties, subject, version, isPermanent);
}

Expand Down Expand Up @@ -819,6 +860,8 @@ public synchronized void reset() {
idToSchemaCache.clear();
schemaToVersionCache.clear();
versionToSchemaCache.clear();
latestVersionCache.invalidateAll();
latestWithMetadataCache.invalidateAll();
missingSchemaCache.invalidateAll();
missingIdCache.invalidateAll();
missingVersionCache.invalidateAll();
Expand Down Expand Up @@ -943,4 +986,44 @@ public String toString() {
return "SubjectAndId{" + "subject='" + subject + '\'' + ", id=" + id + '}';
}
}

static class SubjectAndMetadata {
private final String subject;
private final Map<String, String> metadata;

public SubjectAndMetadata(String subject, Map<String, String> metadata) {
this.subject = subject;
this.metadata = ImmutableMap.copyOf(metadata);
}

public String subject() {
return subject;
}

public Map<String, String> metadata() {
return metadata;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
SubjectAndMetadata that = (SubjectAndMetadata) o;
return Objects.equals(subject, that.subject) && Objects.equals(metadata, that.metadata);
}

@Override
public int hashCode() {
return Objects.hash(subject, metadata);
}

@Override
public String toString() {
return "SubjectAndMetadata{" + "subject='" + subject + '\'' + ", metadata=" + metadata + '}';
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ public class SchemaRegistryClientConfig {
public static final String PROXY_HOST = "proxy.host";
public static final String PROXY_PORT = "proxy.port";

public static final String LATEST_CACHE_TTL_CONFIG = "latest.cache.ttl.sec";
public static final long LATEST_CACHE_TTL_DEFAULT = 60;

public static final String MISSING_CACHE_SIZE_CONFIG = "missing.cache.size";
public static final String MISSING_ID_CACHE_TTL_CONFIG = "missing.id.cache.ttl.sec";
public static final String MISSING_VERSION_CACHE_TTL_CONFIG = "missing.version.cache.ttl.sec";
Expand Down Expand Up @@ -127,6 +130,17 @@ public static Integer getHttpReadTimeoutMs(Map<String, ?> configs) {
}
}

public static long getLatestTTL(Map<String, ?> configs) {
if (configs != null && configs.containsKey(LATEST_CACHE_TTL_CONFIG)) {
Object latestVal = configs.get(LATEST_CACHE_TTL_CONFIG);
return latestVal instanceof String
? Long.parseLong((String) latestVal)
: ((Number) latestVal).longValue();
} else {
return LATEST_CACHE_TTL_DEFAULT;
}
}

public static long getMissingIdTTL(Map<String, ?> configs) {
return configs != null && configs.containsKey(MISSING_ID_CACHE_TTL_CONFIG)
? (Long) configs.get(MISSING_ID_CACHE_TTL_CONFIG)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package io.confluent.kafka.schemaregistry.client;

import com.google.common.collect.ImmutableMap;
import com.google.common.testing.FakeTicker;
import io.confluent.kafka.schemaregistry.client.rest.entities.requests.RegisterSchemaRequest;
import io.confluent.kafka.schemaregistry.client.rest.entities.requests.RegisterSchemaResponse;
Expand Down Expand Up @@ -549,6 +550,85 @@ public void testGetSchemasEmptyReturn() throws Exception {
assertEquals(0, parsedSchemas.size());
}

@Test
public void testLatestVersionCache() throws Exception {
Map<String, Object> configs = new HashMap<>();
configs.put(SchemaRegistryClientConfig.LATEST_CACHE_TTL_CONFIG, 60L);

FakeTicker fakeTicker = new FakeTicker();
client = new CachedSchemaRegistryClient(
restService,
CACHE_CAPACITY,
null,
configs,
null,
fakeTicker
);

expect(restService.getLatestVersion(eq(SUBJECT_0)))
.andReturn(
new io.confluent.kafka.schemaregistry.client.rest.entities.Schema(SUBJECT_0, 1,
ID_25, AvroSchema.TYPE, Collections.emptyList(), SCHEMA_STR_0))
.andReturn(
new io.confluent.kafka.schemaregistry.client.rest.entities.Schema(SUBJECT_0, 1,
ID_25, AvroSchema.TYPE, Collections.emptyList(), SCHEMA_STR_0));

replay(restService);

SchemaMetadata schemaMetadata = client.getLatestSchemaMetadata(SUBJECT_0);
assertEquals(ID_25, schemaMetadata.getId());

fakeTicker.advance(59, TimeUnit.SECONDS);

// Should hit the cache
schemaMetadata = client.getLatestSchemaMetadata(SUBJECT_0);
assertEquals(ID_25, schemaMetadata.getId());

fakeTicker.advance(2, TimeUnit.SECONDS);
Thread.sleep(100);
assertNotNull(client.getLatestSchemaMetadata(SUBJECT_0));
}

@Test
public void testLatestWithMetadataCache() throws Exception {
Map<String, Object> configs = new HashMap<>();
configs.put(SchemaRegistryClientConfig.LATEST_CACHE_TTL_CONFIG, 60L);

FakeTicker fakeTicker = new FakeTicker();
client = new CachedSchemaRegistryClient(
restService,
CACHE_CAPACITY,
null,
configs,
null,
fakeTicker
);

expect(restService.getLatestWithMetadata(eq(SUBJECT_0), anyObject(), eq(false)))
.andReturn(
new io.confluent.kafka.schemaregistry.client.rest.entities.Schema(SUBJECT_0, 1,
ID_25, AvroSchema.TYPE, Collections.emptyList(), SCHEMA_STR_0))
.andReturn(
new io.confluent.kafka.schemaregistry.client.rest.entities.Schema(SUBJECT_0, 1,
ID_25, AvroSchema.TYPE, Collections.emptyList(), SCHEMA_STR_0));

replay(restService);

Map<String, String> metadata = ImmutableMap.of("key", "value");
SchemaMetadata schemaMetadata = client.getLatestWithMetadata(SUBJECT_0, metadata, false);
assertEquals(ID_25, schemaMetadata.getId());

fakeTicker.advance(59, TimeUnit.SECONDS);

// Should hit the cache
schemaMetadata = client.getLatestWithMetadata(SUBJECT_0, metadata, false);
assertEquals(ID_25, schemaMetadata.getId());

fakeTicker.advance(2, TimeUnit.SECONDS);
Thread.sleep(100);
assertNotNull(client.getLatestWithMetadata(SUBJECT_0, metadata, false));
}

@Test
public void testMissingIdCache() throws Exception {
Map<String, Object> configs = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ public class AbstractKafkaSchemaSerDeConfig extends AbstractConfig {
public static final String LATEST_CACHE_SIZE_DOC =
"The maximum size for caches holding latest schemas";

public static final String LATEST_CACHE_TTL = "latest.cache.ttl.sec";
public static final String LATEST_CACHE_TTL = SchemaRegistryClientConfig.LATEST_CACHE_TTL_CONFIG;
public static final int LATEST_CACHE_TTL_DEFAULT = -1;
public static final String LATEST_CACHE_TTL_DOC =
"The TTL for caches holding latest schemas, or -1 for no TTL";
Expand Down

0 comments on commit 566d789

Please sign in to comment.