Skip to content

Commit c0f88b7

Browse files
committed
Attempt at rpc fix
1 parent b351c85 commit c0f88b7

File tree

4 files changed

+27
-17
lines changed

4 files changed

+27
-17
lines changed

celery/app/base.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
from billiard.util import register_after_fork
2222
from kombu.clocks import LamportClock
23+
from kombu.common import oid_from
2324
from kombu.serialization import enable_insecure_serializers
2425
from kombu.utils import cached_property
2526

@@ -545,6 +546,10 @@ def pool(self):
545546
def current_task(self):
546547
return _task_stack.top
547548

549+
@cached_property
550+
def oid(self):
551+
return oid_from(self)
552+
548553
@cached_property
549554
def amqp(self):
550555
return instantiate(self.amqp_cls, app=self)

celery/backends/rpc.py

Lines changed: 9 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -10,16 +10,14 @@
1010

1111
import kombu
1212

13-
from threading import local
14-
15-
from kombu.common import maybe_declare, oid_from
13+
from kombu.common import maybe_declare
14+
from kombu.utils import cached_property
1615

1716
from celery import current_task
1817
from celery.backends import amqp
1918

2019

2120
class RPCBackend(amqp.AMQPBackend):
22-
_tls = local()
2321

2422
class Consumer(kombu.Consumer):
2523
auto_declare = False
@@ -32,10 +30,6 @@ def on_task_call(self, producer, task_id):
3230
maybe_declare(self.binding(producer.channel), retry=True)
3331
return self.extra_properties
3432

35-
@property
36-
def extra_properties(self):
37-
return {'reply_to': self.oid}
38-
3933
def _create_binding(self, task_id):
4034
return self.binding
4135

@@ -53,10 +47,11 @@ def binding(self):
5347
return self.Queue(self.oid, self.exchange, self.oid,
5448
durable=False, auto_delete=False)
5549

56-
@property
50+
@cached_property
5751
def oid(self):
58-
try:
59-
return self._tls.OID
60-
except AttributeError:
61-
oid = self._tls.OID = oid_from(self)
62-
return oid
52+
return self.app.oid
53+
54+
@cached_property
55+
def extra_properties(self):
56+
return {'reply_to': self.oid}
57+

celery/canvas.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818

1919
from kombu.utils import cached_property, fxrange, kwdict, reprcall, uuid
2020

21-
from celery._state import current_app
21+
from celery._state import current_app, get_current_worker_task
2222
from celery.result import AsyncResult, GroupResult
2323
from celery.utils.functional import (
2424
maybe_list, is_list, regen,
@@ -163,13 +163,20 @@ def clone(self, args=(), kwargs={}, **opts):
163163
return s
164164
partial = clone
165165

166-
def _freeze(self, _id=None):
166+
def freeze(self, _id=None):
167167
opts = self.options
168168
try:
169169
tid = opts['task_id']
170170
except KeyError:
171171
tid = opts['task_id'] = _id or uuid()
172+
if 'reply_to' not in opts:
173+
curtask = get_current_worker_task()
174+
if curtask:
175+
opts['repy_to'] = curtask.request.reply_to
176+
else:
177+
opts['reply_to'] = self.type.app.oid
172178
return self.AsyncResult(tid)
179+
_freeze = freeze
173180

174181
def replace(self, args=None, kwargs=None, options=None):
175182
s = self.clone()
@@ -423,7 +430,7 @@ def __call__(self, *partial_args, **options):
423430
type = tasks[0].type.app.tasks[self['task']]
424431
return type(*type.prepare(options, tasks, partial_args))
425432

426-
def _freeze(self, _id=None):
433+
def freeze(self, _id=None):
427434
opts = self.options
428435
try:
429436
gid = opts['group']
@@ -436,6 +443,7 @@ def _freeze(self, _id=None):
436443
new_tasks.append(task)
437444
self.tasks = self.kwargs['tasks'] = new_tasks
438445
return GroupResult(gid, results)
446+
_freeze = freeze
439447

440448
def skew(self, start=1.0, stop=None, step=1.0):
441449
it = fxrange(start, stop, step, repeatlast=True)

celery/task/trace.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,8 @@ def trace_task(uuid, args, kwargs, request=None):
227227
else:
228228
# callback tasks must be applied before the result is
229229
# stored, so that result.children is populated.
230+
print('CALLBACK OPTS: %r' % ([callback.options for
231+
callback in task_request.callbacks or []], ))
230232
[subtask(callback).apply_async((retval, ))
231233
for callback in task_request.callbacks or []]
232234
if publish_result:

0 commit comments

Comments
 (0)