@@ -1124,6 +1124,10 @@ public AssociationResponse createOrUpdateAssociation(
11241124 if (schema == null ) {
11251125 continue ;
11261126 }
1127+ Mode subjectMode = getModeInScope (qualifiedSubject );
1128+ if (subjectMode == Mode .IMPORT ) {
1129+ continue ;
1130+ }
11271131 boolean normalize = Boolean .TRUE .equals (info .getNormalize ());
11281132 Schema registeredSchema = register (qualifiedSubject ,
11291133 new Schema (qualifiedSubject , schema ), normalize , false );
@@ -1133,18 +1137,26 @@ public AssociationResponse createOrUpdateAssociation(
11331137 List <AssociationValue > associationValues =
11341138 AssociationValue .fromAssociationCreateOrUpdateRequest (
11351139 tenant (), request , associations , assocTypesToSkip );
1140+ for (AssociationValue associationValue : associationValues ) {
1141+ putAssociation (associationValue );
1142+ }
1143+ return AssociationValue .toAssociationResponse (associationValues , registeredSchemas );
1144+ }
1145+
1146+ private void putAssociation (AssociationValue associationValue ) throws SchemaRegistryException {
1147+ String qualifiedSubject = associationValue .getSubject ();
11361148 try {
1137- kafkaStore . waitUntilKafkaReaderReachesLastOffset ( context , kafkaStoreTimeoutMs );
1138- for ( AssociationValue associationValue : associationValues ) {
1139- AssociationKey associationKey = associationValue . toKey ( );
1140- kafkaStore .put (associationKey , associationValue );
1141- log .debug ("Wrote new assoc: {} to the Kafka data store with key {}" ,
1142- associationValue , associationKey );
1143- }
1144- return AssociationValue . toAssociationResponse ( associationValues , registeredSchemas );
1149+ AssociationKey associationKey = associationValue . toKey ( );
1150+ // Ensure cache is up-to-date before any potential writes
1151+ kafkaStore . waitUntilKafkaReaderReachesLastOffset ( qualifiedSubject , kafkaStoreTimeoutMs );
1152+ kafkaStore .put (associationKey , associationValue );
1153+ log .debug ("Wrote new assoc: {} to the Kafka data store with key {}" ,
1154+ associationValue , associationKey );
1155+ } catch ( StoreTimeoutException te ) {
1156+ throw new SchemaRegistryTimeoutException ( "Write to the Kafka store timed out while" , te );
11451157 } catch (StoreException e ) {
1146- throw new SchemaRegistryStoreException ("Failed to write new association value to the store" ,
1147- e );
1158+ throw new SchemaRegistryStoreException ("Error while putting the association for subject '"
1159+ + qualifiedSubject + "' in the backend Kafka store" , e );
11481160 }
11491161 }
11501162
@@ -1403,14 +1415,17 @@ public List<Association> getAssociationsByResourceName(
14031415
14041416 public void deleteAssociations (
14051417 String resourceId , String resourceType , List <String > associationTypes ,
1406- boolean cascadeLifecycle )
1418+ boolean cascadeLifecycle , boolean dryRun )
14071419 throws SchemaRegistryException {
14081420 // TODO RAY test idempotency
14091421 List <Association > associations = getAssociationsByResourceId (resourceId ,
14101422 resourceType , associationTypes , null );
14111423 for (Association association : associations ) {
14121424 checkDeleteAssociation (association , cascadeLifecycle );
14131425 }
1426+ if (dryRun ) {
1427+ return ;
1428+ }
14141429 for (Association association : associations ) {
14151430 deleteAssociation (association , cascadeLifecycle );
14161431 }
@@ -1439,43 +1454,47 @@ private void deleteAssociation(Association oldAssociation, boolean cascadeLifecy
14391454 throws SchemaRegistryException {
14401455 String unqualifiedSubject = oldAssociation .getSubject ();
14411456 QualifiedSubject qs = QualifiedSubject .createFromUnqualified (tenant (), unqualifiedSubject );
1442- String subject = qs .toQualifiedSubject ();
1457+ String qualifiedSubject = qs .toQualifiedSubject ();
14431458 try {
14441459 AssociationKey key = new AssociationKey (
14451460 tenant (), oldAssociation .getResourceName (),
14461461 oldAssociation .getResourceNamespace (), oldAssociation .getResourceType (),
1447- oldAssociation .getAssociationType (), subject );
1462+ oldAssociation .getAssociationType (), qualifiedSubject );
14481463 // Ensure cache is up-to-date before any potential writes
1449- kafkaStore .waitUntilKafkaReaderReachesLastOffset (subject , kafkaStoreTimeoutMs );
1464+ kafkaStore .waitUntilKafkaReaderReachesLastOffset (qualifiedSubject , kafkaStoreTimeoutMs );
14501465 kafkaStore .put (key , null );
14511466 } catch (StoreTimeoutException te ) {
14521467 throw new SchemaRegistryTimeoutException ("Write to the Kafka store timed out while" , te );
14531468 } catch (StoreException e ) {
14541469 throw new SchemaRegistryStoreException ("Error while deleting the association for subject '"
1455- + subject + "' in the backend Kafka store" , e );
1470+ + qualifiedSubject + "' in the backend Kafka store" , e );
14561471 }
14571472
1473+ Mode subjectMode = getModeInScope (qualifiedSubject );
1474+ if (subjectMode == Mode .IMPORT ) {
1475+ return ;
1476+ }
14581477 if (cascadeLifecycle && oldAssociation .getLifecycle () == LifecyclePolicy .STRONG ) {
14591478 // Delete subject
1460- deleteSubject (subject , false );
1461- deleteSubject (subject , true );
1479+ deleteSubject (qualifiedSubject , false );
1480+ deleteSubject (qualifiedSubject , true );
14621481 }
14631482 }
14641483
14651484 public void deleteAssociationsOrForward (
14661485 String subject , // subject is only used for locking per tenant
14671486 String resourceId , String resourceType , List <String > associationTypes ,
1468- boolean cascadeLifecycle , Map <String , String > headerProperties )
1487+ boolean cascadeLifecycle , boolean dryRun , Map <String , String > headerProperties )
14691488 throws SchemaRegistryException {
14701489 kafkaStore .lockFor (subject ).lock ();
14711490 try {
14721491 if (isLeader ()) {
1473- deleteAssociations (resourceId , resourceType , associationTypes , cascadeLifecycle );
1492+ deleteAssociations (resourceId , resourceType , associationTypes , cascadeLifecycle , dryRun );
14741493 } else {
14751494 // forward update config request to the leader
14761495 if (leaderIdentity != null ) {
14771496 forwardDeleteAssociationsRequestToLeader (resourceId ,
1478- resourceType , associationTypes , cascadeLifecycle , headerProperties );
1497+ resourceType , associationTypes , cascadeLifecycle , dryRun , headerProperties );
14791498 } else {
14801499 throw new UnknownLeaderException ("Delete association request failed since leader is "
14811500 + "unknown" );
@@ -1753,14 +1772,14 @@ private AssociationBatchResponse forwardCreateOrUpdateAssociationsRequestToLeade
17531772
17541773 private void forwardDeleteAssociationsRequestToLeader (
17551774 String resourceId , String resourceType , List <String > associationTypes ,
1756- boolean cascadeLifecycle , Map <String , String > headerProperties )
1775+ boolean cascadeLifecycle , boolean dryRun , Map <String , String > headerProperties )
17571776 throws SchemaRegistryRequestForwardingException {
17581777 final UrlList baseUrl = leaderRestService .getBaseUrls ();
17591778
17601779 log .debug (String .format ("Forwarding delete associations request to %s" , baseUrl ));
17611780 try {
17621781 leaderRestService .deleteAssociations (
1763- headerProperties , resourceId , resourceType , associationTypes , cascadeLifecycle );
1782+ headerProperties , resourceId , resourceType , associationTypes , cascadeLifecycle , dryRun );
17641783 } catch (IOException e ) {
17651784 throw new SchemaRegistryRequestForwardingException (
17661785 String .format ("Unexpected error while forwarding the delete association request to %s" ,
0 commit comments