Skip to content

Commit 5eddfc4

Browse files
authored
Merge pull request #33 from ydb-platform/refactor-code
support static credentials
2 parents 5cd8d52 + cb53d04 commit 5eddfc4

File tree

8 files changed

+253
-314
lines changed

8 files changed

+253
-314
lines changed

setup.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,4 +29,7 @@
2929
"enum-compat>=0.0.1",
3030
),
3131
options={"bdist_wheel": {"universal": True}},
32+
extras_require={
33+
"yc": ["yandexcloud", ],
34+
}
3235
)

ydb/_utilities.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,10 @@
1313
interceptor = None
1414

1515

16+
_grpcs_protocol = "grpcs://"
17+
_grpc_protocol = "grpc://"
18+
19+
1620
def wrap_result_in_future(result):
1721
f = futures.Future()
1822
f.set_result(result)
@@ -33,6 +37,32 @@ def x_ydb_sdk_build_info_header():
3337
return ("x-ydb-sdk-build-info", "ydb-python-sdk/" + ydb_version.VERSION)
3438

3539

40+
def is_secure_protocol(endpoint):
41+
return endpoint.startswith("grpcs://")
42+
43+
44+
def wrap_endpoint(endpoint):
45+
if endpoint.startswith(_grpcs_protocol):
46+
return endpoint[len(_grpcs_protocol) :]
47+
if endpoint.startswith(_grpc_protocol):
48+
return endpoint[len(_grpc_protocol) :]
49+
return endpoint
50+
51+
52+
def parse_connection_string(connection_string):
53+
cs = connection_string
54+
if not cs.startswith(_grpc_protocol) and not cs.startswith(_grpcs_protocol):
55+
# default is grpcs
56+
cs = _grpcs_protocol + cs
57+
58+
p = six.moves.urllib.parse.urlparse(connection_string)
59+
b = six.moves.urllib.parse.parse_qs(p.query)
60+
database = b.get("database", [])
61+
assert len(database) > 0
62+
63+
return p.scheme + "://" + p.netloc, database[0]
64+
65+
3666
# Decorator that ensures no exceptions are leaked from decorated async call
3767
def wrap_async_call_exceptions(f):
3868
@functools.wraps(f)

ydb/aio/iam.py

Lines changed: 12 additions & 121 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,10 @@
22
import time
33

44
import abc
5-
import asyncio
65
import logging
76
import six
8-
from ydb import issues, credentials
97
from ydb.iam import auth
8+
from .credentials import AbstractExpiringTokenCredentials
109

1110
logger = logging.getLogger(__name__)
1211

@@ -25,127 +24,20 @@
2524
aiohttp = None
2625

2726

