2
2
import time
3
3
4
4
import abc
5
- import asyncio
6
5
import logging
7
6
import six
8
- from ydb import issues , credentials
9
7
from ydb .iam import auth
8
+ from .credentials import AbstractExpiringTokenCredentials
10
9
11
10
logger = logging .getLogger (__name__ )
12
11
25
24
aiohttp = None
26
25
27
26
28
- class _OneToManyValue (object ):
29
- def __init__ (self ):
30
- self ._value = None
31
- self ._condition = asyncio .Condition ()
32
-
33
- async def consume (self , timeout = 3 ):
34
- async with self ._condition :
35
- if self ._value is None :
36
- try :
37
- await asyncio .wait_for (self ._condition .wait (), timeout = timeout )
38
- except Exception :
39
- return self ._value
40
- return self ._value
41
-
42
- async def update (self , n_value ):
43
- async with self ._condition :
44
- prev_value = self ._value
45
- self ._value = n_value
46
- if prev_value is None :
47
- self ._condition .notify_all ()
48
-
49
-
50
- class _AtMostOneExecution (object ):
51
- def __init__ (self ):
52
- self ._can_schedule = True
53
- self ._lock = asyncio .Lock () # Lock to guarantee only one execution
54
-
55
- async def _wrapped_execution (self , callback ):
56
- await self ._lock .acquire ()
57
- try :
58
- res = callback ()
59
- if asyncio .iscoroutine (res ):
60
- await res
61
- except Exception :
62
- pass
63
-
64
- finally :
65
- self ._lock .release ()
66
- self ._can_schedule = True
67
-
68
- def submit (self , callback ):
69
- if self ._can_schedule :
70
- self ._can_schedule = False
71
- asyncio .ensure_future (self ._wrapped_execution (callback ))
72
-
73
-
74
27
@six .add_metaclass (abc .ABCMeta )
75
- class IamTokenCredentials (auth .IamTokenCredentials ):
76
- def __init__ (self ):
77
- super (IamTokenCredentials , self ).__init__ ()
78
- self ._tp = _AtMostOneExecution ()
79
- self ._iam_token = _OneToManyValue ()
80
-
81
- @abc .abstractmethod
82
- async def _get_iam_token (self ):
83
- pass
84
-
85
- async def _refresh (self ):
86
- current_time = time .time ()
87
- self ._log_refresh_start (current_time )
88
-
89
- try :
90
- auth_metadata = await self ._get_iam_token ()
91
- await self ._iam_token .update (auth_metadata ["access_token" ])
92
- self .update_expiration_info (auth_metadata )
93
- self .logger .info (
94
- "Token refresh successful. current_time %s, refresh_in %s" ,
95
- current_time ,
96
- self ._refresh_in ,
97
- )
98
-
99
- except (KeyboardInterrupt , SystemExit ):
100
- return
101
-
102
- except Exception as e :
103
- self .last_error = str (e )
104
- await asyncio .sleep (1 )
105
- self ._tp .submit (self ._refresh )
106
-
107
- async def iam_token (self ):
108
- current_time = time .time ()
109
- if current_time > self ._refresh_in :
110
- self ._tp .submit (self ._refresh )
111
-
112
- iam_token = await self ._iam_token .consume (timeout = 3 )
113
- if iam_token is None :
114
- if self .last_error is None :
115
- raise issues .ConnectionError (
116
- "%s: timeout occurred while waiting for token.\n %s"
117
- % self .__class__ .__name__ ,
118
- self .extra_error_message ,
119
- )
120
- raise issues .ConnectionError (
121
- "%s: %s.\n %s"
122
- % (self .__class__ .__name__ , self .last_error , self .extra_error_message )
123
- )
124
- return iam_token
125
-
126
- async def auth_metadata (self ):
127
- return [(credentials .YDB_AUTH_TICKET_HEADER , await self .iam_token ())]
128
-
129
-
130
- @six .add_metaclass (abc .ABCMeta )
131
- class TokenServiceCredentials (IamTokenCredentials ):
28
+ class TokenServiceCredentials (AbstractExpiringTokenCredentials ):
132
29
def __init__ (self , iam_endpoint = None , iam_channel_credentials = None ):
133
30
super (TokenServiceCredentials , self ).__init__ ()
31
+ assert (
32
+ iam_token_service_pb2_grpc is not None
33
+ ), "run pip install==ydb[yc] to use service account credentials"
34
+ self ._get_token_request_timeout = 10
134
35
self ._iam_endpoint = (
135
36
"iam.api.cloud.yandex.net:443" if iam_endpoint is None else iam_endpoint
136
37
)
137
38
self ._iam_channel_credentials = (
138
39
{} if iam_channel_credentials is None else iam_channel_credentials
139
40
)
140
- self ._get_token_request_timeout = 10
141
- if (
142
- iam_token_service_pb2_grpc is None
143
- or jwt is None
144
- or iam_token_service_pb2 is None
145
- ):
146
- raise RuntimeError (
147
- "Install jwt & yandex python cloud library to use service account credentials provider"
148
- )
149
41
150
42
def _channel_factory (self ):
151
43
return grpc .aio .secure_channel (
@@ -157,7 +49,7 @@ def _channel_factory(self):
157
49
def _get_token_request (self ):
158
50
pass
159
51
160
- async def _get_iam_token (self ):
52
+ async def _make_token_request (self ):
161
53
async with self ._channel_factory () as channel :
162
54
stub = iam_token_service_pb2_grpc .IamTokenServiceStub (channel )
163
55
response = await stub .Create (
@@ -209,20 +101,19 @@ def _get_token_request(self):
209
101
)
210
102
211
103
212
- class MetadataUrlCredentials (IamTokenCredentials ):
104
+ class MetadataUrlCredentials (AbstractExpiringTokenCredentials ):
213
105
def __init__ (self , metadata_url = None ):
214
106
super (MetadataUrlCredentials , self ).__init__ ()
215
- if aiohttp is None :
216
- raise RuntimeError (
217
- "Install aiohttp library to use metadata credentials provider"
218
- )
107
+ assert (
108
+ aiohttp is not None
109
+ ), "Install aiohttp library to use metadata credentials provider"
219
110
self ._metadata_url = (
220
111
auth .DEFAULT_METADATA_URL if metadata_url is None else metadata_url
221
112
)
222
113
self ._tp .submit (self ._refresh )
223
114
self .extra_error_message = "Check that metadata service configured properly and application deployed in VM or function at Yandex.Cloud."
224
115
225
- async def _get_iam_token (self ):
116
+ async def _make_token_request (self ):
226
117
timeout = aiohttp .ClientTimeout (total = 2 )
227
118
async with aiohttp .ClientSession (timeout = timeout ) as session :
228
119
async with session .get (
0 commit comments