Skip to content

Feature/autorefresh pipeline fix #22340

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 25 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
4b7d0b7
removed options bag, enabled and fixed tests
petrsvihlik Nov 19, 2021
877684f
fix build problems
petrsvihlik Nov 21, 2021
4a9435a
initial implementation of configurable autorefresh
petrsvihlik Nov 24, 2021
99ebf64
python 2.7 compat changes
petrsvihlik Nov 25, 2021
7530b31
py27 compat changes
petrsvihlik Nov 26, 2021
eb43cb3
fixed linting problems + comments
petrsvihlik Nov 26, 2021
053666e
py27 fixed flaky test
petrsvihlik Nov 26, 2021
45e8137
linting issues
petrsvihlik Nov 27, 2021
48ced5a
CommunicationTokenCredential async implemenation & tests are added
Dec 1, 2021
84b6348
split async code not to break py27
petrsvihlik Dec 2, 2021
594cd1d
lock issue for python 3.10 is fixed
Dec 2, 2021
b31a954
asyncio.sleep in async tests are removed
Dec 2, 2021
bd872df
test refactored
petrsvihlik Dec 2, 2021
2cd058c
updates in _shared duplicated in chat
Dec 2, 2021
b80bf06
updates in _shared duplicated in sms
Dec 2, 2021
e80757a
updates in _shared duplicated in networktraversal
Dec 2, 2021
fba4353
updates in _shared duplicated in phonenumbers
Dec 2, 2021
35cf672
lint issue fix in utils
Dec 3, 2021
932dbb6
python 2 compatibility fix for generate_token_with_custom_expiry & fi…
Dec 3, 2021
94cb4f6
phonenumber dependency version fix
Dec 14, 2021
6b7afc8
removed unneccasary user credential tests from sms,chat, networktrave…
Dec 15, 2021
ac54413
reduced the default refresh interval (api review)
petrsvihlik Dec 15, 2021
48ee8ec
time renamed to interval (api review)
petrsvihlik Dec 15, 2021
dae52ba
added python version requirement for communication packages
Jan 5, 2022
dbb131e
Merge branch 'main' into feature/autorefresh-pipeline-fix
AikoBB Jan 7, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,49 +3,70 @@
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------
from threading import Lock, Condition
from threading import Lock, Condition, Timer
from datetime import timedelta
from typing import ( # pylint: disable=unused-import

from typing import ( # pylint: disable=unused-import
cast,
Tuple,
Any
)
import six

from .utils import get_current_utc_as_int
from .user_token_refresh_options import CommunicationTokenRefreshOptions
from .utils import create_access_token


class CommunicationTokenCredential(object):
"""Credential type used for authenticating to an Azure Communication service.
:param str token: The token used to authenticate to an Azure Communication service
:keyword token_refresher: The token refresher to provide capacity to fetch fresh token
:keyword callable token_refresher: The async token refresher to provide capacity to fetch fresh token
:keyword bool refresh_proactively: Whether to refresh the token proactively or not
:keyword timedelta refresh_interval_before_expiry: The time interval before token expiry that causes the token_refresher to be called if refresh_proactively is true.
:raises: TypeError
"""

_ON_DEMAND_REFRESHING_INTERVAL_MINUTES = 2
_DEFAULT_AUTOREFRESH_INTERVAL_MINUTES = 4.5

def __init__(self,
token, # type: str
**kwargs
**kwargs # type: Any
):
token_refresher = kwargs.pop('token_refresher', None)
communication_token_refresh_options = CommunicationTokenRefreshOptions(token=token,
token_refresher=token_refresher)
self._token = communication_token_refresh_options.get_token()
self._token_refresher = communication_token_refresh_options.get_token_refresher()
if not isinstance(token, six.string_types):
raise TypeError("Token must be a string.")
self._token = create_access_token(token)
self._token_refresher = kwargs.pop('token_refresher', None)
self._refresh_proactively = kwargs.pop('refresh_proactively', False)
self._refresh_interval_before_expiry = kwargs.pop('refresh_interval_before_expiry', timedelta(
minutes=self._DEFAULT_AUTOREFRESH_INTERVAL_MINUTES))
self._timer = None
self._lock = Condition(Lock())
self._some_thread_refreshing = False
if self._refresh_proactively:
self._schedule_refresh()

def __enter__(self):
return self

def __exit__(self, *args):
if self._timer is not None:
self._timer.cancel()

def get_token(self, *scopes, **kwargs): # pylint: disable=unused-argument
# type (*str, **Any) -> AccessToken
def get_token(self):
# type () -> ~azure.core.credentials.AccessToken
"""The value of the configured token.
:rtype: ~azure.core.credentials.AccessToken
"""

if not self._token_refresher or not self._token_expiring():
return self._token

should_this_thread_refresh = False
self._update_token_and_reschedule()
return self._token

def _update_token_and_reschedule(self):
should_this_thread_refresh = False
with self._lock:
while self._token_expiring():
if self._some_thread_refreshing:
Expand All @@ -70,17 +91,32 @@ def get_token(self, *scopes, **kwargs): # pylint: disable=unused-argument
with self._lock:
self._some_thread_refreshing = False
self._lock.notify_all()

raise
if self._refresh_proactively:
self._schedule_refresh()
return self._token

def _schedule_refresh(self):
if self._timer is not None:
self._timer.cancel()

timespan = self._token.expires_on - \
get_current_utc_as_int() - self._refresh_interval_before_expiry.total_seconds()
self._timer = Timer(timespan, self._update_token_and_reschedule)
self._timer.start()

def _wait_till_inprogress_thread_finish_refreshing(self):
self._lock.release()
self._lock.acquire()

def _token_expiring(self):
if self._refresh_proactively:
interval = self._refresh_interval_before_expiry
else:
interval = timedelta(
minutes=self._ON_DEMAND_REFRESHING_INTERVAL_MINUTES)
return self._token.expires_on - get_current_utc_as_int() <\
timedelta(minutes=self._ON_DEMAND_REFRESHING_INTERVAL_MINUTES).total_seconds()
interval.total_seconds()

def _is_currenttoken_valid(self):
return get_current_utc_as_int() < self._token.expires_on
Original file line number Diff line number Diff line change
Expand Up @@ -5,62 +5,83 @@
# --------------------------------------------------------------------------
from asyncio import Condition, Lock
from datetime import timedelta
import sys
from typing import ( # pylint: disable=unused-import
cast,
Tuple,
Any
)
import six

from .utils import get_current_utc_as_int
from .user_token_refresh_options import CommunicationTokenRefreshOptions
from .utils import create_access_token
from .utils_async import AsyncTimer


class CommunicationTokenCredential(object):
"""Credential type used for authenticating to an Azure Communication service.
:param str token: The token used to authenticate to an Azure Communication service
:keyword token_refresher: The async token refresher to provide capacity to fetch fresh token
:keyword callable token_refresher: The async token refresher to provide capacity to fetch fresh token
:keyword bool refresh_proactively: Whether to refresh the token proactively or not
:keyword timedelta refresh_interval_before_expiry: The time interval before token expiry that causes the token_refresher to be called if refresh_proactively is true.
:raises: TypeError
"""

_ON_DEMAND_REFRESHING_INTERVAL_MINUTES = 2
_DEFAULT_AUTOREFRESH_INTERVAL_MINUTES = 4.5

def __init__(self, token: str, **kwargs: Any):
token_refresher = kwargs.pop('token_refresher', None)
communication_token_refresh_options = CommunicationTokenRefreshOptions(token=token,
token_refresher=token_refresher)
self._token = communication_token_refresh_options.get_token()
self._token_refresher = communication_token_refresh_options.get_token_refresher()
self._lock = Condition(Lock())
if not isinstance(token, six.string_types):
raise TypeError("Token must be a string.")
self._token = create_access_token(token)
self._token_refresher = kwargs.pop('token_refresher', None)
self._refresh_proactively = kwargs.pop('refresh_proactively', False)
self._refresh_interval_before_expiry = kwargs.pop('refresh_interval_before_expiry', timedelta(
minutes=self._DEFAULT_AUTOREFRESH_INTERVAL_MINUTES))
self._timer = None
self._async_mutex = Lock()
if sys.version_info[:3] == (3, 10, 0):
# Workaround for Python 3.10 bug(https://bugs.python.org/issue45416):
getattr(self._async_mutex, '_get_loop', lambda: None)()
self._lock = Condition(self._async_mutex)
self._some_thread_refreshing = False
if self._refresh_proactively:
self._schedule_refresh()

async def get_token(self, *scopes, **kwargs): # pylint: disable=unused-argument
# type (*str, **Any) -> AccessToken
async def __aenter__(self):
return self

async def __aexit__(self, *args):
if self._timer is not None:
self._timer.cancel()

async def get_token(self):
# type () -> ~azure.core.credentials.AccessToken
"""The value of the configured token.
:rtype: ~azure.core.credentials.AccessToken
"""
if not self._token_refresher or not self._token_expiring():
return self._token
await self._update_token_and_reschedule()
return self._token

async def _update_token_and_reschedule(self):
should_this_thread_refresh = False

async with self._lock:

while self._token_expiring():
if self._some_thread_refreshing:
if self._is_currenttoken_valid():
return self._token

await self._wait_till_inprogress_thread_finish_refreshing()
self._wait_till_inprogress_thread_finish_refreshing()
else:
should_this_thread_refresh = True
self._some_thread_refreshing = True
break


if should_this_thread_refresh:
try:
newtoken = await self._token_refresher() # pylint:disable=not-callable

newtoken = self._token_refresher() # pylint:disable=not-callable
async with self._lock:
self._token = newtoken
self._some_thread_refreshing = False
Expand All @@ -69,27 +90,32 @@ async def get_token(self, *scopes, **kwargs): # pylint: disable=unused-argument
async with self._lock:
self._some_thread_refreshing = False
self._lock.notify_all()

raise

if self._refresh_proactively:
self._schedule_refresh()
return self._token

async def _wait_till_inprogress_thread_finish_refreshing(self):
def _schedule_refresh(self):
if self._timer is not None:
self._timer.cancel()

timespan = self._token.expires_on - \
get_current_utc_as_int() - self._refresh_interval_before_expiry.total_seconds()
self._timer = AsyncTimer(timespan, self._update_token_and_reschedule)
self._timer.start()

def _wait_till_inprogress_thread_finish_refreshing(self):
self._lock.release()
await self._lock.acquire()
self._lock.acquire()

def _token_expiring(self):
if self._refresh_proactively:
interval = self._refresh_interval_before_expiry
else:
interval = timedelta(
minutes=self._ON_DEMAND_REFRESHING_INTERVAL_MINUTES)
return self._token.expires_on - get_current_utc_as_int() <\
timedelta(minutes=self._ON_DEMAND_REFRESHING_INTERVAL_MINUTES).total_seconds()
interval.total_seconds()

def _is_currenttoken_valid(self):
return get_current_utc_as_int() < self._token.expires_on

async def close(self) -> None:
pass

async def __aenter__(self):
return self

async def __aexit__(self, *args):
await self.close()
Original file line number Diff line number Diff line change
Expand Up @@ -6,26 +6,21 @@

import base64
import json
import calendar
import time
from typing import ( # pylint: disable=unused-import
cast,
Tuple,
)
from datetime import datetime
import calendar
from msrest.serialization import TZ_UTC
from azure.core.credentials import AccessToken

def _convert_datetime_to_utc_int(expires_on):
"""
Converts DateTime in local time to the Epoch in UTC in second.

:param input_datetime: Input datetime
:type input_datetime: datetime
:return: Integer
:rtype: int
"""
def _convert_datetime_to_utc_int(expires_on):
return int(calendar.timegm(expires_on.utctimetuple()))


def parse_connection_str(conn_str):
# type: (str) -> Tuple[str, str, str, str]
if conn_str is None:
Expand Down Expand Up @@ -53,16 +48,18 @@ def parse_connection_str(conn_str):

return host, str(shared_access_key)


def get_current_utc_time():
# type: () -> str
return str(datetime.utcnow().strftime("%a, %d %b %Y %H:%M:%S ")) + "GMT"
return str(datetime.now(tz=TZ_UTC).strftime("%a, %d %b %Y %H:%M:%S ")) + "GMT"


def get_current_utc_as_int():
# type: () -> int
current_utc_datetime = datetime.utcnow()
current_utc_datetime = datetime.now(tz=TZ_UTC)
return _convert_datetime_to_utc_int(current_utc_datetime)


def create_access_token(token):
# type: (str) -> azure.core.credentials.AccessToken
"""Creates an instance of azure.core.credentials.AccessToken from a
Expand All @@ -84,18 +81,20 @@ def create_access_token(token):
raise ValueError(token_parse_err_msg)

try:
padded_base64_payload = base64.b64decode(parts[1] + "==").decode('ascii')
padded_base64_payload = base64.b64decode(
parts[1] + "==").decode('ascii')
payload = json.loads(padded_base64_payload)
return AccessToken(token,
_convert_datetime_to_utc_int(datetime.fromtimestamp(payload['exp']).replace(tzinfo=TZ_UTC)))
_convert_datetime_to_utc_int(datetime.fromtimestamp(payload['exp'], tz=TZ_UTC)))
except ValueError:
raise ValueError(token_parse_err_msg)


def get_authentication_policy(
endpoint, # type: str
credential, # type: TokenCredential or str
decode_url=False, # type: bool
is_async=False, # type: bool
endpoint, # type: str
credential, # type: TokenCredential or str
decode_url=False, # type: bool
is_async=False, # type: bool
):
# type: (...) -> BearerTokenCredentialPolicy or HMACCredentialPolicy
"""Returns the correct authentication policy based
Expand Down Expand Up @@ -126,3 +125,7 @@ def get_authentication_policy(

raise TypeError("Unsupported credential: {}. Use an access token string to use HMACCredentialsPolicy"
"or a token credential from azure.identity".format(type(credential)))

def _convert_expires_on_datetime_to_utc_int(expires_on):
epoch = time.mktime(datetime(1970, 1, 1).timetuple())
return epoch-time.mktime(expires_on.timetuple())
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# ------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# -------------------------------------------------------------------------

import asyncio


class AsyncTimer:
"""A non-blocking timer, that calls a function after a specified number of seconds:
:param int interval: time interval in seconds
:param callable callback: function to be called after the interval has elapsed
"""

def __init__(self, interval, callback):
self._interval = interval
self._callback = callback
self._task = None

def start(self):
self._task = asyncio.ensure_future(self._job())

async def _job(self):
await asyncio.sleep(self._interval)
await self._callback()

def cancel(self):
if self._task is not None:
self._task.cancel()
1 change: 1 addition & 0 deletions sdk/communication/azure-communication-chat/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
"azure-core<2.0.0,>=1.19.1",
'six>=1.11.0'
],
python_requires=">=3.7",
extras_require={
":python_version<'3.0'": ['azure-communication-nspkg'],
":python_version<'3.5'": ["typing"],
Expand Down
Loading