Skip to content

Commit 0ed6423

Browse files
committed
Add the ability to process each Bulk Batch with a processor class
1 parent c84e57c commit 0ed6423

File tree

6 files changed

+93
-2
lines changed

6 files changed

+93
-2
lines changed

composer.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,8 @@
3535
},
3636
"autoload-dev": {
3737
"files": [
38-
"tests/GuzzleServer.php"
38+
"tests/GuzzleServer.php",
39+
"tests/BulkBatchProcessor.php"
3940
]
4041
}
4142
}

readme.md

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -377,6 +377,22 @@ if ($result->id) {
377377
}
378378
```
379379

380+
### Query with Class to Process each Batch
381+
382+
```php
383+
$operationType = 'query';
384+
$objectType = 'Account';
385+
$objectData = 'SELECT Id, Name FROM Account LIMIT 10';
386+
387+
$result = Salesforce::bulk()->runBatch($operationType, $objectType, $objectData,[
388+
'batchProcessor' => CustomBulkBatchProcessor::class, //Class must implement Frankkessler\Salesforce\Interfaces\BulkBatchProcessorInterface
389+
]);
390+
391+
if ($result->id) {
392+
$id = $result->id;
393+
}
394+
```
395+
380396
### Custom REST Endpoint (GET)
381397

382398
```php

src/Bulk.php

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
use Frankkessler\Salesforce\Client\BulkClient;
66
use Frankkessler\Salesforce\DataObjects\BinaryBatch;
7+
use Frankkessler\Salesforce\Interfaces\BulkBatchProcessorInterface;
78
use Frankkessler\Salesforce\Responses\Bulk\BulkBatchResponse;
89
use Frankkessler\Salesforce\Responses\Bulk\BulkBatchResultResponse;
910
use Frankkessler\Salesforce\Responses\Bulk\BulkJobResponse;
@@ -45,6 +46,7 @@ public function runBatch($operation, $objectType, $data, $options = [])
4546
'isBatchedResult' => false,
4647
'concurrencyMode' => 'Parallel',
4748
'Sforce-Enable-PKChunking' => false,
49+
'batchProcessor' => null,
4850
];
4951

5052
$options = array_replace($defaults, $options);
@@ -91,7 +93,11 @@ public function runBatch($operation, $objectType, $data, $options = [])
9193
if (in_array($batch->state, ['Completed', 'Failed', 'Not Processed', 'NotProcessed'])) {
9294
if (in_array($batch->state, ['Completed'])) {
9395
$batchResult = $this->batchResult($job->id, $batch->id, $options['isBatchedResult'], null, $options['contentType']);
94-
$batch->records = $batchResult->records;
96+
if($options['batchProcessor'] instanceof BulkBatchProcessorInterface){
97+
call_user_func([$options['batchProcessor'], 'process'], $batchResult);
98+
}else {
99+
$batch->records = $batchResult->records;
100+
}
95101
}
96102
$batches_finished[] = $batch->id;
97103
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
<?php
2+
3+
namespace Frankkessler\Salesforce\Interfaces;
4+
5+
use Frankkessler\Salesforce\Responses\Bulk\BulkBatchResultResponse;
6+
7+
interface BulkBatchProcessorInterface
8+
{
9+
public static function process(BulkBatchResultResponse $batchResult);
10+
}

tests/BulkBatchProcessor.php

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
<?php
2+
3+
use Frankkessler\Salesforce\Interfaces\BulkBatchProcessorInterface;
4+
use Frankkessler\Salesforce\Responses\Bulk\BulkBatchResultResponse;
5+
6+
class BulkBatchProcessor implements BulkBatchProcessorInterface
7+
{
8+
public static $records = [];
9+
10+
public static function process(BulkBatchResultResponse $batchResult)
11+
{
12+
array_merge(static::$records, $batchResult->records);
13+
}
14+
}

tests/BulkTest.php

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -255,6 +255,50 @@ public function testRunPkChunkQueryBatch()
255255
}
256256
}
257257

258+
public function testRunPkChunkQueryBatchWithBatchProcessor()
259+
{
260+
// Create a mock and queue two responses.
261+
$mock = new MockHandler([
262+
new Response(200, [], json_encode($this->jobArray())),
263+
new Response(200, [], $this->batchXml()),
264+
new Response(200, [], $this->batchListXml()),
265+
new Response(200, [], $this->batchXml('NotProcessed')),
266+
new Response(200, [], $this->batchXml('Completed', '8914000000B5YcIAAV', '89040000006iLsVAAU')),
267+
new Response(200, [], $this->dataQueryResultXml()),
268+
new Response(200, [], $this->dataQueryDataResultCsv()),
269+
new Response(200, [], json_encode($this->jobArray())),
270+
]);
271+
272+
$handler = HandlerStack::create($mock);
273+
274+
$salesforce = new \Frankkessler\Salesforce\Salesforce([
275+
'handler' => $handler,
276+
'salesforce.oauth.access_token' => 'TEST',
277+
'salesforce.oauth.refresh_token' => 'TEST',
278+
]);
279+
280+
$jobId = '750D00000004SkVIAU';
281+
$batchId = '8914000000B5YcIAAV';
282+
$firstAccountId = '0014000001iM8r3AAC';
283+
284+
$operation = 'query';
285+
$objectType = 'Account';
286+
$query = "SELECT Id, Name FROM Account WHERE RecordTypeId='xxxxxxxxxxxxxx'";
287+
288+
$job = $salesforce->bulk()->runBatch($operation, $objectType, $query, [
289+
'contentType' => 'CSV',
290+
'Sforce-Enable-PKChunking' => [
291+
'chunkSize' => 2500,
292+
],
293+
'batchProcessor' => BulkBatchProcessor::class,
294+
]);
295+
296+
foreach(BulkBatchProcessor::$records as $record){
297+
$this->assertEquals($firstAccountId, $record['Id']);
298+
break;
299+
}
300+
}
301+
258302
public function testBulkBinaryJobCreateWithMiddleware()
259303
{
260304
copy(realpath('./tests/bulk_zip_files').'/zipped_dir.zip', realpath('./build').'/test.zip');

0 commit comments

Comments
 (0)