Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import io.flamingock.internal.core.builder.FlamingockFactory;
import io.flamingock.internal.common.cloud.audit.AuditEntryRequest;
import io.flamingock.internal.core.external.store.lock.LockException;
import io.flamingock.internal.core.runner.Runner;
import io.flamingock.internal.core.builder.runner.Runner;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import io.flamingock.internal.common.cloud.audit.AuditEntryRequest;
import io.flamingock.internal.common.cloud.vo.TargetSystemAuditMarkType;
import io.flamingock.internal.core.external.targets.mark.TargetSystemAuditMark;
import io.flamingock.internal.core.runner.Runner;
import io.flamingock.internal.core.builder.runner.Runner;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import io.flamingock.internal.common.couchbase.CouchbaseCollectionHelper;
import io.flamingock.internal.core.builder.FlamingockFactory;
import io.flamingock.internal.util.constants.CommunityPersistenceConstants;
import io.flamingock.internal.core.runner.PipelineExecutionException;
import io.flamingock.internal.core.operation.OperationException;
import io.flamingock.internal.util.Trio;
import org.junit.jupiter.api.*;
import org.mockito.MockedStatic;
Expand Down Expand Up @@ -149,7 +149,7 @@ void failedWithRollback() {
new Trio<>(io.flamingock.store.couchbase.changes.failedWithRollback._003__execution_with_exception.class, Collections.singletonList(Collection.class), Collections.singletonList(Collection.class)))
);

