Skip to content

Commit

Permalink
Users/anksing/event closed bug (Azure#34495)
Browse files Browse the repository at this point in the history
* Adding tests to capture groundedness with expected values

* Using threadpool to calculate prompt metrics instead of asyncio

* Streaming logs for PF run so as to show them on the console
  • Loading branch information
singankit authored Feb 29, 2024
1 parent 35bf8c0 commit 6175879
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import asyncio
import logging

from openai import AsyncAzureOpenAI
from openai import AsyncAzureOpenAI, AzureOpenAI
from openai.types.chat.chat_completion import ChatCompletion

from azure.ai.generative.evaluate._user_agent import USER_AGENT
Expand All @@ -25,8 +25,8 @@ def __init__(self, openai_params):
self._azure_deployment = openai_params.get("azure_deployment", None)\
if openai_params.get("azure_deployment", None) else openai_params.get("deployment_id", None)

self._client = AsyncAzureOpenAI(
azure_endpoint=self._azure_endpoint,
self._client = AzureOpenAI(
azure_endpoint=self._azure_endpoint.strip("/"),
api_version=self._api_version,
api_key=self._api_key,
default_headers={
Expand All @@ -35,19 +35,18 @@ def __init__(self, openai_params):
},
)

def bounded_chat_completion(self, messages):
    """Run one chat-completion request against the configured deployment.

    Calls the synchronous client with up to 5 retries and deterministic
    sampling settings (temperature=0, seed=0).

    :param messages: chat messages payload passed through to
        ``chat.completions.create``.
    :return: the completion response on success; on failure the exception
        object itself is returned (not raised) so callers can keep
        processing other rows — callers must check the result type.
    """
    try:
        result = self._client.with_options(max_retries=5).chat.completions.create(
            model=self._azure_deployment,
            messages=messages,
            temperature=0,
            seed=0,
        )
        return result
    except Exception as ex:
        # Deliberate best-effort: log at debug and hand the exception back
        # as a value. Lazy %-args avoid formatting when DEBUG is disabled.
        LOGGER.debug("Failed to call llm with exception : %s", str(ex))
        return ex

@staticmethod
def get_chat_completion_content_from_response(response):
Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
# ---------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# ---------------------------------------------------------
import asyncio
import json
from json import JSONDecodeError

import numpy as np
import pandas as pd
import logging
import tqdm.asyncio
from numpy import NaN

from .._client.openai_client import AzureOpenAIClient
from .._metric_handler import MetricHandler
from ..metrics._custom_metric import PromptMetric
from ..metrics._parsers import JsonParser, NumberParser
from concurrent.futures.thread import ThreadPoolExecutor
import tqdm

LOGGER = logging.getLogger(__name__)

Expand Down Expand Up @@ -104,49 +104,100 @@ def _parser_response(self, value, metric):

return result

async def _compute_metric_row(self, metric, data):
def _compute_metric_row(self, metric, data):
message = self._convert_metric_to_message(metric, data)
response = await self._client.bounded_chat_completion(message)
response = self._client.bounded_chat_completion(message)
content = self._client.get_chat_completion_content_from_response(response)
result = self._parser_response(content if content is not None else response, metric)
return result

async def _compute_metric(self, metric):
def _compute_metric(self, metric):
data = self._get_data_for_metric(metric)
tasks = []
for row_data in data:
task = asyncio.ensure_future(
self._compute_metric_row(metric, row_data)
)
tasks.append(task)

responses = await asyncio.gather(*tasks, return_exceptions=True)
results = {"artifacts": {}, "metrics": {}}
for key in responses[0].keys():
results["artifacts"].update({
key: [row[key] for row in responses]
})

return results
row_metric_futures = []
row_metric_results = []

async def _compute_metrics(self, metrics):
tasks = []
metrics_dict = {"artifacts": {}, "metrics": {}}
for metric in self.metrics:
task = asyncio.ensure_future(
self._compute_metric(metric)
)
tasks.append(task)
with ThreadPoolExecutor(thread_name_prefix="code_metrics_row") as thread_pool:
for i in range(0, len(data)):
row_metric_futures.append(thread_pool.submit(
self._compute_metric_row, metric, data=data[i]
))

for row_metric_future in row_metric_futures:
row_metric_results.append(row_metric_future.result())

results = {"artifacts": {}, "metrics": {}}

if isinstance(row_metric_results[0], dict):
for key in row_metric_results[0].keys():
results["artifacts"].update({
key: [row[key] for row in row_metric_results]
})
else:
results["artifacts"].update(
{metric.name: row_metric_results}
)

# tasks = []
# for row_data in data:
# task = asyncio.ensure_future(
# self._compute_metric_row(metric, row_data)
# )
# tasks.append(task)

# responses = await asyncio.gather(*tasks, return_exceptions=True)
responses = await tqdm.asyncio.tqdm.gather(*tasks)
for response in responses:
for k, v in metrics_dict.items():
v.update(response[k])
# results = {"artifacts": {}, "metrics": {}}
# for key in responses[0].keys():
# results["artifacts"].update({
# key: [row[key] for row in responses]
# })

return results

def _compute_metrics(self, metrics):
    """Compute all prompt metrics concurrently, one pool task per metric.

    Each metric fans out its own per-row work inside ``_compute_metric``.
    A tqdm progress bar advances once per finished metric; a failing
    metric is logged and skipped so it does not abort the whole run.

    :param metrics: metrics collection; used only to size the progress
        bar. NOTE(review): the loop below iterates ``self.metrics``, not
        this argument — confirm that is intended.
    :return: dict with merged ``"artifacts"`` and ``"metrics"`` sections.
    """
    metrics_dict = {"artifacts": {}, "metrics": {}}
    metric_results_futures = {}
    with tqdm.tqdm(total=len(metrics)) as progress_bar:
        with ThreadPoolExecutor(thread_name_prefix="prompt_metrics") as thread_pool:
            for metric in self.metrics:
                metric_results_futures.update({metric.name: thread_pool.submit(
                    self._compute_metric, metric,
                )})

            for metric_name, metric_result_future in metric_results_futures.items():
                try:
                    metric_result = metric_result_future.result()
                    metrics_dict["artifacts"].update(metric_result["artifacts"])
                    if "metrics" in metric_result.keys() and metric_result["metrics"] is not None:
                        metrics_dict["metrics"].update(metric_result["metrics"])
                    progress_bar.update(1)
                except Exception as ex:
                    # Best-effort: record the failure and advance the bar
                    # so the remaining metrics still complete.
                    progress_bar.update(1)
                    LOGGER.info(
                        f"Error calculating value for {metric_name}, failed with error {str(ex)} : Stack trace : {str(ex.__traceback__)}")

    return metrics_dict

def calculate_metrics(self):
    """Entry point: compute all configured prompt metrics synchronously.

    :return: dict of ``"artifacts"`` and ``"metrics"`` produced by
        ``_compute_metrics``.
    """
    LOGGER.info(f"Calculating prompt metric {[metric.name for metric in self.metrics]}")
    result = self._compute_metrics(self.metrics)
    return result
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ def run_pf_flow_with_dict_list(flow_path, data: List[Dict], flow_params=None):
data=tmp_path,
column_mapping=column_mapping,
environment_variables=env_vars,
stream=True,
**flow_params
)

Expand Down

0 comments on commit 6175879

Please sign in to comment.