Skip to content

Commit

Permalink
clean up
Browse files Browse the repository at this point in the history
  • Loading branch information
mkleen committed Sep 5, 2024
1 parent c4c6942 commit e341947
Showing 1 changed file with 12 additions and 8 deletions.
20 changes: 12 additions & 8 deletions cr8/clients.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import json
import random

import aiohttp
import itertools
Expand Down Expand Up @@ -320,12 +319,11 @@ class HttpClient:
def __init__(self, hosts, conn_pool_limit=25, session_settings=None):
self.hosts = hosts
self.urls = itertools.cycle(list(map(_append_sql, hosts)))
self.conn_pool_limit = conn_pool_limit;
self.conn_pool_limit = conn_pool_limit
self.is_cratedb = True
self._pool = []
self.session_settings = session_settings or {}


@property
async def _session(self):
if not self._pool:
Expand All @@ -344,28 +342,32 @@ async def _session(self):
dumps(payload, cls=CrateJsonEncoder)
)
self._pool.append(session)
return random.choice(self._pool)

return self._pool.pop()


async def execute(self, stmt, args=None):
payload = {'stmt': _plain_or_callable(stmt)}
if args:
payload['args'] = _plain_or_callable(args)
session = await self._session
return await _exec(
result = await _exec(
session,
next(self.urls),
dumps(payload, cls=CrateJsonEncoder)
)
self._pool.append(session)
return result

async def execute_many(self, stmt, bulk_args):
data = dumps(dict(
stmt=_plain_or_callable(stmt),
bulk_args=_plain_or_callable(bulk_args)
), cls=CrateJsonEncoder)
session = await self._session
return await _exec(session, next(self.urls), data)
result = await _exec(session, next(self.urls), data)
self._pool.append(session)
return result

async def get_server_version(self):
session = await self._session
Expand All @@ -374,11 +376,13 @@ async def get_server_version(self):
async with session.get(url) as resp:
r = await resp.json()
version = r['version']
return {
result = {
'hash': version['build_hash'],
'number': version['number'],
'date': _date_or_none(version['build_timestamp'][:10])
}
self._pool.append(session)
return result

async def _close(self):
for session in self._pool:
Expand Down

0 comments on commit e341947

Please sign in to comment.