Skip to content

Commit bb9a7cf

Browse files
committed
Handle IMPORT mode
1 parent 3dc1885 commit bb9a7cf

File tree

6 files changed

+54
-30
lines changed

6 files changed

+54
-30
lines changed

client/src/main/java/io/confluent/kafka/schemaregistry/client/CachedSchemaRegistryClient.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1152,8 +1152,8 @@ public List<Association> getAssociationsByResourceName(String resourceName,
11521152
public void deleteAssociations(String resourceId, String resourceType,
11531153
List<String> associationTypes, boolean cascadeLifecycle)
11541154
throws IOException, RestClientException {
1155-
restService.deleteAssociations(
1156-
DEFAULT_REQUEST_PROPERTIES, resourceId, resourceType, associationTypes, cascadeLifecycle);
1155+
restService.deleteAssociations(DEFAULT_REQUEST_PROPERTIES,
1156+
resourceId, resourceType, associationTypes, cascadeLifecycle, false);
11571157
}
11581158

11591159
private void checkMissingSchemaCache(String subject, ParsedSchema schema, boolean normalize)

client/src/main/java/io/confluent/kafka/schemaregistry/client/rest/RestService.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2056,7 +2056,7 @@ public List<Association> getAssociationsByResourceName(
20562056
public void deleteAssociations(
20572057
Map<String, String> requestProperties,
20582058
String resourceId, String resourceType, List<String> associationTypes,
2059-
boolean cascadeLifecycle
2059+
boolean cascadeLifecycle, Boolean dryRun
20602060
) throws IOException,
20612061
RestClientException {
20622062
UriBuilder builder =
@@ -2068,6 +2068,9 @@ public void deleteAssociations(
20682068
builder.queryParam("associationType", associationType);
20692069
}
20702070
builder.queryParam("cascadeLifecycle", cascadeLifecycle);
2071+
if (dryRun != null) {
2072+
builder.queryParam("dryRun", dryRun);
2073+
}
20712074
String path = builder.build(resourceId).toString();
20722075

20732076
httpRequest(path, "DELETE", null,

core/src/main/java/io/confluent/kafka/schemaregistry/rest/resources/AssociationsResource.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -486,7 +486,9 @@ public void deleteAssociations(
486486
@Parameter(description = "Association type")
487487
@QueryParam("associationType") List<String> associationTypes,
488488
@Parameter(description = "Cascade lifecycle")
489-
@QueryParam("cascadeLifecycle") boolean cascadeLifecycle) {
489+
@QueryParam("cascadeLifecycle") boolean cascadeLifecycle,
490+
@Parameter(description = "Dry run")
491+
@QueryParam("dryRun") boolean dryRun) {
490492

491493
log.debug("Deleting association for resource {}", resourceId);
492494

@@ -509,7 +511,7 @@ public void deleteAssociations(
509511
QualifiedSubject.createFromUnqualified(schemaRegistry.tenant(), unqualifiedSubject);
510512
String qualifiedSubject = qs.toQualifiedSubject();
511513
schemaRegistry.deleteAssociationsOrForward(qualifiedSubject,
512-
resourceId, resourceType, associationTypes, cascadeLifecycle,
514+
resourceId, resourceType, associationTypes, cascadeLifecycle, dryRun,
513515
headerProperties);
514516
asyncResponse.resume(Response.status(204).build());
515517
} catch (AssociationFrozenException e) {

core/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaSchemaRegistry.java

Lines changed: 41 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1124,6 +1124,10 @@ public AssociationResponse createOrUpdateAssociation(
11241124
if (schema == null) {
11251125
continue;
11261126
}
1127+
Mode subjectMode = getModeInScope(qualifiedSubject);
1128+
if (subjectMode == Mode.IMPORT) {
1129+
continue;
1130+
}
11271131
boolean normalize = Boolean.TRUE.equals(info.getNormalize());
11281132
Schema registeredSchema = register(qualifiedSubject,
11291133
new Schema(qualifiedSubject, schema), normalize, false);
@@ -1133,18 +1137,26 @@ public AssociationResponse createOrUpdateAssociation(
11331137
List<AssociationValue> associationValues =
11341138
AssociationValue.fromAssociationCreateOrUpdateRequest(
11351139
tenant(), request, associations, assocTypesToSkip);
1140+
for (AssociationValue associationValue : associationValues) {
1141+
putAssociation(associationValue);
1142+
}
1143+
return AssociationValue.toAssociationResponse(associationValues, registeredSchemas);
1144+
}
1145+
1146+
private void putAssociation(AssociationValue associationValue) throws SchemaRegistryException {
1147+
String qualifiedSubject = associationValue.getSubject();
11361148
try {
1137-
kafkaStore.waitUntilKafkaReaderReachesLastOffset(context, kafkaStoreTimeoutMs);
1138-
for (AssociationValue associationValue : associationValues) {
1139-
AssociationKey associationKey = associationValue.toKey();
1140-
kafkaStore.put(associationKey, associationValue);
1141-
log.debug("Wrote new assoc: {} to the Kafka data store with key {}",
1142-
associationValue, associationKey);
1143-
}
1144-
return AssociationValue.toAssociationResponse(associationValues, registeredSchemas);
1149+
AssociationKey associationKey = associationValue.toKey();
1150+
// Ensure cache is up-to-date before any potential writes
1151+
kafkaStore.waitUntilKafkaReaderReachesLastOffset(qualifiedSubject, kafkaStoreTimeoutMs);
1152+
kafkaStore.put(associationKey, associationValue);
1153+
log.debug("Wrote new assoc: {} to the Kafka data store with key {}",
1154+
associationValue, associationKey);
1155+
} catch (StoreTimeoutException te) {
1156+
throw new SchemaRegistryTimeoutException("Write to the Kafka store timed out while", te);
11451157
} catch (StoreException e) {
1146-
throw new SchemaRegistryStoreException("Failed to write new association value to the store",
1147-
e);
1158+
throw new SchemaRegistryStoreException("Error while putting the association for subject '"
1159+
+ qualifiedSubject + "' in the backend Kafka store", e);
11481160
}
11491161
}
11501162

@@ -1403,14 +1415,17 @@ public List<Association> getAssociationsByResourceName(
14031415

14041416
public void deleteAssociations(
14051417
String resourceId, String resourceType, List<String> associationTypes,
1406-
boolean cascadeLifecycle)
1418+
boolean cascadeLifecycle, boolean dryRun)
14071419
throws SchemaRegistryException {
14081420
// TODO RAY test idempotency
14091421
List<Association> associations = getAssociationsByResourceId(resourceId,
14101422
resourceType, associationTypes, null);
14111423
for (Association association : associations) {
14121424
checkDeleteAssociation(association, cascadeLifecycle);
14131425
}
1426+
if (dryRun) {
1427+
return;
1428+
}
14141429
for (Association association : associations) {
14151430
deleteAssociation(association, cascadeLifecycle);
14161431
}
@@ -1439,43 +1454,47 @@ private void deleteAssociation(Association oldAssociation, boolean cascadeLifecy
14391454
throws SchemaRegistryException {
14401455
String unqualifiedSubject = oldAssociation.getSubject();
14411456
QualifiedSubject qs = QualifiedSubject.createFromUnqualified(tenant(), unqualifiedSubject);
1442-
String subject = qs.toQualifiedSubject();
1457+
String qualifiedSubject = qs.toQualifiedSubject();
14431458
try {
14441459
AssociationKey key = new AssociationKey(
14451460
tenant(), oldAssociation.getResourceName(),
14461461
oldAssociation.getResourceNamespace(), oldAssociation.getResourceType(),
1447-
oldAssociation.getAssociationType(), subject);
1462+
oldAssociation.getAssociationType(), qualifiedSubject);
14481463
// Ensure cache is up-to-date before any potential writes
1449-
kafkaStore.waitUntilKafkaReaderReachesLastOffset(subject, kafkaStoreTimeoutMs);
1464+
kafkaStore.waitUntilKafkaReaderReachesLastOffset(qualifiedSubject, kafkaStoreTimeoutMs);
14501465
kafkaStore.put(key, null);
14511466
} catch (StoreTimeoutException te) {
14521467
throw new SchemaRegistryTimeoutException("Write to the Kafka store timed out while", te);
14531468
} catch (StoreException e) {
14541469
throw new SchemaRegistryStoreException("Error while deleting the association for subject '"
1455-
+ subject + "' in the backend Kafka store", e);
1470+
+ qualifiedSubject + "' in the backend Kafka store", e);
14561471
}
14571472

1473+
Mode subjectMode = getModeInScope(qualifiedSubject);
1474+
if (subjectMode == Mode.IMPORT) {
1475+
return;
1476+
}
14581477
if (cascadeLifecycle && oldAssociation.getLifecycle() == LifecyclePolicy.STRONG) {
14591478
// Delete subject
1460-
deleteSubject(subject, false);
1461-
deleteSubject(subject, true);
1479+
deleteSubject(qualifiedSubject, false);
1480+
deleteSubject(qualifiedSubject, true);
14621481
}
14631482
}
14641483

14651484
public void deleteAssociationsOrForward(
14661485
String subject, // subject is only used for locking per tenant
14671486
String resourceId, String resourceType, List<String> associationTypes,
1468-
boolean cascadeLifecycle, Map<String, String> headerProperties)
1487+
boolean cascadeLifecycle, boolean dryRun, Map<String, String> headerProperties)
14691488
throws SchemaRegistryException {
14701489
kafkaStore.lockFor(subject).lock();
14711490
try {
14721491
if (isLeader()) {
1473-
deleteAssociations(resourceId, resourceType, associationTypes, cascadeLifecycle);
1492+
deleteAssociations(resourceId, resourceType, associationTypes, cascadeLifecycle, dryRun);
14741493
} else {
14751494
// forward update config request to the leader
14761495
if (leaderIdentity != null) {
14771496
forwardDeleteAssociationsRequestToLeader(resourceId,
1478-
resourceType, associationTypes, cascadeLifecycle, headerProperties);
1497+
resourceType, associationTypes, cascadeLifecycle, dryRun, headerProperties);
14791498
} else {
14801499
throw new UnknownLeaderException("Delete association request failed since leader is "
14811500
+ "unknown");
@@ -1753,14 +1772,14 @@ private AssociationBatchResponse forwardCreateOrUpdateAssociationsRequestToLeade
17531772

17541773
private void forwardDeleteAssociationsRequestToLeader(
17551774
String resourceId, String resourceType, List<String> associationTypes,
1756-
boolean cascadeLifecycle, Map<String, String> headerProperties)
1775+
boolean cascadeLifecycle, boolean dryRun, Map<String, String> headerProperties)
17571776
throws SchemaRegistryRequestForwardingException {
17581777
final UrlList baseUrl = leaderRestService.getBaseUrls();
17591778

17601779
log.debug(String.format("Forwarding delete associations request to %s", baseUrl));
17611780
try {
17621781
leaderRestService.deleteAssociations(
1763-
headerProperties, resourceId, resourceType, associationTypes, cascadeLifecycle);
1782+
headerProperties, resourceId, resourceType, associationTypes, cascadeLifecycle, dryRun);
17641783
} catch (IOException e) {
17651784
throw new SchemaRegistryRequestForwardingException(
17661785
String.format("Unexpected error while forwarding the delete association request to %s",

core/src/main/java/io/confluent/kafka/schemaregistry/storage/SchemaRegistry.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -436,7 +436,7 @@ default void deleteAssociations(
436436
default void deleteAssociationsOrForward(
437437
String subject, // subject is only used for locking per tenant
438438
String resourceId, String resourceType, List<String> associationTypes,
439-
boolean cascadeLifecycle, Map<String, String> headerProperties)
439+
boolean cascadeLifecycle, boolean dryRun, Map<String, String> headerProperties)
440440
throws SchemaRegistryException {
441441
}
442442

core/src/test/java/io/confluent/kafka/schemaregistry/rest/RestApiAssociationTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,7 @@ public void testBasicAssociation() throws Exception {
205205

206206
boolean cascadeDelete = false;
207207
restApp.restClient.deleteAssociations(RestService.DEFAULT_REQUEST_PROPERTIES,
208-
resourceId, "topic", Collections.singletonList("key"), cascadeDelete);
208+
resourceId, "topic", Collections.singletonList("key"), cascadeDelete, false);
209209

210210
associations = restApp.restClient.getAssociationsBySubject(
211211
RestService.DEFAULT_REQUEST_PROPERTIES, subject1, "topic",
@@ -217,7 +217,7 @@ public void testBasicAssociation() throws Exception {
217217

218218
cascadeDelete = true;
219219
restApp.restClient.deleteAssociations(RestService.DEFAULT_REQUEST_PROPERTIES,
220-
resourceId, "topic", Collections.singletonList("value"), cascadeDelete);
220+
resourceId, "topic", Collections.singletonList("value"), cascadeDelete, false);
221221

222222
associations = restApp.restClient.getAssociationsBySubject(
223223
RestService.DEFAULT_REQUEST_PROPERTIES, subject1, "topic",

0 commit comments

Comments
 (0)