Skip to content

Commit

Permalink
Merge pull request #221 from runpod/json-logs
Browse files Browse the repository at this point in the history
Json logs
  • Loading branch information
justinmerrell authored Nov 14, 2023
2 parents 69fe168 + b6039a5 commit c22b4c8
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 46 deletions.
8 changes: 4 additions & 4 deletions examples/serverless/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@
# ERROR | An error message


log.debug('A debug message', job_id=JOB_ID)
log.info('An info message', job_id=JOB_ID)
log.warn('A warning message', job_id=JOB_ID)
log.error('An error message', job_id=JOB_ID)
log.debug('A debug message', request_id=JOB_ID)
log.info('An info message', request_id=JOB_ID)
log.warn('A warning message', request_id=JOB_ID)
log.error('An error message', request_id=JOB_ID)

# Output:
# {"requestId": "1234567890", "message": "A debug message", "level": "DEBUG"}
Expand Down
18 changes: 9 additions & 9 deletions runpod/serverless/modules/rp_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ async def get_job(session: ClientSession, retry=True) -> Optional[Dict[str, Any]
continue

next_job = await response.json()
log.debug(f"Received Job | {next_job}")
log.debug(f"Request Received | {next_job}")

# Check if the job is valid
job_id = next_job.get("id", None)
Expand All @@ -94,11 +94,11 @@ async def get_job(session: ClientSession, retry=True) -> Optional[Dict[str, Any]
if not retry:
return None

log.debug(f"{next_job['id']} | Valid Job Confirmed")
log.debug("Confirmed valid request.", next_job['id'])

if next_job:
job_list.add_job(next_job["id"])
log.debug(f"{next_job['id']} | Added Job ID")
log.debug("Request ID added.", next_job['id'])

return next_job

Expand All @@ -108,14 +108,14 @@ async def run_job(handler: Callable, job: Dict[str, Any]) -> Dict[str, Any]:
Run the job using the handler.
Returns the job output or error.
"""
log.info(f'{job["id"]} | Started')
log.info('Started', job["id"])
run_result = {"error": "No output from handler."}

try:
handler_return = handler(job)
job_output = await handler_return if inspect.isawaitable(handler_return) else handler_return

log.debug(f'{job["id"]} | Handler output: {job_output}')
log.debug(f'Handler output: {job_output}', job["id"])

if isinstance(job_output, dict):
error_msg = job_output.pop("error", None)
Expand Down Expand Up @@ -149,12 +149,12 @@ async def run_job(handler: Callable, job: Dict[str, Any]) -> Dict[str, Any]:
"runpod_version": runpod_version
}

log.error(f'{job["id"]} | Captured Handler Exception')
log.error('Captured Handler Exception', job["id"])
log.error(json.dumps(error_info, indent=4))
run_result = {"error": json.dumps(error_info)}

finally:
log.debug(f'{job["id"]} | run_job return: {run_result}')
log.debug(f'run_job return: {run_result}', job["id"])

return run_result

Expand All @@ -175,7 +175,7 @@ async def run_job_generator(
for output_partial in job_output:
yield {"output": output_partial}
except Exception as err: # pylint: disable=broad-except
log.error(f'Error while running job {job["id"]}: {err}')
log.error(err, job["id"])
yield {"error": f"handler: {str(err)} \ntraceback: {traceback.format_exc()}"}
finally:
log.info(f'{job["id"]} | Finished ')
log.info('Finished', job["id"])
21 changes: 12 additions & 9 deletions runpod/serverless/modules/rp_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ def log(self, message, message_level='INFO', job_id=None):
if level_index > LOG_LEVELS.index(message_level) and message_level != 'TIP':
return

if job_id:
if os.environ.get('RUNPOD_ENDPOINT_ID'):
log_json = {
'requestId': job_id,
'message': message,
Expand All @@ -80,6 +80,9 @@ def log(self, message, message_level='INFO', job_id=None):
print(json.dumps(log_json), flush=True)
return

if job_id:
message = f'{job_id} | {message}'

print(f'{message_level.ljust(7)}| {message}', flush=True)
return

Expand All @@ -92,29 +95,29 @@ def secret(self, secret_name, secret):
redacted_secret = secret[0] + '*' * (len(secret)-2) + secret[-1]
self.info(f"{secret_name}: {redacted_secret}")

def debug(self, message, job_id=None):
def debug(self, message, request_id=None):
'''
debug log
'''
self.log(message, 'DEBUG', job_id)
self.log(message, 'DEBUG', request_id)

def info(self, message, job_id=None):
def info(self, message, request_id=None):
'''
info log
'''
self.log(message, 'INFO', job_id)
self.log(message, 'INFO', request_id)

def warn(self, message, job_id=None):
def warn(self, message, request_id=None):
'''
warn log
'''
self.log(message, 'WARN', job_id)
self.log(message, 'WARN', request_id)

def error(self, message, job_id=None):
def error(self, message, request_id=None):
'''
error log
'''
self.log(message, 'ERROR', job_id)
self.log(message, 'ERROR', request_id)

def tip(self, message):
'''
Expand Down
2 changes: 1 addition & 1 deletion runpod/serverless/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ async def _process_job(job, session, job_scaler, config):

# If refresh_worker is set, pod will be reset after job is complete.
if config.get("refresh_worker", False):
log.info(f"refresh_worker | Flag set, stopping pod after job {job['id']}.")
log.info("refresh_worker flag set, stopping pod after job.", job['id'])
job_result["stopPod"] = True
job_scaler.kill_worker()

Expand Down
43 changes: 20 additions & 23 deletions tests/test_serverless/test_modules/test_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

from runpod.serverless.modules import rp_job


class TestJob(IsolatedAsyncioTestCase):
''' Tests the Job class. '''

Expand Down Expand Up @@ -38,19 +39,18 @@ async def test_get_job_200(self):
response4.json = make_mocked_coro(return_value={"id": "123", "input": {"number": 1}})

with patch("aiohttp.ClientSession") as mock_session, \
patch("runpod.serverless.modules.rp_job.JOB_GET_URL", "http://mock.url"):
patch("runpod.serverless.modules.rp_job.JOB_GET_URL", "http://mock.url"):

# Set side_effect to a list of mock responses
mock_session.get.return_value.__aenter__.side_effect = [
response1, response2, response3, response4
]
]

job = await rp_job.get_job(mock_session, retry=True)

# Assertions for the success case
assert job == {"id": "123", "input": {"number": 1}}


async def test_get_job_204(self):
'''
Tests the get_job function with a 204 response
Expand All @@ -61,7 +61,7 @@ async def test_get_job_204(self):
response_204.json = make_mocked_coro(return_value=None)

with patch("aiohttp.ClientSession") as mock_session_204, \
patch("runpod.serverless.modules.rp_job.JOB_GET_URL", "http://mock.url"):
patch("runpod.serverless.modules.rp_job.JOB_GET_URL", "http://mock.url"):

mock_session_204.get.return_value.__aenter__.return_value = response_204
job = await rp_job.get_job(mock_session_204, retry=False)
Expand All @@ -78,14 +78,13 @@ async def test_get_job_400(self):
response_400.status = 400

with patch("aiohttp.ClientSession") as mock_session_400, \
patch("runpod.serverless.modules.rp_job.JOB_GET_URL", "http://mock.url"):
patch("runpod.serverless.modules.rp_job.JOB_GET_URL", "http://mock.url"):

mock_session_400.get.return_value.__aenter__.return_value = response_400
job = await rp_job.get_job(mock_session_400, retry=False)

assert job is None


async def test_get_job_500(self):
'''
Tests the get_job function with a 500 response
Expand All @@ -95,14 +94,13 @@ async def test_get_job_500(self):
response_500.status = 500

with patch("aiohttp.ClientSession") as mock_session_500, \
patch("runpod.serverless.modules.rp_job.JOB_GET_URL", "http://mock.url"):
patch("runpod.serverless.modules.rp_job.JOB_GET_URL", "http://mock.url"):

mock_session_500.get.return_value.__aenter__.return_value = response_500
job = await rp_job.get_job(mock_session_500, retry=False)

assert job is None


async def test_get_job_no_id(self):
'''
Tests the get_job function with a 200 response but no id
Expand All @@ -111,10 +109,9 @@ async def test_get_job_no_id(self):
response.status = 200
response.json = make_mocked_coro(return_value={})


with patch("aiohttp.ClientSession") as mock_session, \
patch("runpod.serverless.modules.rp_job.log", new_callable=Mock) as mock_log, \
patch("runpod.serverless.modules.rp_job.JOB_GET_URL", "http://mock.url"):
patch("runpod.serverless.modules.rp_job.log", new_callable=Mock) as mock_log, \
patch("runpod.serverless.modules.rp_job.JOB_GET_URL", "http://mock.url"):

mock_session.get.return_value.__aenter__.return_value = response

Expand All @@ -131,10 +128,9 @@ async def test_get_job_no_input(self):
response.status = 200
response.json = make_mocked_coro(return_value={"id": "123"})


with patch("aiohttp.ClientSession") as mock_session, \
patch("runpod.serverless.modules.rp_job.log", new_callable=Mock) as mock_log, \
patch("runpod.serverless.modules.rp_job.JOB_GET_URL", "http://mock.url"):
patch("runpod.serverless.modules.rp_job.log", new_callable=Mock) as mock_log, \
patch("runpod.serverless.modules.rp_job.JOB_GET_URL", "http://mock.url"):

mock_session.get.return_value.__aenter__.return_value = response

Expand All @@ -152,15 +148,16 @@ async def test_get_job_exception(self):
response_exception.status = 200

with patch("aiohttp.ClientSession") as mock_session_exception, \
patch("runpod.serverless.modules.rp_job.log", new_callable=Mock) as mock_log, \
patch("runpod.serverless.modules.rp_job.JOB_GET_URL", "http://mock.url"):
patch("runpod.serverless.modules.rp_job.log", new_callable=Mock) as mock_log, \
patch("runpod.serverless.modules.rp_job.JOB_GET_URL", "http://mock.url"):

mock_session_exception.get.return_value.__aenter__.side_effect = Exception
job = await rp_job.get_job(mock_session_exception, retry=False)

assert job is None
assert mock_log.error.call_count == 1


class TestRunJob(IsolatedAsyncioTestCase):
''' Tests the run_job function '''

Expand All @@ -184,7 +181,7 @@ async def test_simple_job(self):

mock_handler.return_value = ['test1', 'test2']
job_result_list = await rp_job.run_job(mock_handler, self.sample_job)
assert job_result_list == {"output":["test1", "test2"]}
assert job_result_list == {"output": ["test1", "test2"]}

mock_handler.return_value = 123
job_result_int = await rp_job.run_job(mock_handler, self.sample_job)
Expand Down Expand Up @@ -249,14 +246,14 @@ async def test_job_with_exception(self):
class TestRunJobGenerator(IsolatedAsyncioTestCase):
''' Tests the run_job_generator function '''

def handler_gen_success(self, job): # pylint: disable=unused-argument
def handler_gen_success(self, job): # pylint: disable=unused-argument
'''
Test handler that returns a generator.
'''
yield "partial_output_1"
yield "partial_output_2"

async def handler_async_gen_success(self, job): # pylint: disable=unused-argument
async def handler_async_gen_success(self, job): # pylint: disable=unused-argument
'''
Test handler that returns an async generator.
'''
Expand All @@ -267,7 +264,7 @@ def handler_fail(self, job):
'''
Test handler that raises an exception.
'''
raise Exception("Test Exception") # pylint: disable=broad-exception-raised
raise Exception("Test Exception") # pylint: disable=broad-exception-raised

async def test_run_job_generator_success(self):
'''
Expand All @@ -282,7 +279,7 @@ async def test_run_job_generator_success(self):
assert result == [{"output": "partial_output_1"}, {"output": "partial_output_2"}]
assert mock_log.error.call_count == 0
assert mock_log.info.call_count == 1
mock_log.info.assert_called_with('123 | Finished ')
mock_log.info.assert_called_with('Finished', '123')

async def test_run_job_generator_success_async(self):
'''
Expand All @@ -297,7 +294,7 @@ async def test_run_job_generator_success_async(self):
assert result == [{"output": "partial_output_1"}, {"output": "partial_output_2"}]
assert mock_log.error.call_count == 0
assert mock_log.info.call_count == 1
mock_log.info.assert_called_with('123 | Finished ')
mock_log.info.assert_called_with('Finished', '123')

async def test_run_job_generator_exception(self):
'''
Expand All @@ -313,4 +310,4 @@ async def test_run_job_generator_exception(self):
assert "error" in result[0]
assert mock_log.error.call_count == 1
assert mock_log.info.call_count == 1
mock_log.info.assert_called_with('123 | Finished ')
mock_log.info.assert_called_with('Finished', '123')

0 comments on commit c22b4c8

Please sign in to comment.