Skip to content

Experimental solution for HREACT-34 #1683

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import org.hibernate.reactive.persister.entity.mutation.ReactiveInsertCoordinator;
import org.hibernate.reactive.persister.entity.mutation.ReactiveUpdateCoordinator;
import org.hibernate.reactive.persister.entity.mutation.ReactiveUpdateCoordinatorNoOp;
import org.hibernate.reactive.persister.entity.mutation.ReactiveUpdateCoordinatorStandard;

public final class ReactiveCoordinatorFactory {

Expand All @@ -32,7 +31,7 @@ public static ReactiveUpdateCoordinator buildUpdateCoordinator(
for ( int i = 0; i < attributeMappings.size(); i++ ) {
AttributeMapping attributeMapping = attributeMappings.get( i );
if ( attributeMapping instanceof SingularAttributeMapping ) {
return new ReactiveUpdateCoordinatorStandard( entityPersister, factory );
return new ReactiveUpdateCoordinatorStandardScopeFactory( entityPersister, factory );
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ public CompletionStage<Void> updateReactive(
Object rowId,
SharedSessionContractImplementor session) {
return ( (ReactiveUpdateCoordinator) getUpdateCoordinator() )
.makeScopedCoordinator()
.coordinateReactiveUpdate( object, id, rowId, values, oldVersion, oldValues, dirtyAttributeIndexes, hasDirtyCollection, session );
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,7 @@ public CompletionStage<Void> updateReactive(
final Object rowId,
final SharedSessionContractImplementor session) throws HibernateException {
return ( (ReactiveUpdateCoordinator) getUpdateCoordinator() )
.makeScopedCoordinator()
.coordinateReactiveUpdate( object, id, rowId, values, oldVersion, oldValues, dirtyAttributeIndexes, hasDirtyCollection, session );
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,7 @@ public CompletionStage<Void> updateReactive(
Object rowId,
SharedSessionContractImplementor session) {
return ( (ReactiveUpdateCoordinator) getUpdateCoordinator() )
.makeScopedCoordinator()
.coordinateReactiveUpdate( object, id, rowId, values, oldVersion, oldValues, dirtyAttributeIndexes, hasDirtyCollection, session );
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/* Hibernate, Relational Persistence for Idiomatic Java
*
* SPDX-License-Identifier: Apache-2.0
* Copyright: Red Hat Inc. and Hibernate Authors
*/
package org.hibernate.reactive.persister.entity.impl;

import org.hibernate.engine.spi.SessionFactoryImplementor;
import org.hibernate.persister.entity.AbstractEntityPersister;
import org.hibernate.persister.entity.mutation.UpdateCoordinatorStandard;
import org.hibernate.reactive.persister.entity.mutation.ReactiveScopedUpdateCoordinator;
import org.hibernate.reactive.persister.entity.mutation.ReactiveUpdateCoordinator;
import org.hibernate.reactive.persister.entity.mutation.ReactiveUpdateCoordinatorStandard;

public class ReactiveUpdateCoordinatorStandardScopeFactory extends UpdateCoordinatorStandard implements ReactiveUpdateCoordinator {

private final AbstractEntityPersister entityPersister;
private final SessionFactoryImplementor factory;

public ReactiveUpdateCoordinatorStandardScopeFactory(
AbstractEntityPersister entityPersister,
SessionFactoryImplementor factory) {
super( entityPersister, factory );
this.entityPersister = entityPersister;
this.factory = factory;
}

@Override
public ReactiveScopedUpdateCoordinator makeScopedCoordinator() {
return new ReactiveUpdateCoordinatorStandard( entityPersister, factory );
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/* Hibernate, Relational Persistence for Idiomatic Java
*
* SPDX-License-Identifier: Apache-2.0
* Copyright: Red Hat Inc. and Hibernate Authors
*/
package org.hibernate.reactive.persister.entity.mutation;

import java.util.concurrent.CompletionStage;

import org.hibernate.engine.spi.SharedSessionContractImplementor;

/**
* Scoped to a single operation, so that we can keep
* instance scoped state.
*/
public interface ReactiveScopedUpdateCoordinator {

CompletionStage<Void> coordinateReactiveUpdate(
Object entity,
Object id,
Object rowId,
Object[] values,
Object oldVersion,
Object[] incomingOldValues,
int[] dirtyAttributeIndexes,
boolean hasDirtyCollection,
SharedSessionContractImplementor session);

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,10 @@
*/
package org.hibernate.reactive.persister.entity.mutation;

import java.util.concurrent.CompletionStage;

import org.hibernate.engine.spi.SharedSessionContractImplementor;
import org.hibernate.persister.entity.mutation.UpdateCoordinator;

public interface ReactiveUpdateCoordinator extends UpdateCoordinator {

CompletionStage<Void> coordinateReactiveUpdate(
Object entity,
Object id,
Object rowId,
Object[] values,
Object oldVersion,
Object[] incomingOldValues,
int[] dirtyAttributeIndexes,
boolean hasDirtyCollection,
SharedSessionContractImplementor session);
ReactiveScopedUpdateCoordinator makeScopedCoordinator();

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

import static org.hibernate.reactive.util.impl.CompletionStages.voidFuture;

public class ReactiveUpdateCoordinatorNoOp extends UpdateCoordinatorNoOp implements ReactiveUpdateCoordinator {
public class ReactiveUpdateCoordinatorNoOp extends UpdateCoordinatorNoOp implements ReactiveScopedUpdateCoordinator, ReactiveUpdateCoordinator {

public ReactiveUpdateCoordinatorNoOp(AbstractEntityPersister entityPersister) {
super( entityPersister );
Expand Down Expand Up @@ -45,4 +45,11 @@ public CompletionStage<Void> coordinateReactiveUpdate(
SharedSessionContractImplementor session) {
return voidFuture();
}

@Override
public ReactiveScopedUpdateCoordinator makeScopedCoordinator() {
//This particular implementation is stateless, so we can return ourselves w/o needing to create a scope.
return this;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import java.util.concurrent.CompletionStage;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import org.hibernate.engine.jdbc.mutation.ParameterUsage;
import org.hibernate.engine.jdbc.mutation.spi.MutationExecutorService;
import org.hibernate.engine.spi.EntityEntry;
Expand All @@ -35,24 +37,27 @@
import static org.hibernate.reactive.util.impl.CompletionStages.completedFuture;
import static org.hibernate.reactive.util.impl.CompletionStages.voidFuture;

public class ReactiveUpdateCoordinatorStandard extends UpdateCoordinatorStandard implements ReactiveUpdateCoordinator {

private CompletionStage<Void> stage;
public class ReactiveUpdateCoordinatorStandard extends UpdateCoordinatorStandard implements ReactiveScopedUpdateCoordinator {

private final AtomicReference<CompletionStage<Void>> currentTask = new AtomicReference<>();

public ReactiveUpdateCoordinatorStandard(AbstractEntityPersister entityPersister, SessionFactoryImplementor factory) {
super( entityPersister, factory );
}

private void complete(Object o, Throwable throwable) {
if ( throwable != null ) {
stage.toCompletableFuture().completeExceptionally( throwable );
getCurrentTask().toCompletableFuture().completeExceptionally( throwable );
}
else {
stage.toCompletableFuture().complete( null );
getCurrentTask().toCompletableFuture().complete( null );
}
}

private CompletionStage<Void> getCurrentTask() {
return currentTask.get();
}

@Override
public CompletionStage<Void> coordinateReactiveUpdate(
Object entity,
Expand Down Expand Up @@ -143,9 +148,10 @@ && entityPersister().hasLazyDirtyFields( dirtyAttributeIndexes ) ) {
attributeUpdateability,
forceDynamicUpdate
);

final CompletionStage<Void> stage = getCurrentTask();
// stage gets updated by doDynamicUpdate and doStaticUpdate which get called by performUpdate
return stage != null ? stage : voidFuture();
final CompletionStage<Void> finalStage = stage != null ? stage : voidFuture();
return finalStage;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this finalStage necessary, or it's you being overzealous?

});
}

Expand Down Expand Up @@ -202,7 +208,7 @@ protected void doVersionUpdate(
Object oldVersion,
SharedSessionContractImplementor session) {
assert getVersionUpdateGroup() != null;
this.stage = new CompletableFuture<>();
setNextTask();

final EntityTableMapping mutatingTableDetails = (EntityTableMapping) getVersionUpdateGroup()
.getSingleOperation().getTableDetails();
Expand Down Expand Up @@ -255,6 +261,13 @@ protected void doVersionUpdate(
.whenComplete( this::complete );
}

private void setNextTask() {
final boolean ok = this.currentTask.compareAndSet( null, new CompletableFuture<>() );
if ( !ok ) {
throw new IllegalStateException("Race condition in task initialization");
}
}

private ReactiveMutationExecutor mutationExecutor(
SharedSessionContractImplementor session,
MutationOperationGroup operationGroup) {
Expand All @@ -274,7 +287,7 @@ protected void doDynamicUpdate(
UpdateCoordinatorStandard.InclusionChecker dirtinessChecker,
UpdateCoordinatorStandard.UpdateValuesAnalysisImpl valuesAnalysis,
SharedSessionContractImplementor session) {
this.stage = new CompletableFuture<>();
setNextTask();
// Create the JDBC operation descriptors
final MutationOperationGroup dynamicUpdateGroup = generateDynamicUpdateGroup(
id,
Expand Down Expand Up @@ -342,7 +355,7 @@ protected void doStaticUpdate(
Object[] oldValues,
UpdateValuesAnalysisImpl valuesAnalysis,
SharedSessionContractImplementor session) {
this.stage = new CompletableFuture<>();
setNextTask();
final MutationOperationGroup staticUpdateGroup = getStaticUpdateGroup();
final ReactiveMutationExecutor mutationExecutor = mutationExecutor( session, staticUpdateGroup );

Expand Down