forked from gjedeer/celery-php
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathAsyncResult.php
268 lines (238 loc) · 7.66 KB
/
AsyncResult.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
<?php
namespace Celery;
/**
 * Asynchronous result of a Celery task.
 *
 * @package celery-php
 */
class AsyncResult
{
    /** @var string Task ID; handed to the connector to look up this task's result */
    private $task_id;

    /** @var mixed Connection returned by Celery::InitializeAMQPStreamConnection() */
    private $connection;

    /** @var array Connection settings; this class reads the 'connector' and 'result_expire' keys */
    private $connection_details;

    /** @var mixed Backend-dependent message instance (AMQPEnvelope or PhpAmqpLib\Message\AMQPMessage) */
    private $complete_result;

    /** @var mixed JSON-decoded message body (object exposing status / result / traceback) */
    private $body;

    /** @var mixed AbstractAMQPConnector implementation used to fetch the result message */
    private $amqp = null;

    /**
     * Task name, used only to build the timeout error message.
     * Declared explicitly (it used to be created dynamically in the constructor,
     * which is deprecated since PHP 8.2) and kept public so existing external
     * reads/writes keep working.
     * @var string|null
     */
    public $task_name;

    /**
     * Task arguments, used only to build the timeout error message.
     * Public for the same backward-compatibility reason as $task_name.
     * @var array|null
     */
    public $task_args;

    /**
     * Don't instantiate AsyncResult yourself, used internally only
     * @param string $id Task ID in Celery
     * @param array $connection_details used to initialize AMQPStreamConnection, keys are the same as args to Celery::__construct
     * @param string|null $task_name Name of the task (only used in error messages)
     * @param array|null $task_args Arguments of the task (only used in error messages)
     */
    public function __construct($id, $connection_details, $task_name=null, $task_args=null)
    {
        $this->task_id = $id;
        $this->connection = Celery::InitializeAMQPStreamConnection($connection_details);
        $this->connection_details = $connection_details;
        $this->task_name = $task_name;
        $this->task_args = $task_args;
        $this->amqp = AbstractAMQPConnector::GetConcrete($connection_details['connector']);
    }

    /**
     * Re-create the connection after unserialize(), provided the connection
     * details survived serialization.
     */
    public function __wakeup()
    {
        if ($this->connection_details) {
            $this->connection = Celery::InitializeAMQPStreamConnection($this->connection_details);
        }
    }

    /**
     * Connect to queue, see if there's a result waiting for us
     * Private - to be used internally
     *
     * The fetched message is cached, so the backend is queried at most once
     * after a result has been received.
     *
     * @return mixed The backend message instance, or false when no result is available yet
     */
    private function getCompleteResult()
    {
        if ($this->complete_result) {
            return $this->complete_result;
        }

        // NOTE(review): the trailing `true` presumably asks the connector to remove
        // the message once read - confirm against AbstractAMQPConnector::GetMessageBody.
        $message = $this->amqp->GetMessageBody(
            $this->connection,
            $this->task_id,
            $this->connection_details['result_expire'],
            true
        );

        if ($message === false) {
            return false;
        }

        $this->complete_result = $message['complete_result'];
        $this->body = json_decode($message['body']);

        // Report the freshly fetched result right away. Previously this fell
        // through to `return false`, so the first isReady() call on a finished
        // task wrongly answered "not ready".
        return $this->complete_result;
    }

    /**
     * Helper function to return current microseconds time as float
     * @return float Seconds since the Unix epoch, with microsecond precision
     */
    private static function getmicrotime()
    {
        return microtime(true);
    }

    /**
     * Get the Task Id
     * @return string
     */
    public function getId()
    {
        return $this->task_id;
    }

    /**
     * Check if a task result is ready
     * @return bool
     */
    public function isReady()
    {
        return ($this->getCompleteResult() !== false);
    }

    /**
     * Return task status (needs to be called after isReady() returned true)
     * @throws CeleryException when no result body has been received yet
     * @return string 'SUCCESS', 'FAILURE' etc - see Celery source
     */
    public function getStatus()
    {
        if (!$this->body) {
            throw new CeleryException('Called getStatus before task was ready');
        }
        return $this->body->status;
    }

