Skip to content

Commit 5d62e62

Browse files
committed
Allow returning multiple messages in a queue binding
Fixes: #192
1 parent eb48048 commit 5d62e62

File tree

8 files changed

+115
-1
lines changed

8 files changed

+115
-1
lines changed

azure/functions_worker/bindings/meta.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
import abc
2+
import collections.abc
23
import datetime
34
import enum
45
import json
56
import typing
67

78
from .. import protos
9+
from .. import typing_inspect
810

911

1012
class TypedDataKind(enum.Enum):
@@ -211,6 +213,28 @@ def to_proto(cls, obj: typing.Any, *,
211213
pass
212214

213215

216+
def is_iterable_type_annotation(annotation: type, pytype: type) -> bool:
217+
is_iterable_anno = (
218+
typing_inspect.is_generic_type(annotation) and
219+
issubclass(typing_inspect.get_origin(annotation),
220+
collections.abc.Iterable)
221+
)
222+
223+
if not is_iterable_anno:
224+
return False
225+
226+
args = typing_inspect.get_args(annotation)
227+
if not args:
228+
return False
229+
230+
if isinstance(pytype, tuple):
231+
return any(isinstance(t, type) and issubclass(t, arg)
232+
for t in pytype for arg in args)
233+
else:
234+
return any(isinstance(pytype, type) and issubclass(pytype, arg)
235+
for arg in args)
236+
237+
214238
def is_binding(bind_name: str) -> bool:
215239
return bind_name in _ConverterMeta._binding_types
216240

azure/functions_worker/bindings/queue.py

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import collections.abc
12
import datetime
23
import json
34
import typing
@@ -99,7 +100,11 @@ class QueueMessageOutConverter(meta.OutConverter, binding='queue'):
99100

100101
@classmethod
101102
def check_output_type_annotation(cls, pytype: type) -> bool:
102-
return issubclass(pytype, (azf_abc.QueueMessage, str, bytes))
103+
valid_types = (azf_abc.QueueMessage, str, bytes)
104+
return (
105+
issubclass(pytype, valid_types) or
106+
meta.is_iterable_type_annotation(pytype, valid_types)
107+
)
103108

104109
@classmethod
105110
def to_proto(cls, obj: typing.Any, *,
@@ -118,6 +123,25 @@ def to_proto(cls, obj: typing.Any, *,
118123
})
119124
)
120125

126+
elif isinstance(obj, collections.abc.Iterable):
127+
msgs = []
128+
for item in obj:
129+
if isinstance(item, str):
130+
msgs.append(item)
131+
elif isinstance(item, azf_queue.QueueMessage):
132+
msgs.append({
133+
'id': item.id,
134+
'body': item.get_body().decode('utf-8')
135+
})
136+
else:
137+
raise NotImplementedError(
138+
'invalid data type in output '
139+
'queue message list: {}'.format(type(item)))
140+
141+
return protos.TypedData(
142+
json=json.dumps(msgs)
143+
)
144+
121145
raise NotImplementedError
122146

123147
@classmethod

azure/functions_worker/functions.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import inspect
2+
import operator
23
import typing
34

45
import azure.functions as azf
@@ -138,6 +139,15 @@ def add_function(self, function_id: str,
138139
f'binding {param.name} has invalid Out annotation '
139140
f'{param_anno!r}')
140141
param_py_type = param_anno_args[0]
142+
143+
# typing_inspect.get_args() returns a flat list,
144+
# so if the annotation was func.Out[typing.List[foo]],
145+
# we need to reconstruct it.
146+
if (isinstance(param_py_type, tuple) and
147+
typing_inspect.is_generic_type(param_py_type[0])):
148+
149+
param_py_type = operator.getitem(
150+
param_py_type[0], *param_py_type[1:])
141151
else:
142152
param_py_type = param_anno
143153

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
{
2+
"scriptFile": "main.py",
3+
"disabled": false,
4+
5+
"bindings": [
6+
{
7+
"type": "httpTrigger",
8+
"direction": "in",
9+
"name": "req"
10+
},
11+
{
12+
"direction": "out",
13+
"name": "msgs",
14+
"queueName": "testqueue-return-multiple",
15+
"connection": "AzureWebJobsStorage",
16+
"type": "queue"
17+
}
18+
]
19+
}
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
import typing
2+
import azure.functions as azf
3+
4+
5+
def main(req: azf.HttpRequest, msgs: azf.Out[typing.List[str]]):
6+
msgs.set(['one', 'two'])
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
{
2+
"scriptFile": "main.py",
3+
"disabled": false,
4+
5+
"bindings": [
6+
{
7+
"type": "queueTrigger",
8+
"direction": "in",
9+
"name": "msg",
10+
"queueName": "testqueue-return-multiple",
11+
"connection": "AzureWebJobsStorage",
12+
}
13+
]
14+
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
import logging
2+
import azure.functions as azf
3+
4+
5+
logger = logging.getLogger(__name__)
6+
7+
8+
def main(msg: azf.QueueMessage) -> None:
9+
logging.info('trigger on message: %s', msg.get_body().decode('utf-8'))

tests/test_queue_functions.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,3 +53,11 @@ def test_queue_message_object_return(self):
5353
r = self.webhost.request('GET', 'get_queue_blob_message_return')
5454
self.assertEqual(r.status_code, 200)
5555
self.assertEqual(r.text, 'test-message-object-return')
56+
57+
def test_queue_return_multiple(self):
58+
r = self.webhost.request('POST', 'put_queue_return_multiple',
59+
data='foo')
60+
self.assertTrue(200 <= r.status_code < 300)
61+
62+
# wait for queue_trigger to process the queue item
63+
time.sleep(1)

0 commit comments

Comments
 (0)