Skip to content

Commit e85ac17

Browse files
authored
[EventHub] IoTHub management operations improvement and bug fixing (#6894)
* Fix bug that iothub hub can't receive * Support direct mgmt ops of iothub * Improve mgmt ops and update livetest * Small fix * Improvement of iothub mgmt
1 parent e5c8d1c commit e85ac17

File tree

8 files changed

+185
-32
lines changed

8 files changed

+185
-32
lines changed

sdk/eventhub/azure-eventhubs/azure/eventhub/_consumer_producer_mixin.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ def _open(self, timeout_time=None): # pylint:disable=unused-argument # TODO: to
7373
else:
7474
alt_creds = {}
7575
self._create_handler()
76-
self._handler.open(connection=self.client._conn_manager.get_connection(
76+
self._handler.open(connection=self.client._conn_manager.get_connection( # pylint: disable=protected-access
7777
self.client.address.hostname,
7878
self.client.get_auth(**alt_creds)
7979
))

sdk/eventhub/azure-eventhubs/azure/eventhub/aio/client_async.py

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import datetime
77
import functools
88
import asyncio
9+
910
from typing import Any, List, Dict, Union, TYPE_CHECKING
1011

1112
from uamqp import authentication, constants # type: ignore
@@ -47,6 +48,7 @@ class EventHubClient(EventHubClientAbstract):
4748
def __init__(self, host, event_hub_path, credential, **kwargs):
4849
# type:(str, str, Union[EventHubSharedKeyCredential, EventHubSASTokenCredential, TokenCredential], Any) -> None
4950
super(EventHubClient, self).__init__(host=host, event_hub_path=event_hub_path, credential=credential, **kwargs)
51+
self._lock = asyncio.Lock()
5052
self._conn_manager = get_connection_manager(**kwargs)
5153

5254
async def __aenter__(self):
@@ -105,10 +107,17 @@ async def _close_connection(self):
105107
await self._conn_manager.reset_connection_if_broken()
106108

107109
async def _management_request(self, mgmt_msg, op_type):
110+
if self._is_iothub and not self._iothub_redirect_info:
111+
await self._iothub_redirect()
112+
113+
alt_creds = {
114+
"username": self._auth_config.get("iot_username"),
115+
"password": self._auth_config.get("iot_password")
116+
}
108117
max_retries = self.config.max_retries
109118
retry_count = 0
110119
while True:
111-
mgmt_auth = self._create_auth()
120+
mgmt_auth = self._create_auth(**alt_creds)
112121
mgmt_client = AMQPClientAsync(self.mgmt_target, auth=mgmt_auth, debug=self.config.network_tracing)
113122
try:
114123
conn = await self._conn_manager.get_connection(self.host, mgmt_auth)
@@ -126,6 +135,18 @@ async def _management_request(self, mgmt_msg, op_type):
126135
finally:
127136
await mgmt_client.close_async()
128137

138+
async def _iothub_redirect(self):
139+
async with self._lock:
140+
if self._is_iothub and not self._iothub_redirect_info:
141+
if not self._redirect_consumer:
142+
self._redirect_consumer = self.create_consumer(consumer_group='$default',
143+
partition_id='0',
144+
event_position=EventPosition('-1'),
145+
operation='/messages/events')
146+
async with self._redirect_consumer:
147+
await self._redirect_consumer._open_with_retry(timeout=self.config.receive_timeout) # pylint: disable=protected-access
148+
self._redirect_consumer = None
149+
129150
async def get_properties(self):
130151
# type:() -> Dict[str, Any]
131152
"""
@@ -139,6 +160,8 @@ async def get_properties(self):
139160
:rtype: dict
140161
:raises: ~azure.eventhub.ConnectError
141162
"""
163+
if self._is_iothub and not self._iothub_redirect_info:
164+
await self._iothub_redirect()
142165
mgmt_msg = Message(application_properties={'name': self.eh_name})
143166
response = await self._management_request(mgmt_msg, op_type=b'com.microsoft:eventhub')
144167
output = {}
@@ -178,6 +201,8 @@ async def get_partition_properties(self, partition):
178201
:rtype: dict
179202
:raises: ~azure.eventhub.ConnectError
180203
"""
204+
if self._is_iothub and not self._iothub_redirect_info:
205+
await self._iothub_redirect()
181206
mgmt_msg = Message(application_properties={'name': self.eh_name,
182207
'partition': partition})
183208
response = await self._management_request(mgmt_msg, op_type=b'com.microsoft:partition')

sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -110,8 +110,10 @@ async def __anext__(self):
110110

111111
def _create_handler(self):
112112
alt_creds = {
113-
"username": self.client._auth_config.get("iot_username"), # pylint:disable=protected-access
114-
"password": self.client._auth_config.get("iot_password")} # pylint:disable=protected-access
113+
"username": self.client._auth_config.get("iot_username") if self.redirected else None, # pylint:disable=protected-access
114+
"password": self.client._auth_config.get("iot_password") if self.redirected else None # pylint:disable=protected-access
115+
}
116+
115117
source = Source(self.source)
116118
if self.offset is not None:
117119
source.set_filter(self.offset._selector()) # pylint:disable=protected-access
@@ -134,19 +136,25 @@ async def _redirect(self, redirect):
134136
self.messages_iter = None
135137
await super(EventHubConsumer, self)._redirect(redirect)
136138

137-
async def _open(self, timeout_time=None):
139+
async def _open(self, timeout_time=None, **kwargs):
138140
"""
139141
Open the EventHubConsumer using the supplied connection.
140142
If the handler has previously been redirected, the redirect
141143
context will be used to create a new handler before opening it.
142144
143145
"""
144146
# pylint: disable=protected-access
147+
self.redirected = self.redirected or self.client._iothub_redirect_info
148+
145149
if not self.running and self.redirected:
146150
self.client._process_redirect_uri(self.redirected)
147151
self.source = self.redirected.address
148152
await super(EventHubConsumer, self)._open(timeout_time)
149153

154+
@_retry_decorator
155+
async def _open_with_retry(self, timeout_time=None, **kwargs):
156+
return await self._open(timeout_time=timeout_time, **kwargs)
157+
150158
async def _receive(self, timeout_time=None, max_batch_size=None, **kwargs):
151159
last_exception = kwargs.get("last_exception")
152160
data_batch = kwargs.get("data_batch")
@@ -254,4 +262,5 @@ async def close(self, exception=None):
254262
self.error = EventHubError(str(exception))
255263
else:
256264
self.error = EventHubError("This receive handler is now closed.")
257-
await self._handler.close_async()
265+
if self._handler:
266+
await self._handler.close_async()

sdk/eventhub/azure-eventhubs/azure/eventhub/client.py

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77
import logging
88
import datetime
99
import functools
10+
import threading
11+
1012
from typing import Any, List, Dict, Union, TYPE_CHECKING
1113

1214
import uamqp # type: ignore
@@ -46,6 +48,7 @@ class EventHubClient(EventHubClientAbstract):
4648
def __init__(self, host, event_hub_path, credential, **kwargs):
4749
# type:(str, str, Union[EventHubSharedKeyCredential, EventHubSASTokenCredential, TokenCredential], Any) -> None
4850
super(EventHubClient, self).__init__(host=host, event_hub_path=event_hub_path, credential=credential, **kwargs)
51+
self._lock = threading.RLock()
4952
self._conn_manager = get_connection_manager(**kwargs)
5053

5154
def __enter__(self):
@@ -106,10 +109,15 @@ def _close_connection(self):
106109
self._conn_manager.reset_connection_if_broken()
107110

108111
def _management_request(self, mgmt_msg, op_type):
112+
alt_creds = {
113+
"username": self._auth_config.get("iot_username"),
114+
"password": self._auth_config.get("iot_password")
115+
}
116+
109117
max_retries = self.config.max_retries
110118
retry_count = 0
111119
while retry_count <= self.config.max_retries:
112-
mgmt_auth = self._create_auth()
120+
mgmt_auth = self._create_auth(**alt_creds)
113121
mgmt_client = uamqp.AMQPClient(self.mgmt_target)
114122
try:
115123
conn = self._conn_manager.get_connection(self.host, mgmt_auth) #pylint:disable=assignment-from-none
@@ -127,6 +135,18 @@ def _management_request(self, mgmt_msg, op_type):
127135
finally:
128136
mgmt_client.close()
129137

138+
def _iothub_redirect(self):
139+
with self._lock:
140+
if self._is_iothub and not self._iothub_redirect_info:
141+
if not self._redirect_consumer:
142+
self._redirect_consumer = self.create_consumer(consumer_group='$default',
143+
partition_id='0',
144+
event_position=EventPosition('-1'),
145+
operation='/messages/events')
146+
with self._redirect_consumer:
147+
self._redirect_consumer._open_with_retry(timeout=self.config.receive_timeout) # pylint: disable=protected-access
148+
self._redirect_consumer = None
149+
130150
def get_properties(self):
131151
# type:() -> Dict[str, Any]
132152
"""
@@ -140,6 +160,8 @@ def get_properties(self):
140160
:rtype: dict
141161
:raises: ~azure.eventhub.ConnectError
142162
"""
163+
if self._is_iothub and not self._iothub_redirect_info:
164+
self._iothub_redirect()
143165
mgmt_msg = Message(application_properties={'name': self.eh_name})
144166
response = self._management_request(mgmt_msg, op_type=b'com.microsoft:eventhub')
145167
output = {}
@@ -179,6 +201,8 @@ def get_partition_properties(self, partition):
179201
:rtype: dict
180202
:raises: ~azure.eventhub.ConnectError
181203
"""
204+
if self._is_iothub and not self._iothub_redirect_info:
205+
self._iothub_redirect()
182206
mgmt_msg = Message(application_properties={'name': self.eh_name,
183207
'partition': partition})
184208
response = self._management_request(mgmt_msg, op_type=b'com.microsoft:partition')

sdk/eventhub/azure-eventhubs/azure/eventhub/client_abstract.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,8 @@
1111
import functools
1212
from abc import abstractmethod
1313
from typing import Dict, Union, Any, TYPE_CHECKING
14-
from azure.eventhub import __version__
14+
15+
from azure.eventhub import __version__, EventPosition
1516
from azure.eventhub.configuration import _Configuration
1617
from .common import EventHubSharedKeyCredential, EventHubSASTokenCredential, _Address
1718

@@ -153,6 +154,8 @@ def __init__(self, host, event_hub_path, credential, **kwargs):
153154
self.get_auth = functools.partial(self._create_auth)
154155
self.config = _Configuration(**kwargs)
155156
self.debug = self.config.network_tracing
157+
self._is_iothub = False
158+
self._iothub_redirect_info = None
156159

157160
log.info("%r: Created the Event Hub client", self.container_id)
158161

@@ -173,6 +176,11 @@ def _from_iothub_connection_string(cls, conn_str, **kwargs):
173176
'iot_password': key,
174177
'username': username,
175178
'password': password}
179+
client._is_iothub = True
180+
client._redirect_consumer = client.create_consumer(consumer_group='$default',
181+
partition_id='0',
182+
event_position=EventPosition('-1'),
183+
operation='/messages/events')
176184
return client
177185

178186
@abstractmethod
@@ -213,6 +221,8 @@ def _process_redirect_uri(self, redirect):
213221
self.auth_uri = "sb://{}{}".format(self.address.hostname, self.address.path)
214222
self.eh_name = self.address.path.lstrip('/')
215223
self.mgmt_target = redirect_uri
224+
if self._is_iothub:
225+
self._iothub_redirect_info = redirect
216226

217227
@classmethod
218228
def from_connection_string(cls, conn_str, **kwargs):

sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -106,8 +106,10 @@ def __next__(self):
106106

107107
def _create_handler(self):
108108
alt_creds = {
109-
"username": self.client._auth_config.get("iot_username"), # pylint:disable=protected-access
110-
"password": self.client._auth_config.get("iot_password")} # pylint:disable=protected-access
109+
"username": self.client._auth_config.get("iot_username") if self.redirected else None, # pylint:disable=protected-access
110+
"password": self.client._auth_config.get("iot_password") if self.redirected else None # pylint:disable=protected-access
111+
}
112+
111113
source = Source(self.source)
112114
if self.offset is not None:
113115
source.set_filter(self.offset._selector()) # pylint:disable=protected-access
@@ -129,19 +131,25 @@ def _redirect(self, redirect):
129131
self.messages_iter = None
130132
super(EventHubConsumer, self)._redirect(redirect)
131133

132-
def _open(self, timeout_time=None):
134+
def _open(self, timeout_time=None, **kwargs):
133135
"""
134136
Open the EventHubConsumer using the supplied connection.
135137
If the handler has previously been redirected, the redirect
136138
context will be used to create a new handler before opening it.
137139
138140
"""
139141
# pylint: disable=protected-access
142+
self.redirected = self.redirected or self.client._iothub_redirect_info
143+
140144
if not self.running and self.redirected:
141145
self.client._process_redirect_uri(self.redirected)
142146
self.source = self.redirected.address
143147
super(EventHubConsumer, self)._open(timeout_time)
144148

149+
@_retry_decorator
150+
def _open_with_retry(self, timeout_time=None, **kwargs):
151+
return self._open(timeout_time=timeout_time, **kwargs)
152+
145153
def _receive(self, timeout_time=None, max_batch_size=None, **kwargs):
146154
last_exception = kwargs.get("last_exception")
147155
data_batch = kwargs.get("data_batch")

sdk/eventhub/azure-eventhubs/tests/asynctests/test_iothub_receive_async.py

Lines changed: 53 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -4,43 +4,82 @@
44
# license information.
55
#--------------------------------------------------------------------------
66

7-
import os
87
import asyncio
98
import pytest
10-
import time
119

1210
from azure.eventhub.aio import EventHubClient
13-
from azure.eventhub import EventData, EventPosition, EventHubError
11+
from azure.eventhub import EventPosition
1412

1513

1614
async def pump(receiver, sleep=None):
1715
messages = 0
1816
if sleep:
1917
await asyncio.sleep(sleep)
2018
async with receiver:
21-
batch = await receiver.receive(timeout=1)
19+
batch = await receiver.receive(timeout=3)
2220
messages += len(batch)
2321
return messages
2422

2523

26-
async def get_partitions(iot_connection_str):
27-
client = EventHubClient.from_connection_string(iot_connection_str, network_tracing=False)
28-
receiver = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition("-1"), prefetch=1000, operation='/messages/events')
29-
async with receiver:
30-
partitions = await client.get_properties()
31-
return partitions["partition_ids"]
32-
33-
3424
@pytest.mark.liveTest
3525
@pytest.mark.asyncio
3626
async def test_iothub_receive_multiple_async(iot_connection_str):
37-
pytest.skip("This will get AuthenticationError. We're investigating...")
38-
partitions = await get_partitions(iot_connection_str)
3927
client = EventHubClient.from_connection_string(iot_connection_str, network_tracing=False)
28+
partitions = await client.get_partition_ids()
4029
receivers = []
4130
for p in partitions:
4231
receivers.append(client.create_consumer(consumer_group="$default", partition_id=p, event_position=EventPosition("-1"), prefetch=10, operation='/messages/events'))
4332
outputs = await asyncio.gather(*[pump(r) for r in receivers])
4433

