Skip to content

Commit

Permalink
Adding the non adversarial simulator (#37350)
Browse files Browse the repository at this point in the history
* Adding the non adversarial simulator

* Remove promptflow.evals

* Fixed the import bug

* Fixed the parse response error

* Fixed the output to be in json format

* Fix progress bar

* remove pdb

* sphinx please be happy with this

* Move application calling logic to a separate function

* Synthetic to simulator

* Removed old init

* Add unittests

* Docstring and warning message

* First turn through the simulator

* Fixes to test and user flow

* More checks and changes to messages

* Fix the progress bar

---------

Co-authored-by: Nagkumar Arkalgud <nagkumar@naarkalg-work-mac.local>
Co-authored-by: Nagkumar Arkalgud <nagkumar@naarkalgworkmac.lan>
Co-authored-by: Nagkumar Arkalgud <nagkumar@Mac.lan>
  • Loading branch information
4 people authored Sep 20, 2024
1 parent b9824f7 commit a9233f4
Show file tree
Hide file tree
Showing 11 changed files with 1,185 additions and 34 deletions.
126 changes: 126 additions & 0 deletions sdk/evaluation/azure-ai-evaluation/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,132 @@ if __name__ == "__main__":

pprint(result)
```
## Simulator

Sample application prompty

```yaml
---
name: ApplicationPrompty
description: Simulates an application
model:
api: chat
configuration:
type: azure_openai
azure_deployment: ${env:AZURE_DEPLOYMENT}
api_key: ${env:AZURE_OPENAI_API_KEY}
azure_endpoint: ${env:AZURE_OPENAI_ENDPOINT}
parameters:
temperature: 0.0
top_p: 1.0
presence_penalty: 0
frequency_penalty: 0
response_format:
type: text

inputs:
conversation_history:
type: dict

---
system:
You are a helpful assistant and you're helping with the user's query. Keep the conversation engaging and interesting.

Output with a string that continues the conversation, responding to the latest message from the user, given the conversation history:
{{ conversation_history }}

```
Application code:

```python
import json
import asyncio
from typing import Any, Dict, List, Optional
from azure.ai.evaluation.synthetic import Simulator
from promptflow.client import load_flow
from azure.identity import DefaultAzureCredential
import os

# Scope of the Azure AI project used by the simulator; values are read from
# environment variables (set as placeholders in the __main__ block below).
azure_ai_project = {
    "subscription_id": os.environ.get("AZURE_SUBSCRIPTION_ID"),
    "resource_group_name": os.environ.get("RESOURCE_GROUP"),
    "project_name": os.environ.get("PROJECT_NAME"),
    "credential": DefaultAzureCredential(),
}

import wikipedia

# Fetch a short grounding text (first 1000 characters of the summary of the
# top Wikipedia hit) that the simulator will generate queries about.
wiki_search_term = "Leonardo da vinci"
wiki_title = wikipedia.search(wiki_search_term)[0]
wiki_page = wikipedia.page(wiki_title)
text = wiki_page.summary[:1000]

def method_to_invoke_application_prompty(
    query: str,
    messages_list: Optional[List[Dict]] = None,
    context: Optional[Dict[str, Any]] = None,
):
    """Invoke the application prompty with the latest user query.

    :param query: The latest user message to respond to.
    :param messages_list: Full conversation history to forward to the prompty.
    :param context: Optional grounding context for the prompty.
    :return: The prompty response, or a fallback string on any failure.
    """
    # NOTE: the original referenced module-level ``context``/``messages_list``
    # that were never defined (they were locals of ``callback``), so every
    # call raised a NameError that the bare ``except:`` silently swallowed.
    # They are now explicit parameters with safe defaults.
    try:
        current_dir = os.path.dirname(__file__)
        prompty_path = os.path.join(current_dir, "application.prompty")
        _flow = load_flow(source=prompty_path, model={
            "configuration": azure_ai_project
        })
        response = _flow(
            query=query,
            context=context,
            conversation_history=messages_list
        )
        return response
    except Exception:
        # Narrowed from a bare ``except:`` which would also swallow
        # KeyboardInterrupt/SystemExit.
        print("Something went wrong invoking the prompty")
        return "something went wrong"

async def callback(
    messages: List[Dict],
    stream: bool = False,
    session_state: Any = None,  # noqa: ANN401
    context: Optional[Dict[str, Any]] = None,
) -> dict:
    """Bridge between the simulator and the application under test.

    Pulls the newest user message out of the conversation, forwards it to
    the application prompty, and appends the reply in OpenAI chat-protocol
    format before handing the conversation back to the simulator.
    """
    history = messages["messages"]
    # The simulator always places the latest user turn at the end.
    query = history[-1]["content"]
    context = None
    # call your endpoint or ai application here
    reply = method_to_invoke_application_prompty(query)
    # Shape the reply to follow the OpenAI chat protocol.
    history.append(
        {
            "content": reply,
            "role": "assistant",
            "context": {
                "citations": None,
            },
        }
    )
    return {"messages": history, "stream": stream, "session_state": session_state, "context": context}



async def main():
    """Drive one simulation run and dump the generated data as JSON."""
    simulator = Simulator(
        azure_ai_project=azure_ai_project,
        credential=DefaultAzureCredential(),
    )
    personas = [
        f"I am a student and I want to learn more about {wiki_search_term}",
        f"I am a teacher and I want to teach my students about {wiki_search_term}",
    ]
    outputs = await simulator(
        target=callback,
        text=text,
        num_queries=2,
        max_conversation_turns=4,
        user_persona=personas,
    )
    print(json.dumps(outputs))

if __name__ == "__main__":
    # Placeholder values -- fill these in with your own project/endpoint
    # details before running the sample.
    os.environ["AZURE_SUBSCRIPTION_ID"] = ""
    os.environ["RESOURCE_GROUP"] = ""
    os.environ["PROJECT_NAME"] = ""
    os.environ["AZURE_OPENAI_API_KEY"] = ""
    os.environ["AZURE_OPENAI_ENDPOINT"] = ""
    os.environ["AZURE_DEPLOYMENT"] = ""
    asyncio.run(main())
    print("done!")
```

Simulators allow users to generate synthetic data using their application. Simulator expects the user to have a callback method that invokes
their AI application. Here's a sample of a callback which invokes AsyncAzureOpenAI:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@
from ._constants import SupportedLanguages
from ._direct_attack_simulator import DirectAttackSimulator
from ._indirect_attack_simulator import IndirectAttackSimulator
from .simulator import Simulator

__all__ = [
"AdversarialSimulator",
"AdversarialScenario",
"DirectAttackSimulator",
"IndirectAttackSimulator",
"SupportedLanguages",
"Simulator",
]
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
# noqa: E501
# pylint: disable=E0401,E0611
import asyncio
import functools
import logging
import random
from typing import Any, Callable, Dict, List, Optional
Expand All @@ -13,7 +12,6 @@
from azure.identity import DefaultAzureCredential
from tqdm import tqdm

from promptflow._sdk._telemetry import ActivityType, monitor_operation
from azure.ai.evaluation._http_utils import get_async_http_client
from azure.ai.evaluation._exceptions import EvaluationException, ErrorBlame, ErrorCategory, ErrorTarget
from azure.ai.evaluation._model_configurations import AzureAIProject
Expand All @@ -29,43 +27,13 @@
RAIClient,
TokenScope,
)
from ._tracing import monitor_adversarial_scenario
from ._utils import JsonLineList
from ._constants import SupportedLanguages

logger = logging.getLogger(__name__)


def monitor_adversarial_scenario(func) -> Callable:
    """Monitor an adversarial scenario with logging

    :param func: The function to be monitored
    :type func: Callable
    :return: The decorated function
    :rtype: Callable
    """

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # Pull the telemetry dimensions out of the keyword arguments; the
        # simulator's call interface is keyword-only, so positionals are not
        # expected here.
        scenario = str(kwargs.get("scenario", None))
        max_conversation_turns = kwargs.get("max_conversation_turns", None)
        max_simulation_results = kwargs.get("max_simulation_results", None)
        selected_language = kwargs.get("language", SupportedLanguages.English)
        # Decorate lazily on every invocation so the recorded custom
        # dimensions reflect the arguments of *this* call rather than the
        # first one.
        decorated_func = monitor_operation(
            activity_name="adversarial.simulator.call",
            activity_type=ActivityType.PUBLICAPI,
            custom_dimensions={
                "scenario": scenario,
                "max_conversation_turns": max_conversation_turns,
                "max_simulation_results": max_simulation_results,
                "selected_language": selected_language,
            },
        )(func)

        return decorated_func(*args, **kwargs)

    return wrapper


class AdversarialSimulator:
"""
Initializes the adversarial simulator with a project scope.
Expand Down Expand Up @@ -414,6 +382,7 @@ def _join_conversation_starter(self, parameters, to_join):
def call_sync(
self,
*,
scenario: AdversarialScenario,
max_conversation_turns: int,
max_simulation_results: int,
target: Callable,
Expand All @@ -423,6 +392,12 @@ def call_sync(
concurrent_async_task: int,
) -> List[Dict[str, Any]]:
"""Call the adversarial simulator synchronously.
:keyword scenario: Enum value specifying the adversarial scenario used for generating inputs.
example:
- :py:const:`azure.ai.evaluation.simulator.adversarial_scenario.AdversarialScenario.ADVERSARIAL_QA`
- :py:const:`azure.ai.evaluation.simulator.adversarial_scenario.AdversarialScenario.ADVERSARIAL_CONVERSATION`
:paramtype scenario: azure.ai.evaluation.simulator.adversarial_scenario.AdversarialScenario
:keyword max_conversation_turns: The maximum number of conversation turns to simulate.
:paramtype max_conversation_turns: int
Expand All @@ -448,6 +423,7 @@ def call_sync(
# Note: This approach might not be suitable in all contexts, especially with nested async calls
future = asyncio.ensure_future(
self(
scenario=scenario,
max_conversation_turns=max_conversation_turns,
max_simulation_results=max_simulation_results,
target=target,
Expand All @@ -462,6 +438,7 @@ def call_sync(
# If no event loop is running, use asyncio.run (Python 3.7+)
return asyncio.run(
self(
scenario=scenario,
max_conversation_turns=max_conversation_turns,
max_simulation_results=max_simulation_results,
target=target,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from ._simulator_data_classes import ConversationHistory, Turn
from ._language_suffix_mapping import SUPPORTED_LANGUAGES_MAPPING

__all__ = ["SUPPORTED_LANGUAGES_MAPPING"]
__all__ = ["ConversationHistory", "Turn", "SUPPORTED_LANGUAGES_MAPPING"]
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
# ---------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# ---------------------------------------------------------
# pylint: disable=C0103,C0114,C0116
from dataclasses import dataclass
from typing import Union

from azure.ai.evaluation.simulator._conversation.constants import ConversationRole


@dataclass
class Turn:
    """
    Represents a conversation turn,
    keeping track of the role, content,
    and context of a turn in a conversation.
    """

    # Role may arrive either as a plain string or as a ConversationRole enum.
    role: Union[str, ConversationRole]
    content: str
    # Fixed annotation: was declared ``str`` while defaulting to None.
    context: Union[str, None] = None

    def to_dict(self):
        """
        Convert the conversation turn to a dictionary.

        Returns:
            dict: A dictionary representation of the conversation turn.
        """
        return {
            # Normalize enum roles to their string value for serialization.
            "role": self.role.value if isinstance(self.role, ConversationRole) else self.role,
            "content": self.content,
            "context": self.context,
        }

    def __repr__(self):
        """
        Return the string representation of the conversation turn.

        Returns:
            str: A string representation of the conversation turn
            (context is intentionally omitted for brevity).
        """
        return f"Turn(role={self.role}, content={self.content})"


class ConversationHistory:
    """
    Conversation history class to keep track of the conversation turns in a conversation.
    """

    def __init__(self):
        """
        Initializes the conversation history with an empty list of turns.
        """
        # Ordered list of Turn objects, oldest first.
        self.history = []

    def add_to_history(self, turn: Turn) -> None:
        """
        Adds a turn to the conversation history.

        Args:
            turn (Turn): The conversation turn to add.
        """
        self.history.append(turn)

    def to_list(self) -> list:
        """
        Converts the conversation history to a list of dictionaries.

        Returns:
            list: A list of dictionaries representing the conversation turns.
        """
        return [turn.to_dict() for turn in self.history]

    def get_length(self) -> int:
        """
        Returns the length of the conversation.

        Returns:
            int: The number of turns in the conversation history.
        """
        return len(self.history)

    def __repr__(self) -> str:
        """
        Returns the string representation of the conversation history.

        Bug fix: the original printed each turn to stdout and returned an
        empty string; ``__repr__`` must *return* the representation and be
        side-effect free.

        Returns:
            str: One line per turn, joined with newlines.
        """
        return "\n".join(repr(turn) for turn in self.history)
Loading

0 comments on commit a9233f4

Please sign in to comment.