Skip to content

Commit

Permalink
Concurrent Transaction Conditonal Creates (#4639)
Browse files Browse the repository at this point in the history
* push broken test for testing placeholder create in transaction retry

* reproduce error for transaction retry

* cleanup test

* fix test testPlaceholderCreateTransactionRetry

* revert temp fix

* revert spacing

* Add tests

* Remove fixme

* Test fix

* test fix

* Test fix

* Remove test line

* Remove unneeded change

* Review comments

* Add docs

* Test logging

* Add test logging

* Add logging for intermittents

---------

Co-authored-by: aditya_dave <aditya@smilecdr.com>
Co-authored-by: James Agnew <jamesagnew@gmail.com>
  • Loading branch information
3 people authored Mar 14, 2023
1 parent ea8b68f commit 8956b27
Show file tree
Hide file tree
Showing 17 changed files with 254 additions and 69 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
type: fix
issue: 4639
title: "When executing FHIR transactions in the JPA server with automatic retry enabled, if
automatic placeholder-reference creation is enabled the system could sometimes fail to
automatically retry. This has been corrected."
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,12 @@ public IJobPersistence batch2JobInstancePersisterWrapper(IBatch2JobInstanceRepos
IJobPersistence retVal = batch2JobInstancePersister(theJobInstanceRepository, theWorkChunkRepository, theTransactionService, theEntityManager);
// Avoid H2 synchronization issues caused by
// https://github.com/h2database/h2database/issues/1808
if (HapiSystemProperties.isUnitTestModeEnabled()) {
retVal = ProxyUtil.synchronizedProxy(IJobPersistence.class, retVal);
}
// TODO: Update 2023-03-14 - The bug above appears to be fixed. I'm going to try
// disabing this and see if we can get away without it. If so, we can delete
// this entirely
// if (HapiSystemProperties.isUnitTestModeEnabled()) {
// retVal = ProxyUtil.synchronizedProxy(IJobPersistence.class, retVal);
// }
return retVal;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -326,15 +326,15 @@ private DaoMethodOutcome doCreateForPostOrPut(RequestDetails theRequest, T theRe
return myTxTemplate.execute(tx -> {
IIdType retVal = myIdHelperService.translatePidIdToForcedId(myFhirContext, myResourceName, pid);
if (!retVal.hasVersionIdPart()) {
IIdType idWithVersion = myMemoryCacheService.getIfPresent(MemoryCacheService.CacheEnum.RESOURCE_CONDITIONAL_CREATE_VERSION, pid.getId());
if (idWithVersion == null) {
Long version = myResourceTableDao.findCurrentVersionByPid(pid.getId());
Long version = myMemoryCacheService.getIfPresent(MemoryCacheService.CacheEnum.RESOURCE_CONDITIONAL_CREATE_VERSION, pid.getId());
if (version == null) {
version = myResourceTableDao.findCurrentVersionByPid(pid.getId());
if (version != null) {
retVal = myFhirContext.getVersion().newIdType().setParts(retVal.getBaseUrl(), retVal.getResourceType(), retVal.getIdPart(), Long.toString(version));
myMemoryCacheService.putAfterCommit(MemoryCacheService.CacheEnum.RESOURCE_CONDITIONAL_CREATE_VERSION, pid.getId(), retVal);
myMemoryCacheService.putAfterCommit(MemoryCacheService.CacheEnum.RESOURCE_CONDITIONAL_CREATE_VERSION, pid.getId(), version);
}
} else {
retVal = idWithVersion;
}
if (version != null) {
retVal = myFhirContext.getVersion().newIdType().setParts(retVal.getBaseUrl(), retVal.getResourceType(), retVal.getIdPart(), Long.toString(version));
}
}
return retVal;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ public class ExtractInlineReferenceParams {
private IBaseResource myResource;
private TransactionDetails myTransactionDetails;
private RequestDetails myRequestDetails;
private boolean myFailOnInvalidReferences;

public ExtractInlineReferenceParams(
IBaseResource theResource,
Expand Down Expand Up @@ -64,11 +63,4 @@ public void setRequestDetails(RequestDetails theRequestDetails) {
myRequestDetails = theRequestDetails;
}

public boolean isFailOnInvalidReferences() {
return myFailOnInvalidReferences;
}

public void setFailOnInvalidReferences(boolean theFailOnInvalidReferences) {
myFailOnInvalidReferences = theFailOnInvalidReferences;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
import ca.uhn.fhir.jpa.searchparam.extractor.SearchParamExtractorService;
import ca.uhn.fhir.jpa.util.MemoryCacheService;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.api.server.storage.IResourcePersistentId;
import ca.uhn.fhir.rest.api.server.storage.NotFoundPid;
import ca.uhn.fhir.rest.api.server.storage.TransactionDetails;
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
import ca.uhn.fhir.rest.server.exceptions.PreconditionFailedException;
Expand Down Expand Up @@ -112,12 +114,13 @@ public void setSearchParamRegistry(ISearchParamRegistry theSearchParamRegistry)
mySearchParamRegistry = theSearchParamRegistry;
}

public void populateFromResource(RequestPartitionId theRequestPartitionId, ResourceIndexedSearchParams theParams, TransactionDetails theTransactionDetails, ResourceTable theEntity, IBaseResource theResource, ResourceIndexedSearchParams theExistingParams, RequestDetails theRequest, boolean theFailOnInvalidReference) {
ExtractInlineReferenceParams theExtractParams = new ExtractInlineReferenceParams(theResource, theTransactionDetails, theRequest);
theExtractParams.setFailOnInvalidReferences(theFailOnInvalidReference);
extractInlineReferences(theExtractParams);
public void populateFromResource(RequestPartitionId theRequestPartitionId, ResourceIndexedSearchParams theParams, TransactionDetails theTransactionDetails, ResourceTable theEntity, IBaseResource theResource, ResourceIndexedSearchParams theExistingParams, RequestDetails theRequest, boolean thePerformIndexing) {
if (thePerformIndexing) {
ExtractInlineReferenceParams extractParams = new ExtractInlineReferenceParams(theResource, theTransactionDetails, theRequest);
extractInlineReferences(extractParams);
}

mySearchParamExtractorService.extractFromResource(theRequestPartitionId, theRequest, theParams, theExistingParams, theEntity, theResource, theTransactionDetails, theFailOnInvalidReference);
mySearchParamExtractorService.extractFromResource(theRequestPartitionId, theRequest, theParams, theExistingParams, theEntity, theResource, theTransactionDetails, thePerformIndexing);

ResourceSearchParams activeSearchParams = mySearchParamRegistry.getActiveSearchParams(theEntity.getResourceType());
if (myStorageSettings.getIndexMissingFields() == JpaStorageSettings.IndexEnabledEnum.ENABLED) {
Expand Down Expand Up @@ -242,8 +245,18 @@ public void extractInlineReferences(ExtractInlineReferenceParams theParams) {
}
Class<? extends IBaseResource> matchResourceType = matchResourceDef.getImplementingClass();

JpaPid resolvedMatch = null;
if (theTransactionDetails != null) {
resolvedMatch = (JpaPid) theTransactionDetails.getResolvedMatchUrls().get(nextIdText);
}

//Attempt to find the target reference before creating a placeholder
Set<JpaPid> matches = myMatchResourceUrlService.processMatchUrl(nextIdText, matchResourceType, theTransactionDetails, theRequest);
Set<JpaPid> matches;
if (resolvedMatch != null && !IResourcePersistentId.NOT_FOUND.equals(resolvedMatch)) {
matches = Set.of(resolvedMatch);
} else {
matches = myMatchResourceUrlService.processMatchUrl(nextIdText, matchResourceType, theTransactionDetails, theRequest);
}

JpaPid match;
if (matches.isEmpty()) {
Expand All @@ -268,6 +281,10 @@ public void extractInlineReferences(ExtractInlineReferenceParams theParams) {
IIdType newId = myIdHelperService.translatePidIdToForcedId(myContext, resourceTypeString, match);
ourLog.debug("Replacing inline match URL[{}] with ID[{}}", nextId.getValue(), newId);

if (theTransactionDetails != null) {
String previousReference = nextRef.getReferenceElement().getValue();
theTransactionDetails.addRollbackUndoAction(()->nextRef.setReference(previousReference));
}
nextRef.setReference(newId.getValue());
}
}
Expand Down Expand Up @@ -308,11 +325,7 @@ public void storeUniqueComboParameters(ResourceIndexedSearchParams theParams, Re
myEntityManager.persist(next);
haveNewStringUniqueParams = true;
}
if (theParams.myComboStringUniques.size() > 0 || haveNewStringUniqueParams) {
theEntity.setParamsComboStringUniquePresent(true);
} else {
theEntity.setParamsComboStringUniquePresent(false);
}
theEntity.setParamsComboStringUniquePresent(theParams.myComboStringUniques.size() > 0 || haveNewStringUniqueParams);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@
import ca.uhn.fhir.jpa.api.svc.IIdHelperService;
import ca.uhn.fhir.jpa.cache.IResourceVersionSvc;
import ca.uhn.fhir.jpa.dao.r4.TransactionProcessorVersionAdapterR4;
import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService;
import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
import ca.uhn.fhir.jpa.dao.tx.NonTransactionalHapiTransactionService;
import ca.uhn.fhir.jpa.model.config.PartitionSettings;
import ca.uhn.fhir.jpa.model.entity.StorageSettings;
import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc;
import ca.uhn.fhir.jpa.searchparam.MatchUrlService;
import ca.uhn.fhir.jpa.searchparam.matcher.InMemoryResourceMatcher;
Expand All @@ -27,6 +27,7 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Answers;
import org.mockito.Spy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
Expand Down Expand Up @@ -66,8 +67,6 @@ public class TransactionProcessorTest {
@MockBean
private MatchResourceUrlService myMatchResourceUrlService;
@MockBean
private HapiTransactionService myHapiTransactionService;
@MockBean
private InMemoryResourceMatcher myInMemoryResourceMatcher;
@MockBean
private IIdHelperService myIdHelperService;
Expand All @@ -88,13 +87,7 @@ public class TransactionProcessorTest {

@BeforeEach
public void before() {
when(myHapiTransactionService.execute(any(), any(), any())).thenAnswer(t -> {
TransactionCallback<?> callback = t.getArgument(2, TransactionCallback.class);
return callback.doInTransaction(mock(TransactionStatus.class));
});

myTransactionProcessor.setEntityManagerForUnitTest(myEntityManager);

when(myEntityManager.unwrap(eq(Session.class))).thenReturn(mySession);
}

Expand Down Expand Up @@ -156,5 +149,10 @@ public ITransactionProcessorVersionAdapter<Bundle, Bundle.BundleEntryComponent>
return new TransactionProcessorVersionAdapterR4();
}

@Bean
public IHapiTransactionService hapiTransactionService() {
return new NonTransactionalHapiTransactionService();
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,10 @@ public class FhirResourceDaoR4ConcurrentWriteTest extends BaseJpaR4Test {
private TransactionConcurrencySemaphoreInterceptor myConcurrencySemaphoreInterceptor;


@Override
@BeforeEach
public void before() {
public void before() throws Exception {
super.before();
myExecutor = Executors.newFixedThreadPool(10);
myRetryInterceptor = new UserRequestRetryVersionConflictsInterceptor();
myConcurrencySemaphoreInterceptor = new TransactionConcurrencySemaphoreInterceptor(myMemoryCacheService);
Expand Down Expand Up @@ -132,6 +134,33 @@ public void testTransactionCreates_NoGuard() {
@Test
public void testTransactionCreates_WithRetry() throws ExecutionException, InterruptedException {
myInterceptorRegistry.registerInterceptor(myRetryInterceptor);
myStorageSettings.setUniqueIndexesEnabled(true);

// Create a unique search parameter to enfore uniqueness
// TODO: remove this once we have a better way to enfore these
SearchParameter sp = new SearchParameter();
sp.setId("SearchParameter/Practitioner-identifier");
sp.setType(Enumerations.SearchParamType.TOKEN);
sp.setCode("identifier");
sp.setExpression("Practitioner.identifier");
sp.setStatus(Enumerations.PublicationStatus.ACTIVE);
sp.addBase("Practitioner");
mySearchParameterDao.update(sp);

sp = new SearchParameter();
sp.setId("SearchParameter/Practitioner-identifier-unique");
sp.setType(Enumerations.SearchParamType.COMPOSITE);
sp.setStatus(Enumerations.PublicationStatus.ACTIVE);
sp.addBase("Practitioner");
sp.addComponent()
.setExpression("Practitioner")
.setDefinition("SearchParameter/Practitioner-identifier");
sp.addExtension()
.setUrl(HapiExtensions.EXT_SP_UNIQUE)
.setValue(new BooleanType(true));
mySearchParameterDao.update(sp);

mySearchParamRegistry.forceRefresh();

AtomicInteger setCounter = new AtomicInteger(0);
AtomicInteger fuzzCounter = new AtomicInteger(0);
Expand Down Expand Up @@ -162,9 +191,9 @@ public void testTransactionCreates_WithRetry() throws ExecutionException, Interr

assertEquals(1, counts.get("Patient"), counts.toString());
assertEquals(1, counts.get("Observation"), counts.toString());
assertEquals(6, myResourceLinkDao.count());
assertEquals(6, myResourceTableDao.count());
assertEquals(14, myResourceHistoryTableDao.count());
assertEquals(7, myResourceLinkDao.count()); // 1 for SP, 6 for transaction
assertEquals(8, myResourceTableDao.count()); // 2 SPs, 6 resources
assertEquals(16, myResourceHistoryTableDao.count());
});

}
Expand Down
Loading

0 comments on commit 8956b27

Please sign in to comment.