Skip to content

Commit f2946cd

Browse files
authored
Merge pull request #6 from level23/feature-add-checks
Add checks for deleting and releasing jobs
2 parents af34b64 + 5a5356d commit f2946cd

File tree

2 files changed

+70
-22
lines changed

2 files changed

+70
-22
lines changed

src/Queue/Jobs/BatchJob.php

Lines changed: 49 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
namespace Level23\AwsQueue\Queue\Jobs;
44

5+
use Illuminate\Support\Collection;
6+
57
class BatchJob extends SqsJob
68
{
79
/**
@@ -38,58 +40,83 @@ public function attempts()
3840
/**
3941
* Delete the job from the queue.
4042
*
41-
* @return void
43+
* @return Collection
4244
*/
4345
public function delete()
4446
{
45-
$entries = collect($this->jobs)
46-
->filter(function(SqsJob $job) {
47-
return !$job->isDeletedOrReleased();
48-
})
49-
->transform(function(SqsJob $job) {
50-
return [
51-
'Id' => $job->getJobId(),
52-
'ReceiptHandle' => $job->getReceiptHandle()
53-
];
54-
});
55-
56-
if($entries->isNotEmpty()) {
47+
$jobs = collect($this->jobs)->filter(function (SqsJob $job) {
48+
return !$job->isDeletedOrReleased();
49+
});
50+
51+
$response = $this->deleteJobs($jobs);
52+
53+
$this->deleted = true;
54+
55+
return $response;
56+
}
57+
58+
/**
59+
* @param Collection $jobs
60+
* @return Collection
61+
*/
62+
public function deleteJobs(Collection $jobs)
63+
{
64+
$jobs = $jobs->keyBy(function(SqsJob $job) {
65+
return $job->getJobId();
66+
});
67+
68+
$entries = $jobs->map(function (SqsJob $job) {
69+
return [
70+
'Id' => $job->getJobId(),
71+
'ReceiptHandle' => $job->getReceiptHandle(),
72+
];
73+
});
74+
75+
$failedJobs = collect();
76+
77+
if ($entries->isNotEmpty()) {
5778

5879
$response = $this->sqs->deleteMessageBatch([
5980
'QueueUrl' => $this->queue,
60-
'Entries' => $entries->values()->toArray(),
81+
'Entries' => $entries->values()->toArray(),
6182
]);
6283

63-
foreach ($response['Successful'] as $message) {
84+
foreach (collect($response->get('Successful')) as $message) {
6485
$this->jobs[$message['Id']]->setDeleted();
6586
}
87+
88+
foreach (collect($response->get('Failed')) as $message) {
89+
$job = $jobs->get($message['Id']);
90+
$job->error(array_except($message,'Id'));
91+
$failedJobs->put($message['Id'],$job);
92+
}
6693
}
6794

68-
$this->deleted = true;
95+
return $failedJobs;
6996
}
7097

7198
/**
7299
* Release the job back into the queue.
73100
*
74-
* @param int $delay
101+
* @param int $delay
75102
* @return void
76103
*/
77104
public function release($delay = 0)
78105
{
79-
$entries = collect($this->jobs)->transform(function(SqsJob $job) use ($delay) {
106+
$entries = collect($this->jobs)->transform(function (SqsJob $job) use ($delay) {
80107
return [
81-
'Id' => $job->getJobId(),
82-
'ReceiptHandle' => $job->getReceiptHandle(),
108+
'Id' => $job->getJobId(),
109+
'ReceiptHandle' => $job->getReceiptHandle(),
83110
'VisibilityTimeout' => $delay,
84111
];
85112
});
86113

87114
$response = $this->sqs->changeMessageVisibilityBatch([
88115
'QueueUrl' => $this->queue,
89-
'Entries' => $entries->values()->toArray(),
116+
'Entries' => $entries->values()->toArray(),
90117
]);
91118

92-
foreach ($response['Successful'] as $message) {
119+
foreach (collect($response->get('Successful')) as $message) {
93120
$this->jobs[$message['Id']]->setDeleted();
94121
}
95122
}

src/Queue/Jobs/SqsJob.php

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,11 @@
66

77
class SqsJob extends LaravelSqsJob
88
{
9+
/**
10+
* @var mixed
11+
*/
12+
protected $error;
13+
914
/**
1015
* Get the decoded body of the job.
1116
*
@@ -51,4 +56,20 @@ public function setReleased()
5156
{
5257
$this->released = true;
5358
}
59+
60+
/**
61+
* @param mixed $error
62+
*/
63+
public function setError($error)
64+
{
65+
$this->error = $error;
66+
}
67+
68+
/**
69+
* @return mixed
70+
*/
71+
public function getError()
72+
{
73+
return $this->error;
74+
}
5475
}

0 commit comments

Comments
 (0)