assertThrows(PipelineExecutionException.class, () -> {
assertThrows(OperationException.class, () -> {
FlamingockFactory.getCommunityBuilder()
.setAuditStore(CouchbaseAuditStore.from(couchbaseTargetSystem))
.addTargetSystem(couchbaseTargetSystem)
Expand Down Expand Up @@ -203,7 +203,7 @@ void failedWithoutRollback() {
new Trio<>(_003__execution_with_exception.class, Collections.singletonList(Collection.class)))
);

assertThrows(PipelineExecutionException.class, () -> {
assertThrows(OperationException.class, () -> {
FlamingockFactory.getCommunityBuilder()
.setAuditStore(CouchbaseAuditStore.from(couchbaseTargetSystem))
.addTargetSystem(couchbaseTargetSystem)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import io.flamingock.internal.common.core.audit.AuditEntry;
import io.flamingock.internal.core.builder.FlamingockFactory;
import io.flamingock.internal.util.constants.CommunityPersistenceConstants;
import io.flamingock.internal.core.runner.PipelineExecutionException;
import io.flamingock.internal.core.operation.OperationException;
import io.flamingock.internal.util.dynamodb.DynamoDBConstants;
import io.flamingock.internal.util.dynamodb.DynamoDBUtil;
import org.junit.jupiter.api.AfterEach;
Expand Down Expand Up @@ -176,7 +176,7 @@ void failedWithTransaction() {
new CodeChangeTestDefinition(_002__insert_federico_happy_non_transactional.class, Collections.singletonList(DynamoDbClient.class)),
new CodeChangeTestDefinition(_003__insert_jorge_failed_transactional_non_rollback.class, Arrays.asList(DynamoDbClient.class, TransactWriteItemsEnhancedRequest.Builder.class)))
.WHEN(() -> {
assertThrows(PipelineExecutionException.class, () -> {
assertThrows(OperationException.class, () -> {
FlamingockFactory.getCommunityBuilder()
.setAuditStore(DynamoDBAuditStore.from(dynamoDBTargetSystem))
.addTargetSystem(dynamoDBTargetSystem)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import io.flamingock.core.kit.TestKit;
import io.flamingock.core.kit.audit.AuditTestHelper;
import io.flamingock.core.kit.audit.AuditTestSupport;
import io.flamingock.internal.core.runner.PipelineExecutionException;
import io.flamingock.internal.core.operation.OperationException;
import io.flamingock.mongodb.kit.MongoDBSyncTestKit;
import io.flamingock.targetsystem.mongodb.sync.MongoDBSyncTargetSystem;
import org.junit.jupiter.api.AfterEach;
Expand Down Expand Up @@ -177,7 +177,7 @@ void failedWithTransaction() {
new CodeChangeTestDefinition(_002__insert_federico_happy_non_transactional.class, Collections.singletonList(MongoDatabase.class)),
new CodeChangeTestDefinition(_003__insert_jorge_failed_transactional_non_rollback.class, Arrays.asList(MongoDatabase.class, ClientSession.class))
)
.WHEN(() -> assertThrows(PipelineExecutionException.class, () -> {
.WHEN(() -> assertThrows(OperationException.class, () -> {
testKit.createBuilder()
.setAuditStore(MongoDBSyncAuditStore.from(mongoDBSyncTargetSystem))
.addTargetSystem(mongoDBSyncTargetSystem)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import io.flamingock.mongodb.kit.MongoDBSyncTestKit;
import io.flamingock.core.kit.TestKit;
import io.flamingock.core.kit.audit.AuditTestHelper;
import io.flamingock.internal.core.runner.PipelineExecutionException;
import io.flamingock.internal.core.operation.OperationException;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -180,7 +180,7 @@ void testFailingTransactionalChangeWithRollback() {
new CodeChangeTestDefinition(_001__create_client_collection_happy.class, Collections.emptyList()),
new CodeChangeTestDefinition(_003__insert_jorge_failed_non_transactional_rollback.class, Collections.emptyList())
)
.WHEN(() -> assertThrows(PipelineExecutionException.class, () -> {
.WHEN(() -> assertThrows(OperationException.class, () -> {
testKit.createBuilder()
.setAuditStore(MongoDBSyncAuditStore.from(mongoDBSyncTargetSystem))
.addTargetSystem(mongoDBSyncTargetSystem)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import io.flamingock.internal.common.core.util.Deserializer;
import io.flamingock.internal.common.sql.SqlDialect;
import io.flamingock.internal.core.builder.FlamingockFactory;
import io.flamingock.internal.core.runner.PipelineExecutionException;
import io.flamingock.internal.core.operation.OperationException;
import io.flamingock.internal.util.Trio;
import io.flamingock.internal.util.constants.CommunityPersistenceConstants;
import io.flamingock.store.sql.changes.postgresql.failedWithoutRollback._001__create_index;
Expand Down Expand Up @@ -369,7 +369,7 @@ void failedWithRollback(SqlDialect sqlDialect, String dialectName) throws Except
SqlTargetSystem targetSystem = new SqlTargetSystem("sql", context.dataSource);
SqlAuditStore auditStore = SqlAuditStore.from(targetSystem);

assertThrows(PipelineExecutionException.class, () -> {
assertThrows(OperationException.class, () -> {
FlamingockFactory.getCommunityBuilder()
.setAuditStore(auditStore)
.addTargetSystem(targetSystem)
Expand Down Expand Up @@ -456,7 +456,7 @@ void failedWithoutRollback(SqlDialect sqlDialect, String dialectName) throws Exc
SqlTargetSystem targetSystem = new SqlTargetSystem("sql", context.dataSource);
SqlAuditStore auditStore = SqlAuditStore.from(targetSystem);

assertThrows(PipelineExecutionException.class, () -> {
assertThrows(OperationException.class, () -> {
FlamingockFactory.getCommunityBuilder()
.setAuditStore(auditStore)
.addTargetSystem(targetSystem)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.flamingock.internal.core.context.SimpleContext;
import io.flamingock.internal.core.external.store.AuditStore;
import io.flamingock.internal.core.external.store.audit.AuditPersistence;
import io.flamingock.internal.core.operation.OperationType;
import io.flamingock.internal.core.plan.ExecutionPlanner;
import io.flamingock.internal.core.event.CompositeEventPublisher;
import io.flamingock.internal.core.event.EventPublisher;
Expand All @@ -43,9 +44,9 @@
import io.flamingock.internal.core.pipeline.loaded.LoadedPipeline;
import io.flamingock.internal.core.plugin.Plugin;
import io.flamingock.internal.core.plugin.PluginManager;
import io.flamingock.internal.core.runner.PipelineRunnerCreator;
import io.flamingock.internal.core.runner.Runner;
import io.flamingock.internal.core.runner.RunnerBuilder;
import io.flamingock.internal.core.builder.runner.RunnerFactory;
import io.flamingock.internal.core.builder.runner.Runner;
import io.flamingock.internal.core.builder.runner.RunnerBuilder;
import io.flamingock.internal.core.task.filter.TaskFilter;
import io.flamingock.internal.util.CollectionUtil;
import io.flamingock.internal.util.Property;
Expand Down Expand Up @@ -195,11 +196,10 @@ public final Runner build() {
LoadedPipeline pipeline = loadPipeline();
pipeline.contributeToContext(hierarchicalContext);




return PipelineRunnerCreator.create(
OperationType operation = OperationType.EXECUTE;
return RunnerFactory.getRunner(
runnerId,
operation,
pipeline,
persistence,
buildExecutionPlanner(runnerId),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright 2026 Flamingock (https://www.flamingock.io)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.flamingock.internal.core.builder.runner;

import io.flamingock.internal.core.operation.AbstractOperationResult;
import io.flamingock.internal.core.operation.Operation;
import io.flamingock.internal.util.id.RunnerId;
import io.flamingock.internal.util.log.FlamingockLoggerFactory;
import org.slf4j.Logger;

public class DefaultRunner implements Runner {
private static final Logger logger = FlamingockLoggerFactory.getLogger("PipelineRunner");

private final RunnerId runnerId;

private final Runnable finalizer;
private final Operation<?> operation;

public DefaultRunner(RunnerId runnerId,
Runnable finalizer,
Operation<?> operation) {
this.runnerId = runnerId;
this.operation = operation;
this.finalizer = finalizer;
}

@Override
public void run() {
try {
AbstractOperationResult result = operation.execute();
} finally {
finalizer.run();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.flamingock.internal.core.runner;
package io.flamingock.internal.core.builder.runner;

public interface Runner extends Runnable {
void run();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.flamingock.internal.core.runner;
package io.flamingock.internal.core.builder.runner;

public interface RunnerBuilder {
Runner build();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Copyright 2023 Flamingock (https://www.flamingock.io)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.flamingock.internal.core.builder.runner;

import io.flamingock.internal.common.core.context.ContextResolver;
import io.flamingock.internal.core.configuration.core.CoreConfigurable;
import io.flamingock.internal.core.external.store.audit.AuditPersistence;
import io.flamingock.internal.core.operation.OperationType;
import io.flamingock.internal.core.plan.ExecutionPlanner;
import io.flamingock.internal.core.event.EventPublisher;
import io.flamingock.internal.core.pipeline.execution.OrphanExecutionContext;
import io.flamingock.internal.core.pipeline.execution.StageExecutor;
import io.flamingock.internal.core.pipeline.loaded.LoadedPipeline;
import io.flamingock.internal.core.external.targets.TargetSystemManager;
import io.flamingock.internal.core.operation.apply.ExecuteOperation;
import io.flamingock.internal.util.StringUtil;
import io.flamingock.internal.util.id.RunnerId;
import org.jetbrains.annotations.NotNull;

import java.util.Set;

public final class RunnerFactory {

private RunnerFactory() {
}

public static Runner getRunner(RunnerId runnerId,
OperationType operationType,
LoadedPipeline pipeline,
AuditPersistence persistence,
ExecutionPlanner executionPlanner,
TargetSystemManager targetSystemManager,
CoreConfigurable coreConfiguration,
EventPublisher eventPublisher,
ContextResolver dependencyContext,
Set<Class<?>> nonGuardedTypes,
boolean isThrowExceptionIfCannotObtainLock,
Runnable finalizer) {
switch (operationType) {
case EXECUTE:
return getExecuteRunner(runnerId, pipeline, persistence, executionPlanner, targetSystemManager, coreConfiguration, eventPublisher, dependencyContext, nonGuardedTypes, isThrowExceptionIfCannotObtainLock, finalizer);
default:
throw new UnsupportedOperationException(String.format("Operation %s not supported", operationType));
}


}

@NotNull
private static DefaultRunner getExecuteRunner(RunnerId runnerId, LoadedPipeline pipeline, AuditPersistence persistence, ExecutionPlanner executionPlanner, TargetSystemManager targetSystemManager, CoreConfigurable coreConfiguration, EventPublisher eventPublisher, ContextResolver dependencyContext, Set<Class<?>> nonGuardedTypes, boolean isThrowExceptionIfCannotObtainLock, Runnable finalizer) {
final StageExecutor stageExecutor = new StageExecutor(dependencyContext, nonGuardedTypes, persistence, targetSystemManager, null);
ExecuteOperation applyOperation = new ExecuteOperation(
runnerId,
pipeline,
executionPlanner,
stageExecutor,
buildExecutionContext(coreConfiguration),
eventPublisher,
isThrowExceptionIfCannotObtainLock,
finalizer);
return new DefaultRunner(runnerId, finalizer, applyOperation);
}


private static OrphanExecutionContext buildExecutionContext(CoreConfigurable configuration) {
return new OrphanExecutionContext(StringUtil.hostname(), configuration.getMetadata());
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Copyright 2026 Flamingock (https://www.flamingock.io)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.flamingock.internal.core.operation;

public abstract class AbstractOperationResult {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright 2026 Flamingock (https://www.flamingock.io)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.flamingock.internal.core.operation;

public interface Operation<R extends AbstractOperationResult> {

R execute();
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,28 +13,28 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.flamingock.internal.core.runner;
package io.flamingock.internal.core.operation;

import io.flamingock.internal.common.core.error.FlamingockException;

public class PipelineExecutionException extends FlamingockException {
public class OperationException extends FlamingockException {

public static PipelineExecutionException fromExisting(Throwable exception, PipelineSummary summary) {
public static OperationException fromExisting(Throwable exception, OperationSummary summary) {
Throwable cause = exception.getCause();
return (exception instanceof FlamingockException) && cause != null
? new PipelineExecutionException(cause, summary)
: new PipelineExecutionException(exception, summary);
? new OperationException(cause, summary)
: new OperationException(exception, summary);
}

private final PipelineSummary summary;
private final OperationSummary summary;


private PipelineExecutionException(Throwable throwable, PipelineSummary summary) {
private OperationException(Throwable throwable, OperationSummary summary) {
super(throwable);
this.summary = summary;
}

public PipelineSummary getSummary() {
public OperationSummary getSummary() {
return summary;
}

Expand Down
Loading
Loading