Skip to content

Commit

Permalink
Add simple pool to http client
Browse files Browse the repository at this point in the history
  • Loading branch information
mkleen committed Sep 4, 2024
1 parent 2d008ae commit c4c6942
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 24 deletions.
43 changes: 29 additions & 14 deletions cr8/clients.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import json
import random

import aiohttp
import itertools
import calendar
Expand Down Expand Up @@ -315,23 +317,36 @@ def _append_sql(host):


class HttpClient:
def __init__(self, hosts, conn_pool_limit=25):
def __init__(self, hosts, conn_pool_limit=25, session_settings=None):
self.hosts = hosts
self.urls = itertools.cycle(list(map(_append_sql, hosts)))
self._connector_params = {
'limit': conn_pool_limit,
'verify_ssl': _verify_ssl_from_first(self.hosts)
}
self.__session = None
self.conn_pool_limit = conn_pool_limit;
self.is_cratedb = True
self._pool = []
self.session_settings = session_settings or {}


@property
async def _session(self):
session = self.__session
if session is None:
conn = aiohttp.TCPConnector(**self._connector_params)
self.__session = session = aiohttp.ClientSession(connector=conn)
return session
if not self._pool:
self._connector_params = {
'limit': 1,
'verify_ssl': _verify_ssl_from_first(self.hosts)
}
for n in range(0, self.conn_pool_limit):
tcp_connector = aiohttp.TCPConnector(**self._connector_params)
session = aiohttp.ClientSession(connector=tcp_connector)
for setting, value in self.session_settings.items():
payload = {'stmt': f'set {setting}={value}'}
await _exec(
session,
next(self.urls),
dumps(payload, cls=CrateJsonEncoder)
)
self._pool.append(session)

return random.choice(self._pool)


async def execute(self, stmt, args=None):
payload = {'stmt': _plain_or_callable(stmt)}
Expand Down Expand Up @@ -366,8 +381,8 @@ async def get_server_version(self):
}

async def _close(self):
if self.__session:
await self.__session.close()
for session in self._pool:
await session.close()

def close(self):
asyncio.get_event_loop().run_until_complete(self._close())
Expand All @@ -385,4 +400,4 @@ def client(hosts, session_settings=None, concurrency=25):
if not asyncpg:
raise ValueError('Cannot use "asyncpg" scheme if asyncpg is not available')
return AsyncpgClient(hosts, pool_size=concurrency, session_settings=session_settings)
return HttpClient(_to_http_hosts(hosts), conn_pool_limit=concurrency)
return HttpClient(_to_http_hosts(hosts), conn_pool_limit=concurrency, session_settings=session_settings)
9 changes: 0 additions & 9 deletions cr8/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,15 +72,6 @@ def __init__(self, hosts, concurrency, sample_mode, session_settings=None):
self.concurrency = concurrency
self.client = client(hosts, session_settings=session_settings, concurrency=concurrency)
self.sampler = get_sampler(sample_mode)
self.session_settings = session_settings or {}

def set_session_settings(self):
if isinstance(self.client, HttpClient):
f = self.client.execute
for setting, value in self.session_settings.items():
stmt = f'set {setting}={value}'
statements = itertools.repeat((stmt, ()), self.concurrency)
aio.run_many(f, statements, self.concurrency, self.concurrency)

def warmup(self, stmt, num_warmup, concurrency=0, args=None):
statements = itertools.repeat((stmt, args or ()), num_warmup)
Expand Down
1 change: 0 additions & 1 deletion cr8/run_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,6 @@ def run_queries(self, queries: Iterable[dict], meta=None, session_settings=None)
f' {mode_desc}: {duration or iterations}')
)
with Runner(self.benchmark_hosts, concurrency, self.sample_mode, session_settings) as runner:
runner.set_session_settings()
if warmup > 0:
runner.warmup(stmt, warmup, concurrency, args)
timed_stats = runner.run(
Expand Down

0 comments on commit c4c6942

Please sign in to comment.