    /**
     * Check if task execution has been successful or resulted in an error
     * @throws CeleryException when no result body has been received yet
     * @return bool
     */
    public function isSuccess()
    {
        return ($this->getStatus() === 'SUCCESS');
    }

    /**
     * If task execution wasn't successful, return a Python traceback
     * @throws CeleryException when no result body has been received yet
     * @return string
     */
    public function getTraceback()
    {
        if (!$this->body) {
            throw new CeleryException('Called getTraceback before task was ready');
        }
        return $this->body->traceback;
    }

    /**
     * Return a result of successful execution.
     * In case of failure, this returns an exception object
     * @throws CeleryException when no result body has been received yet
     * @return mixed Whatever the task returned
     */
    public function getResult()
    {
        if (!$this->body) {
            throw new CeleryException('Called getResult before task was ready');
        }
        return $this->body->result;
    }

    /****************************************************************************
     * Python API emulation                                                     *
     * http://ask.github.com/celery/reference/celery.result.html                *
     ****************************************************************************/

    /**
     * Returns TRUE if the task failed
     * @return bool
     */
    public function failed()
    {
        return $this->isReady() && !$this->isSuccess();
    }

    /**
     * Forget about (and possibly remove the result of) this task
     * Currently does nothing in PHP client
     */
    public function forget()
    {
    }

    /**
     * Wait until task is ready, and return its result.
     * @param float $timeout How long to wait, in seconds, before the operation times out
     * @param bool $propagate (TODO - not working) Re-raise exception if the task failed.
     * @param float $interval Time to wait (in seconds) before retrying to retrieve the result
     * @throws CeleryTimeoutException on timeout
     * @return mixed result on both success and failure
     */
    public function get($timeout=10, $propagate=true, $interval=0.5)
    {
        // usleep() rejects negative values on PHP 8, so clamp at zero.
        $interval_us = max(0, (int)($interval * 1000000));
        $start_time = self::getmicrotime();

        // Always poll at least once, even with a zero timeout.
        while (!$this->isReady()) {
            if (self::getmicrotime() - $start_time >= $timeout) {
                // %s (not %d) so fractional timeouts are reported as given.
                throw new CeleryTimeoutException(sprintf('AMQP task %s(%s) did not return after %s seconds', $this->task_name, json_encode($this->task_args), $timeout), 4);
            }
            usleep($interval_us);
        }

        return $this->getResult();
    }

    /**
     * Implementation of Python's properties: result, state/status
     *
     * result: When the task has been executed, this contains the return value.
     *         If the task raised an exception, this will be the exception instance.
     *         null while the task is not ready.
     *
     * state: The task's current state. 'PENDING' is returned while no result
     *        has been received; otherwise the status reported in the result
     *        message (e.g. STARTED, RETRY, FAILURE, SUCCESS).
     *
     * status: Deprecated alias of state.
     *
     * Any other name falls through to a plain property read on this object.
     *
     * @param string $property
     * @return mixed
     */
    public function __get($property)
    {
        if ($property === 'result') {
            return $this->isReady() ? $this->getResult() : null;
        }

        if ($property === 'state' || $property === 'status') {
            return $this->isReady() ? $this->getStatus() : 'PENDING';
        }

        return $this->$property;
    }

    /**
     * Returns True if the task has been executed.
     * If the task is still running, pending, or is waiting for retry then False is returned.
     * @return bool
     */
    public function ready()
    {
        return $this->isReady();
    }

    /**
     * Send revoke signal to all workers
     * Does nothing in PHP client
     */
    public function revoke()
    {
    }

    /**
     * Returns True if the task executed successfully.
     * @return bool
     */
    public function successful()
    {
        return $this->isSuccess();
    }

    /**
     * Deprecated alias to get()
     * @param float $timeout
     * @param bool $propagate
     * @param float $interval
     * @throws CeleryTimeoutException on timeout
     * @return mixed
     */
    public function wait($timeout=10, $propagate=true, $interval=0.5)
    {
        return $this->get($timeout, $propagate, $interval);
    }
}