4534
assert isinstance(outputs[0], int) and outputs[0] <= 10
4635
assert isinstance(outputs[1], int) and outputs[1] <= 10
36+
37+
38+
@pytest.mark.liveTest
39+
@pytest.mark.asyncio
40+
async def test_iothub_get_properties_async(iot_connection_str, device_id):
41+
client = EventHubClient.from_connection_string(iot_connection_str, network_tracing=False)
42+
properties = await client.get_properties()
43+
assert properties["partition_ids"] == ["0", "1", "2", "3"]
44+
45+
46+
@pytest.mark.liveTest
47+
@pytest.mark.asyncio
48+
async def test_iothub_get_partition_ids_async(iot_connection_str, device_id):
49+
client = EventHubClient.from_connection_string(iot_connection_str, network_tracing=False)
50+
partitions = await client.get_partition_ids()
51+
assert partitions == ["0", "1", "2", "3"]
52+
53+
54+
@pytest.mark.liveTest
55+
@pytest.mark.asyncio
56+
async def test_iothub_get_partition_properties_async(iot_connection_str, device_id):
57+
client = EventHubClient.from_connection_string(iot_connection_str, network_tracing=False)
58+
partition_properties = await client.get_partition_properties("0")
59+
assert partition_properties["id"] == "0"
60+
61+
62+
@pytest.mark.liveTest
63+
@pytest.mark.asyncio
64+
async def test_iothub_receive_after_mgmt_ops_async(iot_connection_str, device_id):
65+
client = EventHubClient.from_connection_string(iot_connection_str, network_tracing=False)
66+
partitions = await client.get_partition_ids()
67+
assert partitions == ["0", "1", "2", "3"]
68+
receiver = client.create_consumer(consumer_group="$default", partition_id=partitions[0], event_position=EventPosition("-1"), operation='/messages/events')
69+
async with receiver:
70+
received = await receiver.receive(timeout=5)
71+
assert len(received) == 0
72+
73+
74+
@pytest.mark.liveTest
75+
@pytest.mark.asyncio
76+
async def test_iothub_mgmt_ops_after_receive_async(iot_connection_str, device_id):
77+
client = EventHubClient.from_connection_string(iot_connection_str, network_tracing=False)
78+
receiver = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EventPosition("-1"), operation='/messages/events')
79+
async with receiver:
80+
received = await receiver.receive(timeout=5)
81+
assert len(received) == 0
82+
83+
partitions = await client.get_partition_ids()
84+
assert partitions == ["0", "1", "2", "3"]
85+

0 commit comments

Comments
 (0)