Skip to content
16 changes: 13 additions & 3 deletions interceptors/LogFailedJobsInterceptor.cfc
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,12 @@ component {
"null" : ( arguments.data.exception.type ?: "" ) == "",
"nulls" : ( arguments.data.exception.type ?: "" ) == ""
},
"exceptionMessage" : arguments.data.exception.message,
"exceptionMessage" : {
"value": arguments.data.exception.message ?: "",
"cfsqltype" : "CF_SQL_VARCHAR",
"null" : ( arguments.data.exception.message ?: "" ) == "",
"nulls" : ( arguments.data.exception.message ?: "" ) == ""
},
"exceptionDetail" : {
"value" : arguments.data.exception.detail ?: "",
"cfsqltype" : "CF_SQL_VARCHAR",
Expand All @@ -57,8 +62,13 @@ component {
"null" : ( arguments.data.exception.extendedInfo ?: "" ) == "",
"nulls" : ( arguments.data.exception.extendedInfo ?: "" ) == ""
},
"exceptionStackTrace" : arguments.data.exception.stackTrace,
"exception" : serializeJSON( arguments.data.exception ),
"exceptionStackTrace" : {
"value": arguments.data.exception.stackTrace ?: "",
"cfsqltype" : "CF_SQL_VARCHAR",
"null" : ( arguments.data.exception.stackTrace ?: "" ) == "",
"nulls" : ( arguments.data.exception.stackTrace ?: "" ) == ""
},
"exception" : isNull( arguments.data.exception ) ? javacast( "null", "" ) : serializeJSON( arguments.data.exception ),
"failedDate" : { "value": getCurrentUnixTimestamp(), "cfsqltype": "CF_SQL_BIGINT" },
"originalId" : { "value": arguments.data.job.getId(), "cfsqltype": "CF_SQL_VARCHAR" }
};
Expand Down
1 change: 1 addition & 0 deletions models/Jobs/Batch.cfc
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ component accessors="true" {
property name="totalJobs" type="numeric";
property name="pendingJobs" type="numeric";
property name="failedJobs" type="numeric";
property name="successfulJobs" type="numeric";
property name="failedJobIds" type="array";
property name="options" type="struct";
property name="createdDate" type="numeric";
Expand Down
46 changes: 24 additions & 22 deletions models/Jobs/DBBatchRepository.cfc
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,19 @@ component singleton accessors="true" {

public Batch function store( required PendingBatch batch ) {
var id = variables.timeBasedUUIDGenerator.generate().toString();
var batchName = arguments.batch.getName();
if ( isNull( batchName ) || !isSimpleValue( batchName ) ) {
batchName = "";
}

qb.table( variables.batchTableName )
.insert(
values = {
"id" : id,
"name" : arguments.batch.getName(),
"name" : batchName,
"totalJobs" : 0,
"pendingJobs" : 0,
"successfulJobs" : 0,
"failedJobs" : 0,
"failedJobIds" : "[]",
"options" : serializeJSON( arguments.batch.getOptions() ),
Expand Down Expand Up @@ -102,23 +107,20 @@ component singleton accessors="true" {
throw( type = "cbq.BatchNotFound", message = "No batch found for id [#arguments.batchId#]" );
}

var updatedValues = {
"pendingJobs" : data.pendingJobs - 1,
"successfulJobs" : data.successfulJobs + 1,
"failedJobs" : data.failedJobs
};

qb.table( variables.batchTableName )
.where( "id", arguments.batchId )
.update(
values = {
"pendingJobs" : data.pendingJobs - 1,
"failedJobs" : data.failedJobs,
"failedJobIds" : serializeJSON(
deserializeJSON( data.failedJobIds ).filter( ( failedJobId ) => failedJobId != jobId )
)
},
options = variables.defaultQueryOptions
);
.update( values = updatedValues, options = variables.defaultQueryOptions );

return {
"pendingJobs" : data.pendingJobs - 1,
"failedJobs" : data.failedJobs,
"allJobsHaveRanExactlyOnce" : ( data.pendingJobs - 1 ) - data.failedJobs == 0
"allJobsHaveRanExactlyOnce" : ( data.pendingJobs - 1 ) == 0
};
}
}
Expand All @@ -135,21 +137,20 @@ component singleton accessors="true" {
throw( type = "cbq.BatchNotFound", message = "No batch found for id [#arguments.batchId#]" );
}

var updatedValues = {
"pendingJobs" : data.pendingJobs - 1,
"failedJobs" : data.failedJobs + 1,
"failedJobIds" : serializeJSON( deserializeJSON( data.failedJobIds ).append( arguments.jobId ) )
};

qb.table( variables.batchTableName )
.where( "id", arguments.batchId )
.update(
values = {
"pendingJobs" : data.pendingJobs,
"failedJobs" : data.failedJobs + 1,
"failedJobIds" : serializeJSON( deserializeJSON( data.failedJobIds ).append( arguments.jobId ) )
},
options = variables.defaultQueryOptions
);
.update( values = updatedValues, options = variables.defaultQueryOptions );

return {
"pendingJobs" : data.pendingJobs,
"pendingJobs" : data.pendingJobs - 1,
"failedJobs" : data.failedJobs + 1,
"allJobsHaveRanExactlyOnce" : data.pendingJobs - ( data.failedJobs + 1 ) == 0
"allJobsHaveRanExactlyOnce" : ( data.pendingJobs - 1 ) == 0
};
}
}
Expand Down Expand Up @@ -191,6 +192,7 @@ component singleton accessors="true" {
batch.setTotalJobs( data.totalJobs );
batch.setPendingJobs( data.pendingJobs );
batch.setFailedJobs( data.failedJobs );
batch.setSuccessfulJobs( data.successfulJobs );
batch.setFailedJobIds( deserializeJSON( data.failedJobIds ) );
batch.setOptions( deserializeJSON( data.options ) );
batch.setCreatedDate( data.createdDate );
Expand Down
2 changes: 1 addition & 1 deletion models/Providers/ColdBoxAsyncProvider.cfc
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ component accessors="true" extends="AbstractQueueProvider" {
sleep( delay * 1000 );
return true;
}, workerPool.getExecutor() )
.then( function() {
.thenCompose( function() {
job.setId( createUUID() );
if ( !isNull( arguments.currentAttempt ) ) {
job.setCurrentAttempt( attempts );
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
component {

function up( schema ) {
schema.alter( "cbq_batches", ( t ) => {
t.addColumn( t.unsignedInteger( "successfulJobs" ).default( 0 ) );
} );
}

function down( schema ) {
schema.alter( "cbq_batches", ( t ) => {
t.dropColumn( "successfulJobs" );
} );
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
component {

function up( schema, qb ) {
schema.alter( "cbq_batches", ( t ) => {
t.modifyColumn( "name", t.string( "name" ).nullable() );
} );
}

function down( schema, qb ) {
schema.alter( "cbq_batches", ( t ) => {
t.modifyColumn( "name", t.string( "name" ) );
} );
}

}
4 changes: 4 additions & 0 deletions tests/Application.cfc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ component {
this.mappings[ "/testbox" ] = rootPath & "/testbox";

this.datasource = "cbq";
this.javaSettings = {
"loadPaths" : [ rootPath & "/lib" ],
"reloadOnChange" : false
};

function onRequestStart() {
createObject( "java", "java.lang.System" ).setProperty( "ENVIRONMENT", "testing" );
Expand Down
10 changes: 10 additions & 0 deletions tests/resources/app/models/Jobs/AlwaysErrorJob.cfc
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
component extends="cbq.models.Jobs.AbstractJob" {

function handle() {
throw(
type = "cbq.tests.AlwaysErrorJob",
message = "This job always errors for testing."
);
}

}
15 changes: 15 additions & 0 deletions tests/resources/app/models/Jobs/RequestScopeBeforeAndAfterJob.cfc
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
component extends="cbq.models.Jobs.AbstractJob" {

function handle() {
// do nothing
}

function before() {
request.jobBeforeCalled = true;
}

function after() {
request.jobAfterCalled = true;
}

}
72 changes: 72 additions & 0 deletions tests/specs/integration/BatchFinallySpec.cfc
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
component extends="tests.resources.ModuleIntegrationSpec" appMapping="/app" {

function run() {
describe( "batch finally dispatching", function() {
beforeEach( function() {
structDelete( request, "jobBeforeCalled" );
structDelete( request, "jobAfterCalled" );

param request.jobBeforeCalled = false;
param request.jobAfterCalled = false;
} );

it( "dispatches the finally job when the last job fails", function() {
var cbq = getWireBox().getInstance( "@cbq" );
registerSyncConnectionAndWorkerPool();

var successJob = cbq.job( "SendWelcomeEmailJob" );
var failingJob = cbq.job( job = "ReleaseTestJob", maxAttempts = 1 );

var pendingBatch = cbq
.batch( [ successJob, failingJob ] )
.onConnection( "syncBatch" )
.onComplete( job = "RequestScopeBeforeAndAfterJob", connection = "syncBatch" );

try {
pendingBatch.dispatch();
} catch ( cbq.MaxAttemptsReached e ) {
// The sync provider rethrows the terminal failure.
}

expect( request.jobAfterCalled ).toBeTrue( "The `finally` job should dispatch even when the last job fails." );
} );

it( "dispatches the finally job when all jobs succeed", function() {
var cbq = getWireBox().getInstance( "@cbq" );
registerSyncConnectionAndWorkerPool();

var pendingBatch = cbq
.batch( [
cbq.job( "SendWelcomeEmailJob" ),
cbq.job( "SendWelcomeEmailJob" )
] )
.onConnection( "syncBatch" )
.onComplete( job = "RequestScopeBeforeAndAfterJob", connection = "syncBatch" );

pendingBatch.dispatch();

expect( request.jobAfterCalled ).toBeTrue( "The `finally` job should dispatch when all batch jobs succeed." );
} );
} );
}

private void function registerSyncConnectionAndWorkerPool() {
var config = getWireBox().getInstance( "Config@cbq" );

if ( !config.getConnections().keyExists( "syncBatch" ) ) {
config.registerConnection(
name = "syncBatch",
provider = getWireBox().getInstance( "SyncProvider@cbq" ).setProperties( {} )
);
}

if ( !config.getWorkerPools().keyExists( "syncBatch" ) ) {
config.registerWorkerPool(
name = "syncBatch",
connectionName = "syncBatch",
maxAttempts = 1
);
}
}

}
Loading