Skip to content

Commit 732b236

Browse files
committed
chunks/map/starmap tasks now routes based on the target task (introducing Signature.route_name_for)
1 parent 3a6780b commit 732b236

File tree

2 files changed

+36
-7
lines changed

2 files changed

+36
-7
lines changed

celery/app/base.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -355,7 +355,7 @@ def send_task(self, name, args=None, kwargs=None, countdown=None,
355355
publisher=None, link=None, link_error=None,
356356
add_to_parent=True, group_id=None, retries=0, chord=None,
357357
reply_to=None, time_limit=None, soft_time_limit=None,
358-
root_id=None, parent_id=None, **options):
358+
root_id=None, parent_id=None, route_name=None, **options):
359359
amqp = self.amqp
360360
task_id = task_id or uuid()
361361
producer = producer or publisher # XXX compat
@@ -365,7 +365,7 @@ def send_task(self, name, args=None, kwargs=None, countdown=None,
365365
warnings.warn(AlwaysEagerIgnored(
366366
'CELERY_ALWAYS_EAGER has no effect on send_task',
367367
), stacklevel=2)
368-
options = router.route(options, name, args, kwargs)
368+
options = router.route(options, route_name or name, args, kwargs)
369369

370370
message = amqp.create_task_message(
371371
task_id, name, args, kwargs, countdown, eta, group_id,

celery/canvas.py

Lines changed: 34 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,10 @@ def maybe_unroll_group(g):
9999
return g.tasks[0] if size == 1 else g
100100

101101

102+
def task_name_from(task):
103+
return getattr(task, 'name', task)
104+
105+
102106
class Signature(dict):
103107
"""Class that wraps the arguments and execution options
104108
for a single task invocation.
@@ -230,7 +234,7 @@ def set(self, immutable=None, **options):
230234
def set_immutable(self, immutable):
231235
self.immutable = immutable
232236

233-
def apply_async(self, args=(), kwargs={}, **options):
237+
def apply_async(self, args=(), kwargs={}, route_name=None, **options):
234238
try:
235239
_apply = self._apply_async
236240
except IndexError: # no tasks for chain, etc to find type
@@ -240,7 +244,17 @@ def apply_async(self, args=(), kwargs={}, **options):
240244
args, kwargs, options = self._merge(args, kwargs, options)
241245
else:
242246
args, kwargs, options = self.args, self.kwargs, self.options
243-
return _apply(args, kwargs, **options)
247+
route_name = route_name or self.route_name_for(args, kwargs, options)
248+
return _apply(args, kwargs, route_name=route_name, **options)
249+
250+
def route_name_for(self, args, kwargs, options):
251+
"""Can be used to override the name used for routing the task
252+
to a queue.
253+
254+
If this returns :const:`None` the name of the task will be used.
255+
256+
"""
257+
pass
244258

245259
def append_to_list_option(self, key, value):
246260
items = self.options.setdefault(key, [])
@@ -309,6 +323,11 @@ def election(self):
309323
def __repr__(self):
310324
return self.reprcall()
311325

326+
@property
327+
def name(self):
328+
# for duck typing compatibility with Task.name
329+
return self.task
330+
312331
@cached_property
313332
def type(self):
314333
return self._type or self.app.tasks[self['task']]
@@ -485,11 +504,15 @@ def __init__(self, task, it, **options):
485504
{'task': task, 'it': regen(it)}, immutable=True, **options
486505
)
487506

507+
def route_name_for(self, args, kwargs, options):
508+
return task_name_from(self.kwargs.get('task'))
509+
488510
def apply_async(self, args=(), kwargs={}, **opts):
489511
# need to evaluate generators
490512
task, it = self._unpack_args(self.kwargs)
491513
return self.type.apply_async(
492-
(), {'task': task, 'it': list(it)}, **opts
514+
(), {'task': task, 'it': list(it)},
515+
route_name=self.route_name_for(args, kwargs, opts), **opts
493516
)
494517

495518
@classmethod
@@ -532,11 +555,17 @@ def __init__(self, task, it, n, **options):
532555
def from_dict(self, d, app=None):
533556
return chunks(*self._unpack_args(d['kwargs']), app=app, **d['options'])
534557

558+
def route_name_for(self, args, kwargs, options):
559+
return task_name_from(self.kwargs.get('task'))
560+
535561
def apply_async(self, args=(), kwargs={}, **opts):
536-
return self.group().apply_async(args, kwargs, **opts)
562+
return self.group().apply_async(
563+
args, kwargs,
564+
route_name=self.route_name_for(args, kwargs, opts), **opts
565+
)
537566

538567
def __call__(self, **options):
539-
return self.group()(**options)
568+
return self.apply_async(**options)
540569

541570
def group(self):
542571
# need to evaluate generators

0 commit comments

Comments
 (0)