Skip to content

Commit

Permalink
Initialize every connection of http client with session settings
Browse files Browse the repository at this point in the history
  • Loading branch information
mkleen committed Sep 3, 2024
1 parent ab385e0 commit 2d008ae
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 12 deletions.
12 changes: 2 additions & 10 deletions cr8/clients.py
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ def _append_sql(host):


class HttpClient:
def __init__(self, hosts, conn_pool_limit=25, session_settings=None):
def __init__(self, hosts, conn_pool_limit=25):
self.hosts = hosts
self.urls = itertools.cycle(list(map(_append_sql, hosts)))
self._connector_params = {
Expand All @@ -324,21 +324,13 @@ def __init__(self, hosts, conn_pool_limit=25, session_settings=None):
}
self.__session = None
self.is_cratedb = True
self.session_settings = session_settings or {}

@property
async def _session(self):
session = self.__session
if session is None:
conn = aiohttp.TCPConnector(**self._connector_params)
self.__session = session = aiohttp.ClientSession(connector=conn)
for setting, value in self.session_settings.items():
payload = {'stmt': f'set {setting}={value}'}
await _exec(
session,
next(self.urls),
dumps(payload, cls=CrateJsonEncoder)
)
return session

async def execute(self, stmt, args=None):
Expand Down Expand Up @@ -393,4 +385,4 @@ def client(hosts, session_settings=None, concurrency=25):
if not asyncpg:
raise ValueError('Cannot use "asyncpg" scheme if asyncpg is not available')
return AsyncpgClient(hosts, pool_size=concurrency, session_settings=session_settings)
return HttpClient(_to_http_hosts(hosts), conn_pool_limit=concurrency, session_settings=session_settings)
return HttpClient(_to_http_hosts(hosts), conn_pool_limit=concurrency)
12 changes: 10 additions & 2 deletions cr8/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@

from cr8 import aio
from cr8.metrics import Stats, get_sampler
from cr8.clients import client

from cr8.clients import client, HttpClient

TimedStats = namedtuple('TimedStats', ['started', 'ended', 'stats'])

Expand Down Expand Up @@ -73,6 +72,15 @@ def __init__(self, hosts, concurrency, sample_mode, session_settings=None):
self.concurrency = concurrency
self.client = client(hosts, session_settings=session_settings, concurrency=concurrency)
self.sampler = get_sampler(sample_mode)
self.session_settings = session_settings or {}

def set_session_settings(self):
if isinstance(self.client, HttpClient):
f = self.client.execute
for setting, value in self.session_settings.items():
stmt = f'set {setting}={value}'
statements = itertools.repeat((stmt, ()), self.concurrency)
aio.run_many(f, statements, self.concurrency, self.concurrency)

def warmup(self, stmt, num_warmup, concurrency=0, args=None):
statements = itertools.repeat((stmt, args or ()), num_warmup)
Expand Down
1 change: 1 addition & 0 deletions cr8/run_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ def run_queries(self, queries: Iterable[dict], meta=None, session_settings=None)
f' {mode_desc}: {duration or iterations}')
)
with Runner(self.benchmark_hosts, concurrency, self.sample_mode, session_settings) as runner:
runner.set_session_settings()
if warmup > 0:
runner.warmup(stmt, warmup, concurrency, args)
timed_stats = runner.run(
Expand Down

0 comments on commit 2d008ae

Please sign in to comment.