28-
class _OneToManyValue(object):
29-
def __init__(self):
30-
self._value = None
31-
self._condition = asyncio.Condition()
32-
33-
async def consume(self, timeout=3):
34-
async with self._condition:
35-
if self._value is None:
36-
try:
37-
await asyncio.wait_for(self._condition.wait(), timeout=timeout)
38-
except Exception:
39-
return self._value
40-
return self._value
41-
42-
async def update(self, n_value):
43-
async with self._condition:
44-
prev_value = self._value
45-
self._value = n_value
46-
if prev_value is None:
47-
self._condition.notify_all()
48-
49-
50-
class _AtMostOneExecution(object):
51-
def __init__(self):
52-
self._can_schedule = True
53-
self._lock = asyncio.Lock() # Lock to guarantee only one execution
54-
55-
async def _wrapped_execution(self, callback):
56-
await self._lock.acquire()
57-
try:
58-
res = callback()
59-
if asyncio.iscoroutine(res):
60-
await res
61-
except Exception:
62-
pass
63-
64-
finally:
65-
self._lock.release()
66-
self._can_schedule = True
67-
68-
def submit(self, callback):
69-
if self._can_schedule:
70-
self._can_schedule = False
71-
asyncio.ensure_future(self._wrapped_execution(callback))
72-
73-
7427
@six.add_metaclass(abc.ABCMeta)
75-
class IamTokenCredentials(auth.IamTokenCredentials):
76-
def __init__(self):
77-
super(IamTokenCredentials, self).__init__()
78-
self._tp = _AtMostOneExecution()
79-
self._iam_token = _OneToManyValue()
80-
81-
@abc.abstractmethod
82-
async def _get_iam_token(self):
83-
pass
84-
85-
async def _refresh(self):
86-
current_time = time.time()
87-
self._log_refresh_start(current_time)
88-
89-
try:
90-
auth_metadata = await self._get_iam_token()
91-
await self._iam_token.update(auth_metadata["access_token"])
92-
self.update_expiration_info(auth_metadata)
93-
self.logger.info(
94-
"Token refresh successful. current_time %s, refresh_in %s",
95-
current_time,
96-
self._refresh_in,
97-
)
98-
99-
except (KeyboardInterrupt, SystemExit):
100-
return
101-
102-
except Exception as e:
103-
self.last_error = str(e)
104-
await asyncio.sleep(1)
105-
self._tp.submit(self._refresh)
106-
107-
async def iam_token(self):
108-
current_time = time.time()
109-
if current_time > self._refresh_in:
110-
self._tp.submit(self._refresh)
111-
112-
iam_token = await self._iam_token.consume(timeout=3)
113-
if iam_token is None:
114-
if self.last_error is None:
115-
raise issues.ConnectionError(
116-
"%s: timeout occurred while waiting for token.\n%s"
117-
% self.__class__.__name__,
118-
self.extra_error_message,
119-
)
120-
raise issues.ConnectionError(
121-
"%s: %s.\n%s"
122-
% (self.__class__.__name__, self.last_error, self.extra_error_message)
123-
)
124-
return iam_token
125-
126-
async def auth_metadata(self):
127-
return [(credentials.YDB_AUTH_TICKET_HEADER, await self.iam_token())]
128-
129-
130-
@six.add_metaclass(abc.ABCMeta)
131-
class TokenServiceCredentials(IamTokenCredentials):
28+
class TokenServiceCredentials(AbstractExpiringTokenCredentials):
13229
def __init__(self, iam_endpoint=None, iam_channel_credentials=None):
13330
super(TokenServiceCredentials, self).__init__()
31+
assert (
32+
iam_token_service_pb2_grpc is not None
33+
), "run pip install==ydb[yc] to use service account credentials"
34+
self._get_token_request_timeout = 10
13435
self._iam_endpoint = (
13536
"iam.api.cloud.yandex.net:443" if iam_endpoint is None else iam_endpoint
13637
)
13738
self._iam_channel_credentials = (
13839
{} if iam_channel_credentials is None else iam_channel_credentials
13940
)
140-
self._get_token_request_timeout = 10
141-
if (
142-
iam_token_service_pb2_grpc is None
143-
or jwt is None
144-
or iam_token_service_pb2 is None
145-
):
146-
raise RuntimeError(
147-
"Install jwt & yandex python cloud library to use service account credentials provider"
148-
)
14941

15042
def _channel_factory(self):
15143
return grpc.aio.secure_channel(
@@ -157,7 +49,7 @@ def _channel_factory(self):
15749
def _get_token_request(self):
15850
pass
15951

160-
async def _get_iam_token(self):
52+
async def _make_token_request(self):
16153
async with self._channel_factory() as channel:
16254
stub = iam_token_service_pb2_grpc.IamTokenServiceStub(channel)
16355
response = await stub.Create(
@@ -209,20 +101,19 @@ def _get_token_request(self):
209101
)
210102

211103

212-
class MetadataUrlCredentials(IamTokenCredentials):
104+
class MetadataUrlCredentials(AbstractExpiringTokenCredentials):
213105
def __init__(self, metadata_url=None):
214106
super(MetadataUrlCredentials, self).__init__()
215-
if aiohttp is None:
216-
raise RuntimeError(
217-
"Install aiohttp library to use metadata credentials provider"
218-
)
107+
assert (
108+
aiohttp is not None
109+
), "Install aiohttp library to use metadata credentials provider"
219110
self._metadata_url = (
220111
auth.DEFAULT_METADATA_URL if metadata_url is None else metadata_url
221112
)
222113
self._tp.submit(self._refresh)
223114
self.extra_error_message = "Check that metadata service configured properly and application deployed in VM or function at Yandex.Cloud."
224115

225-
async def _get_iam_token(self):
116+
async def _make_token_request(self):
226117
timeout = aiohttp.ClientTimeout(total=2)
227118
async with aiohttp.ClientSession(timeout=timeout) as session:
228119
async with session.get(

ydb/connection.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,8 @@ def _construct_metadata(driver_config, settings):
138138
if driver_config.database is not None:
139139
metadata.append((YDB_DATABASE_HEADER, driver_config.database))
140140

141-
if driver_config.credentials is not None:
141+
need_rpc_auth = getattr(settings, "need_rpc_auth", True)
142+
if driver_config.credentials is not None and need_rpc_auth:
142143
metadata.extend(driver_config.credentials.auth_metadata())
143144

144145
if settings is not None:

0 commit comments

Comments
 (0)