From c3028375a4948f66b8f1364096276ba700eda84d Mon Sep 17 00:00:00 2001 From: OpenHTF Owners Date: Wed, 6 Nov 2024 12:01:00 -0800 Subject: [PATCH] Create and persist an authorized session object for MfgInspector. AuthorizedSession allows us to persist connections across multiple requests to the same host. This means the manufacturing inspector client will re-use existing connections from the pool which reduces overhead and latency. Requests are processed faster and sessions automatically handle keep-alive connections. Additionally, session related data (e.g. auth) is persisted which improves performance. Given our partial uploader implementation, this will be beneficial to both client and server. PiperOrigin-RevId: 693813461 --- openhtf/output/callbacks/mfg_inspector.py | 37 ++++++++++++++------- test/output/callbacks/mfg_inspector_test.py | 23 ++++++++++--- 2 files changed, 44 insertions(+), 16 deletions(-) diff --git a/openhtf/output/callbacks/mfg_inspector.py b/openhtf/output/callbacks/mfg_inspector.py index 819aa2fbd..1815b8965 100644 --- a/openhtf/output/callbacks/mfg_inspector.py +++ b/openhtf/output/callbacks/mfg_inspector.py @@ -18,6 +18,7 @@ import logging import time import zlib +from typing import Optional from google.auth import credentials as credentials_lib from google.auth.transport import requests @@ -45,19 +46,18 @@ class InvalidTestRunError(Exception): def _send_mfg_inspector_request( envelope_data: bytes, - credentials: credentials_lib.Credentials, + authorized_session: requests.AuthorizedSession, destination_url: str, ) -> Dict[str, Any]: """Send upload http request. Intended to be run in retry loop.""" logging.info('Uploading result...') - with requests.AuthorizedSession(credentials) as authed_session: - response = authed_session.request( - 'POST', - destination_url, - data=envelope_data, - timeout=_MFG_INSPECTOR_UPLOAD_TIMEOUT, - ) + response = authorized_session.request( + 'POST', + destination_url, + data=envelope_data, + timeout=_MFG_INSPECTOR_UPLOAD_TIMEOUT, + ) try: result = response.json() @@ -94,6 +94,7 @@ def send_mfg_inspector_data( credentials: credentials_lib.Credentials, destination_url: str, payload_type: guzzle_pb2.PayloadType, + authorized_session: Optional[requests.AuthorizedSession] = None, ) -> Dict[str, Any]: """Upload MfgEvent to steam_engine.""" envelope = guzzle_pb2.TestRunEnvelope() # pytype: disable=module-attr # gen-stub-imports @@ -105,10 +106,13 @@ def send_mfg_inspector_data( envelope.payload_type = payload_type envelope_data = envelope.SerializeToString() + if authorized_session is None: + authorized_session = requests.AuthorizedSession(credentials) + for _ in range(5): try: result = _send_mfg_inspector_request( - envelope_data, credentials, destination_url + envelope_data, authorized_session, destination_url ) return result except UploadFailedError: @@ -181,8 +185,10 @@ def __init__(self, 'user_agent': 'OpenHTF Guzzle Upload Client', }, scopes=[self.SCOPE_CODE_URI]) + self.authorized_session = requests.AuthorizedSession(self.credentials) else: self.credentials = None + self.authorized_session = None self.upload_result = None @@ -260,11 +266,18 @@ def upload(self, payload_type=guzzle_pb2.COMPRESSED_TEST_RUN): if not self.credentials: raise RuntimeError('Must provide credentials to use upload callback.') + if self.authorized_session is None: + self.authorized_session = requests.AuthorizedSession(self.credentials) + def upload_callback(test_record_obj): proto = self._convert(test_record_obj) - self.upload_result = send_mfg_inspector_data(proto, self.credentials, - self.destination_url, - payload_type) + self.upload_result = send_mfg_inspector_data( + proto, + self.credentials, + self.destination_url, + payload_type, + self.authorized_session, + ) return upload_callback diff --git a/test/output/callbacks/mfg_inspector_test.py b/test/output/callbacks/mfg_inspector_test.py index c382cea8c..4f562fc20 100644 --- a/test/output/callbacks/mfg_inspector_test.py +++ b/test/output/callbacks/mfg_inspector_test.py @@ -55,6 +55,13 @@ def setUp(self): self.mock_send_mfg_inspector_data = mock.patch.object( mfg_inspector, 'send_mfg_inspector_data').start() + self.mock_authorized_session = mock.patch.object( + mfg_inspector.requests, + 'AuthorizedSession', + spec_set=True, + autospec=True, + ).start() + def tearDown(self): mock.patch.stopall() super(TestMfgInspector, self).tearDown() @@ -110,8 +117,12 @@ def test_upload_only(self): callback.upload()(MOCK_TEST_RUN) self.mock_send_mfg_inspector_data.assert_called_with( - MOCK_TEST_RUN_PROTO, self.mock_credentials, callback.destination_url, - guzzle_pb2.COMPRESSED_TEST_RUN) + MOCK_TEST_RUN_PROTO, + self.mock_credentials, + callback.destination_url, + guzzle_pb2.COMPRESSED_TEST_RUN, + self.mock_authorized_session(self.mock_credentials), + ) def test_save_and_upload(self): testrun_output = io.BytesIO() @@ -132,8 +143,12 @@ def test_save_and_upload(self): self.assertEqual(MOCK_TEST_RUN_PROTO, testrun) self.mock_send_mfg_inspector_data.assert_called_with( - MOCK_TEST_RUN_PROTO, self.mock_credentials, callback.destination_url, - guzzle_pb2.COMPRESSED_TEST_RUN) + MOCK_TEST_RUN_PROTO, + self.mock_credentials, + callback.destination_url, + guzzle_pb2.COMPRESSED_TEST_RUN, + self.mock_authorized_session(self.mock_credentials), + ) # Make sure mock converter only called once i.e. the test record was # was converted to a proto only once. This important because some custom