399 changes: 201 additions & 198 deletions camel/model_backend.py
@@ -1,198 +1,201 @@
# =========== Copyright 2023 @ CAMEL-AI.org. All Rights Reserved. ===========
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# =========== Copyright 2023 @ CAMEL-AI.org. All Rights Reserved. ===========
import os
from abc import ABC, abstractmethod
from typing import Any, Dict

import openai
import tiktoken

from camel.typing import ModelType
from chatdev.statistics import prompt_cost
from chatdev.utils import log_visualize

try:
    from openai.types.chat import ChatCompletion

    openai_new_api = True  # new openai api version
except ImportError:
    openai_new_api = False  # old openai api version

OPENAI_API_KEY = os.environ['OPENAI_API_KEY']
BASE_URL = os.environ.get('BASE_URL')  # optional override for the API base URL


class ModelBackend(ABC):
    r"""Base class for different model backends.
    May be an OpenAI API client, a local LLM, a stub for unit tests, etc."""

    @abstractmethod
    def run(self, *args, **kwargs):
        r"""Runs the query against the backend model.

        Raises:
            RuntimeError: if the return value from the OpenAI API
                is not in the expected format.

        Returns:
            Dict[str, Any]: All backends must return a dict in OpenAI format.
        """
        pass


class OpenAIModel(ModelBackend):
    r"""OpenAI API in a unified ModelBackend interface."""

    def __init__(self, model_type: ModelType, model_config_dict: Dict) -> None:
        super().__init__()
        self.model_type = model_type
        self.model_config_dict = model_config_dict

    def run(self, *args, **kwargs):
        # Estimate the prompt size so the completion budget can be derived
        # from the model's context window below.
        string = "\n".join([message["content"] for message in kwargs["messages"]])
        encoding = tiktoken.encoding_for_model(self.model_type.value)
        num_prompt_tokens = len(encoding.encode(string))
        # Reserve a fixed per-message overhead for chat formatting tokens.
        gap_between_send_receive = 15 * len(kwargs["messages"])
        num_prompt_tokens += gap_between_send_receive

        if openai_new_api:
            # Experimental: honor a custom base_url if one is configured.
            if BASE_URL:
                client = openai.OpenAI(
                    api_key=OPENAI_API_KEY,
                    base_url=BASE_URL,
                )
            else:
                client = openai.OpenAI(
                    api_key=OPENAI_API_KEY
                )

            num_max_token_map = {
                "gpt-3.5-turbo": 4096,
                "gpt-3.5-turbo-16k": 16384,
                "gpt-3.5-turbo-0613": 4096,
                "gpt-3.5-turbo-16k-0613": 16384,
                "gpt-4": 8192,
                "gpt-4-0613": 8192,
                "gpt-4-32k": 32768,
                "gpt-4-1106-preview": 4096,
                "gpt-4-1106-vision-preview": 4096,
                "gpt-4o": 4096,
            }
            num_max_token = num_max_token_map[self.model_type.value]
            # Spend whatever budget the prompt leaves on the completion.
            num_max_completion_tokens = num_max_token - num_prompt_tokens
            self.model_config_dict['max_tokens'] = num_max_completion_tokens

            response = client.chat.completions.create(*args, **kwargs, model=self.model_type.value,
                                                      **self.model_config_dict)

            cost = prompt_cost(
                self.model_type.value,
                num_prompt_tokens=response.usage.prompt_tokens,
                num_completion_tokens=response.usage.completion_tokens
            )

            log_visualize(
                "**[OpenAI_Usage_Info Receive]**\nprompt_tokens: {}\ncompletion_tokens: {}\ntotal_tokens: {}\ncost: ${:.6f}\n".format(
                    response.usage.prompt_tokens, response.usage.completion_tokens,
                    response.usage.total_tokens, cost))
            if not isinstance(response, ChatCompletion):
                raise RuntimeError("Unexpected return from OpenAI API")
            return response
        else:
            num_max_token_map = {
                "gpt-3.5-turbo": 4096,
                "gpt-3.5-turbo-16k": 16384,
                "gpt-3.5-turbo-0613": 4096,
                "gpt-3.5-turbo-16k-0613": 16384,
                "gpt-4": 8192,
                "gpt-4-0613": 8192,
                "gpt-4-32k": 32768,
                "gpt-4o": 4096,
            }
            num_max_token = num_max_token_map[self.model_type.value]
            num_max_completion_tokens = num_max_token - num_prompt_tokens
            self.model_config_dict['max_tokens'] = num_max_completion_tokens

            response = openai.ChatCompletion.create(*args, **kwargs, model=self.model_type.value,
                                                    **self.model_config_dict)

            cost = prompt_cost(
                self.model_type.value,
                num_prompt_tokens=response["usage"]["prompt_tokens"],
                num_completion_tokens=response["usage"]["completion_tokens"]
            )

            log_visualize(
                "**[OpenAI_Usage_Info Receive]**\nprompt_tokens: {}\ncompletion_tokens: {}\ntotal_tokens: {}\ncost: ${:.6f}\n".format(
                    response["usage"]["prompt_tokens"], response["usage"]["completion_tokens"],
                    response["usage"]["total_tokens"], cost))
            if not isinstance(response, Dict):
                raise RuntimeError("Unexpected return from OpenAI API")
            return response


class StubModel(ModelBackend):
    r"""A dummy model used for unit tests."""

    def __init__(self, *args, **kwargs) -> None:
        super().__init__()

    def run(self, *args, **kwargs) -> Dict[str, Any]:
        ARBITRARY_STRING = "Lorem Ipsum"

        return dict(
            id="stub_model_id",
            usage=dict(),
            choices=[
                dict(finish_reason="stop",
                     message=dict(content=ARBITRARY_STRING, role="assistant"))
            ],
        )


class ModelFactory:
    r"""Factory of backend models.

    Raises:
        ValueError: in case the provided model type is unknown.
    """

    @staticmethod
    def create(model_type: ModelType, model_config_dict: Dict) -> ModelBackend:
        default_model_type = ModelType.GPT_3_5_TURBO

        if model_type in {
            ModelType.GPT_3_5_TURBO,
            ModelType.GPT_3_5_TURBO_NEW,
            ModelType.GPT_4,
            ModelType.GPT_4_32k,
            ModelType.GPT_4_TURBO,
            ModelType.GPT_4_TURBO_V,
            ModelType.GPT_4o,
            None
        }:
            model_class = OpenAIModel
        elif model_type == ModelType.STUB:
            model_class = StubModel
        else:
            raise ValueError("Unknown model type: {}".format(model_type))

        if model_type is None:
            model_type = default_model_type

        # log_visualize("Model Type: {}".format(model_type))
        inst = model_class(model_type, model_config_dict)
        return inst
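
For reference, a minimal usage sketch of the factory as it stands after this change. It assumes the `ModelType` members shown in the diff; the message payload and `temperature` value are illustrative placeholders, not part of this PR. Note that the module reads `OPENAI_API_KEY` at import time, so the variable must be exported even for the stub path.

```python
# Minimal usage sketch. Assumes OPENAI_API_KEY is exported (the module
# reads it at import time, even for the stub path). The messages and
# temperature below are illustrative, not values from this PR.
from camel.typing import ModelType
from camel.model_backend import ModelFactory

# The stub backend never calls the API, which makes it handy for tests.
stub = ModelFactory.create(ModelType.STUB, model_config_dict={})
print(stub.run()["choices"][0]["message"]["content"])  # -> "Lorem Ipsum"

# Passing None falls back to the default, ModelType.GPT_3_5_TURBO;
# run() forwards the messages to the chat-completions endpoint.
backend = ModelFactory.create(None, model_config_dict={"temperature": 0.2})
response = backend.run(messages=[{"role": "user", "content": "Hello"}])
```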