Skip to content

Improve openai class #601

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 20 commits into from
May 27, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 87 additions & 38 deletions clarifai/runners/models/dummy_openai_model.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""Dummy OpenAI model implementation for testing."""

import json
from typing import Iterator
from typing import Any, Dict, Iterator

from clarifai.runners.models.openai_class import OpenAIModelClass

Expand Down Expand Up @@ -51,6 +51,23 @@ def __init__(self, messages):
self.created = 1234567890
self.model = "dummy-model"

def to_dict(self) -> Dict[str, Any]:
    """Serialize this mock completion into an OpenAI-compatible dict.

    Returns:
        A dict with ``id``, ``created``, ``model``, ``choices`` (each choice
        carrying its message, finish reason, and index) and ``usage``.
    """
    serialized_choices = []
    for choice in self.choices:
        serialized_choices.append(
            {
                "message": {
                    "role": choice.message.role,
                    "content": choice.message.content,
                },
                "finish_reason": choice.finish_reason,
                "index": choice.index,
            }
        )
    return {
        "id": self.id,
        "created": self.created,
        "model": self.model,
        "choices": serialized_choices,
        "usage": self.usage,
    }


class MockCompletionStream:
"""Mock completion stream that mimics the OpenAI streaming response structure."""
Expand All @@ -62,82 +79,114 @@ def __init__(self, content=None):
self.content = content
self.role = "assistant" if content is None else None

def __init__(self, content=None, include_usage=False):
    """Build one streaming choice.

    Args:
        content: Text for this chunk's delta; ``None``/empty marks the
            terminal chunk (``finish_reason`` becomes ``"stop"``).
        include_usage: When True, attach a fixed token-usage dict,
            mimicking ``stream_options={"include_usage": True}``.
    """
    # NOTE: the deleted pre-refactor __init__ signatures from the diff
    # residue are removed; only the new implementation remains.
    self.delta = self.Delta(content)
    self.finish_reason = None if content else "stop"
    self.index = 0
    self.usage = (
        {"prompt_tokens": 10, "completion_tokens": 5, "total_tokens": 15}
        if include_usage
        else None
    )

def __init__(self, content=None, include_usage=False):
    """Build one streaming chunk wrapping a single Choice.

    Args:
        content: Text carried by this chunk; falsy content marks the final chunk.
        include_usage: Forwarded to the Choice, which decides whether to
            populate a usage dict.
    """
    self.choices = [self.Choice(content, include_usage)]
    self.id = "dummy-chunk-id"
    self.created = 1234567890
    self.model = "dummy-model"
    # Surface the (possibly None) usage dict computed by the single Choice
    # at chunk level so to_dict can attach it conditionally.
    self.usage = self.choices[0].usage

def to_dict(self) -> Dict[str, Any]:
    """Serialize this streaming chunk into an OpenAI-compatible dict.

    The ``content`` key is included in each delta only when it is not
    ``None``; the ``usage`` key is included only when usage is truthy.
    """
    chunk_choices = []
    for choice in self.choices:
        delta: Dict[str, Any] = {"role": choice.delta.role}
        if choice.delta.content is not None:
            delta["content"] = choice.delta.content
        chunk_choices.append(
            {
                "delta": delta,
                "finish_reason": choice.finish_reason,
                "index": choice.index,
            }
        )
    payload = {
        "id": self.id,
        "created": self.created,
        "model": self.model,
        "choices": chunk_choices,
    }
    if self.usage:
        payload["usage"] = self.usage
    return payload

def __init__(self, messages):
    """Build a mock stream that echoes the last message's content.

    Args:
        messages: Chat-style message dicts; only the last one's
            ``content`` is used to form the echoed response.
    """
    # Generate a simple response based on the last message
    last_message = messages[-1] if messages else {"content": ""}
    self.response_text = f"Echo: {last_message.get('content', '')}"
    # Diff residue removed: the old 5-char chunking assignment was dead code,
    # immediately overwritten by the new two-chunk scheme below.
    # First chunk contains the full text; the empty chunk signals completion.
    self.chunks = [
        self.response_text,
        "",
    ]
    self.current_chunk = 0
    self.include_usage = False

def __iter__(self):
    # Iterator protocol: the stream object is its own iterator.
    return self

def __next__(self):
    """Yield each prepared chunk, then one empty terminal chunk, then stop.

    Raises:
        StopIteration: Once all chunks and the terminal chunk are exhausted.
    """
    if self.current_chunk < len(self.chunks):
        # Diff residue removed: the old call without include_usage was a dead
        # duplicate, immediately overwritten by this line.
        chunk = self.Chunk(self.chunks[self.current_chunk], self.include_usage)
        self.current_chunk += 1
        return chunk
    elif self.current_chunk == len(self.chunks):
        # Final chunk with empty content to indicate completion
        self.current_chunk += 1
        return self.Chunk()
    else:
        raise StopIteration


class DummyOpenAIModel(OpenAIModelClass):
    """Dummy OpenAI model implementation for testing."""

    # Diff residue removed: the deleted get_openai_client() method is gone;
    # the refactor replaced it with class-level configuration attributes
    # consumed by OpenAIModelClass.
    openai_client = MockOpenAIClient()
    model_name = "dummy-model"

def _process_request(self, **kwargs) -> Dict[str, Any]:
    """Process a request for non-streaming responses.

    Args:
        **kwargs: Raw request parameters (messages, temperature, etc.)
            forwarded to ``_create_completion_args``.

    Returns:
        The mock completion serialized to a plain dict via ``to_dict()``.
    """
    # Diff residue removed: the old (model, messages, ...) signature and its
    # echo-string body were deleted lines left interleaved by the scrape.
    completion_args = self._create_completion_args(kwargs)
    return self.client.chat.completions.create(**completion_args).to_dict()

def _process_streaming_request(self, **kwargs) -> Iterator[Dict[str, Any]]:
    """Process a request for streaming responses.

    Args:
        **kwargs: Raw request parameters; ``stream_options.include_usage``
            controls whether usage is attached to streamed chunks.

    Yields:
        Each streamed chunk serialized to a plain dict via ``to_dict()``.
    """
    # Diff residue removed: the old signature and echo-text body were
    # deleted lines left interleaved by the scrape.
    completion_args = self._create_completion_args(kwargs, stream=True)
    completion_stream = self.client.chat.completions.create(**completion_args)
    completion_stream.include_usage = kwargs.get('stream_options', {}).get(
        'include_usage', False
    )

    for chunk in completion_stream:
        yield chunk.to_dict()

# Override the method directly for testing
@OpenAIModelClass.method
def openai_stream_transport(self, req: str) -> Iterator[str]:
    """Direct implementation for testing purposes.

    Parses the JSON request, validates the message list, and yields each
    streamed chunk as a JSON string. Validation failures and exceptions
    are yielded as ``"Error: ..."`` strings rather than raised.
    """
    try:
        request_data = json.loads(req)
        # Diff residue removed: the old inline messages extraction,
        # ValueError-based validation, and 5-char echo chunking were
        # deleted lines left interleaved by the scrape.
        params = self._extract_request_params(request_data)

        # Validate messages
        if not params.get("messages"):
            yield "Error: No messages provided"
            return

        for message in params["messages"]:
            if (
                not isinstance(message, dict)
                or "role" not in message
                or "content" not in message
            ):
                yield "Error: Invalid message format"
                return

        for chunk in self._process_streaming_request(**params):
            yield json.dumps(chunk)
    except Exception as e:
        yield f"Error: {str(e)}"

Expand Down
Loading