Adding support for Claude Models via Azure AI Foundry #109
Replies: 2 comments
-
|
If anyone encounters this in future or dev want to take some insights, the following code is working perfectly for this : Azure Anthropic PipelineA pipeline for interacting with Anthropic Claude models on Azure AI Foundry (endpoint """
title: Azure Anthropic Pipeline
author: custom
version: 1.0.1
license: Apache License 2.0
description: >
A pipeline for interacting with Anthropic Claude models hosted on Azure AI
(Anthropic /v1/messages endpoint). Converts Anthropic-style responses and SSE
into OpenAI-like structures for compatibility with OpenWebUI.
features:
- Supports Azure Anthropic /v1/messages endpoint
- Uses x-api-key + anthropic-version header
- Supports streaming and non-streaming responses
- Converts Anthropic SSE to OpenAI-style SSE (choices[].delta.content)
- Returns OpenAI-like response objects for better UI compatibility
- Ensures deployment/model name is used exactly (no prefix)
"""
import base64
import hashlib
import json
import logging
import os
from datetime import datetime, timezone
from typing import (
    Any,
    AsyncIterator,
    Dict,
    Generator,
    Iterator,
    List,
    Optional,
    Union,
)

import aiohttp
from fastapi.responses import StreamingResponse
from pydantic import BaseModel, Field, GetCoreSchemaHandler
from pydantic_core import core_schema

from open_webui.env import AIOHTTP_CLIENT_TIMEOUT, SRC_LOG_LEVELS
class EncryptedStr(str):
    """A string subclass that transparently encrypts/decrypts its value.

    The Fernet key is derived from the ``WEBUI_SECRET_KEY`` environment
    variable. Encrypted values carry an ``"encrypted:"`` prefix. When no
    secret is configured, values pass through unchanged (plaintext
    fallback) so the pipeline still works without encryption support.
    """

    @classmethod
    def _get_encryption_key(cls) -> Optional[bytes]:
        """Derive a urlsafe-base64 Fernet key from WEBUI_SECRET_KEY.

        Returns:
            A 44-byte urlsafe-base64 key, or None when the secret is unset
            (which disables encryption entirely).
        """
        secret = os.getenv("WEBUI_SECRET_KEY")
        if not secret:
            return None
        # SHA-256 gives exactly the 32 raw bytes Fernet requires.
        hashed_key = hashlib.sha256(secret.encode()).digest()
        return base64.urlsafe_b64encode(hashed_key)

    @classmethod
    def encrypt(cls, value: str) -> str:
        """Encrypt *value* and tag it with the "encrypted:" prefix.

        Empty values, already-encrypted values, and the no-key configuration
        are all returned unchanged.
        """
        if not value or value.startswith("encrypted:"):
            return value
        key = cls._get_encryption_key()
        if not key:
            return value
        # Imported lazily so environments without `cryptography` still work
        # as long as no secret key is configured.
        from cryptography.fernet import Fernet

        encrypted = Fernet(key).encrypt(value.encode())
        return f"encrypted:{encrypted.decode()}"

    @classmethod
    def decrypt(cls, value: str) -> str:
        """Decrypt an "encrypted:"-prefixed value; plain values pass through.

        Without a configured key the prefix is stripped and the remainder is
        returned as-is. On any decryption failure (invalid token, wrong key)
        the original value is returned unchanged — best-effort semantics.
        """
        if not value or not value.startswith("encrypted:"):
            return value
        key = cls._get_encryption_key()
        if not key:
            return value[len("encrypted:"):]
        from cryptography.fernet import Fernet

        try:
            encrypted_part = value[len("encrypted:"):]
            decrypted = Fernet(key).decrypt(encrypted_part.encode())
            return decrypted.decode()
        except Exception:
            # Covers InvalidToken and anything else; deliberately broad so a
            # corrupt stored value degrades to passthrough, not a crash.
            return value

    @classmethod
    def __get_pydantic_core_schema__(
        cls, _source_type: Any, _handler: "GetCoreSchemaHandler"
    ) -> "core_schema.CoreSchema":
        """Pydantic v2 integration: validate plain strings by encrypting them
        on assignment, and serialize back to the underlying string value.

        Annotations are quoted (forward refs) so the class can be defined
        without evaluating the pydantic names at definition time.
        """
        return core_schema.union_schema(
            [
                # Already an EncryptedStr: accept as-is.
                core_schema.is_instance_schema(cls),
                # Plain str: encrypt (no-op for empty strings) then wrap.
                core_schema.chain_schema(
                    [
                        core_schema.str_schema(),
                        core_schema.no_info_plain_validator_function(
                            lambda value: cls(cls.encrypt(value) if value else value)
                        ),
                    ]
                ),
            ],
            serialization=core_schema.plain_serializer_function_ser_schema(
                lambda instance: str(instance)
            ),
        )
class Pipe:
    """OpenWebUI pipe for Anthropic Claude models hosted on Azure AI Foundry.

    Sends chat requests to the Anthropic ``/v1/messages`` endpoint (x-api-key
    + anthropic-version auth, not Bearer) and converts both streaming SSE and
    plain JSON Anthropic responses into OpenAI-style structures so the UI can
    render them.
    """

    class Valves(BaseModel):
        # Runtime configuration; every field can be seeded from an env var.
        AZURE_ANTHROPIC_PIPELINE_PREFIX: str = Field(
            default=os.getenv("AZURE_ANTHROPIC_PIPELINE_PREFIX", "Azure Anthropic"),
            description="Custom prefix for the pipeline display name.",
        )
        AZURE_ANTHROPIC_API_KEY: EncryptedStr = Field(
            default=os.getenv("AZURE_ANTHROPIC_API_KEY", ""),
            description="API key for Azure Anthropic (Azure AI Foundry).",
        )
        AZURE_ANTHROPIC_ENDPOINT: str = Field(
            default=os.getenv(
                "AZURE_ANTHROPIC_ENDPOINT",
                "https://<resource>.services.ai.azure.com/anthropic/v1/messages",
            ),
            description="Azure Anthropic /v1/messages endpoint.",
        )
        AZURE_ANTHROPIC_MODEL: str = Field(
            default=os.getenv("AZURE_ANTHROPIC_MODEL", ""),
            description="Default Anthropic model/deployment name (e.g. claude4).",
        )
        ANTHROPIC_VERSION: str = Field(
            default=os.getenv("ANTHROPIC_VERSION", "2023-06-01"),
            description="Anthropic API version header value.",
        )

    def __init__(self):
        self.valves = self.Valves()
        # Display name; kept consistent with the "<prefix>: " format that
        # pipes() uses (the two previously disagreed: ":" vs ": ").
        self.name: str = f"{self.valves.AZURE_ANTHROPIC_PIPELINE_PREFIX}: "

    def validate_environment(self) -> None:
        """Raise ValueError when the API key or endpoint is not configured."""
        api_key = EncryptedStr.decrypt(self.valves.AZURE_ANTHROPIC_API_KEY)
        if not api_key:
            raise ValueError("AZURE_ANTHROPIC_API_KEY is not set!")
        if not self.valves.AZURE_ANTHROPIC_ENDPOINT:
            raise ValueError("AZURE_ANTHROPIC_ENDPOINT is not set!")

    def get_headers(self) -> Dict[str, str]:
        """Build Anthropic-style request headers (x-api-key auth scheme)."""
        api_key = EncryptedStr.decrypt(self.valves.AZURE_ANTHROPIC_API_KEY)
        return {
            "x-api-key": api_key,
            "Content-Type": "application/json",
            "anthropic-version": self.valves.ANTHROPIC_VERSION,
        }

    def parse_models(self, models_str: str) -> List[str]:
        """Split a comma-/semicolon-/whitespace-separated model list.

        Returns an empty list for an empty or whitespace-only input.
        """
        if not models_str:
            return []
        normalized = models_str.replace(";", " ").replace(",", " ")
        return [m.strip() for m in normalized.split() if m.strip()]

    def pipes(self) -> List[Dict[str, str]]:
        """Advertise the configured model(s) to OpenWebUI.

        The id is the exact deployment name (no prefix), so the request's
        "model" field matches the Azure deployment.
        """
        self.validate_environment()
        self.name = f"{self.valves.AZURE_ANTHROPIC_PIPELINE_PREFIX}: "
        if self.valves.AZURE_ANTHROPIC_MODEL:
            models = self.parse_models(self.valves.AZURE_ANTHROPIC_MODEL)
            if models:
                return [{"id": m, "name": m} for m in models]
            # Whitespace-only setting: fall back to the raw valve value.
            return [
                {
                    "id": self.valves.AZURE_ANTHROPIC_MODEL,
                    "name": self.valves.AZURE_ANTHROPIC_MODEL,
                }
            ]
        return [{"id": "azure_anthropic", "name": self.name}]

    async def anthropic_stream_to_openai_sse(
        self,
        content: aiohttp.StreamReader,
        __event_emitter__=None,
        response: Optional[aiohttp.ClientResponse] = None,
        session: Optional[aiohttp.ClientSession] = None,
    ) -> AsyncIterator[bytes]:
        """Re-emit an Anthropic SSE stream as OpenAI-style SSE chunks.

        Anthropic ``content_block_delta`` events become OpenAI
        ``choices[].delta.content`` chunks; ``message_stop`` appends a
        provider-info footer (once) and a terminating ``data: [DONE]`` event.
        The response/session handed in by pipe() are closed here when the
        stream ends, since ownership transfers to this generator.
        """
        log = logging.getLogger("azure_anthropic.stream")
        buffer = ""
        info_appended = False
        try:
            async for chunk in content:
                buffer += chunk.decode("utf-8", errors="ignore")
                # SSE events are delimited by a blank line.
                while "\n\n" in buffer:
                    raw_event, buffer = buffer.split("\n\n", 1)
                    event_type = None
                    data_str = None
                    for line in raw_event.splitlines():
                        if line.startswith("event:"):
                            event_type = line[6:].strip()
                        elif line.startswith("data:"):
                            data_str = line[5:].strip()
                    if not data_str:
                        continue
                    if event_type == "content_block_delta":
                        try:
                            data = json.loads(data_str)
                            text = data.get("delta", {}).get("text", "")
                            if text:
                                oai_obj = {
                                    "choices": [
                                        {"index": 0, "delta": {"content": text}}
                                    ]
                                }
                                sse = f"data: {json.dumps(oai_obj, ensure_ascii=False)}\n\n"
                                yield sse.encode("utf-8")
                        except Exception as e:
                            # A malformed delta only loses one chunk, never
                            # kills the whole stream.
                            log.debug(f"Error parsing content_block_delta: {e}")
                    elif event_type == "message_stop":
                        if not info_appended:
                            info_text = (
                                "\n\n<details>\n"
                                "<summary>ℹ️ Provider Info</summary>\n\n"
                                "Response generated by Anthropic Claude on Azure AI.\n"
                                "</details>\n"
                            )
                            oai_obj = {
                                "choices": [
                                    {"index": 0, "delta": {"content": info_text}}
                                ]
                            }
                            sse = f"data: {json.dumps(oai_obj, ensure_ascii=False)}\n\n"
                            yield sse.encode("utf-8")
                            info_appended = True
                        # OpenAI-style stream terminator.
                        done_sse = "data: [DONE]\n\n"
                        yield done_sse.encode("utf-8")
            if __event_emitter__:
                await __event_emitter__(
                    {
                        "type": "status",
                        "data": {"description": "Streaming completed", "done": True},
                    }
                )
        except Exception as e:
            log.error(f"Error processing Anthropic stream: {e}")
            if __event_emitter__:
                await __event_emitter__(
                    {
                        "type": "status",
                        "data": {"description": f"Error: {str(e)}", "done": True},
                    }
                )
        finally:
            # Best-effort cleanup of the resources transferred from pipe().
            try:
                if response:
                    response.close()
            except Exception:
                pass
            try:
                if session:
                    await session.close()
            except Exception:
                pass

    async def pipe(
        self, body: Dict[str, Any], __event_emitter__=None
    ) -> Union[str, Generator, Iterator, Dict[str, Any], StreamingResponse]:
        """Handle a chat request from OpenWebUI.

        Filters the body down to the fields the Anthropic /v1/messages API
        accepts, POSTs it, and returns either a StreamingResponse emitting
        OpenAI-style SSE, an OpenAI-like completion dict, or — on any
        failure — a human-readable "Error: ..." string.
        """
        log = logging.getLogger("azure_anthropic.pipe")
        log.setLevel(SRC_LOG_LEVELS["OPENAI"])
        self.validate_environment()

        # Resolve the deployment/model name: request body wins over valves.
        selected_model = None
        if body.get("model"):
            selected_model = body["model"]
        elif self.valves.AZURE_ANTHROPIC_MODEL:
            selected_model = self.valves.AZURE_ANTHROPIC_MODEL

        # Forward only fields the Anthropic messages API understands.
        allowed_keys = {
            "model", "messages", "max_tokens", "metadata", "stop_sequences",
            "temperature", "top_p", "stream", "system", "extra_headers",
            "extra_body", "timeout",
        }
        filtered_body = {k: v for k, v in body.items() if k in allowed_keys}
        if selected_model:
            filtered_body["model"] = selected_model
        # max_tokens is mandatory for Anthropic; default when absent/falsy.
        if not filtered_body.get("max_tokens"):
            filtered_body["max_tokens"] = 1024
        wants_stream = bool(filtered_body.get("stream", False))

        if __event_emitter__:
            await __event_emitter__({
                "type": "status",
                "data": {
                    "description": "Sending request to Azure Anthropic...",
                    "done": False,
                },
            })

        headers = self.get_headers()
        session = None
        request = None
        response_obj: Any = None
        try:
            session = aiohttp.ClientSession(
                trust_env=True,
                timeout=aiohttp.ClientTimeout(total=AIOHTTP_CLIENT_TIMEOUT),
            )
            request = await session.request(
                method="POST",
                url=self.valves.AZURE_ANTHROPIC_ENDPOINT,
                headers=headers,
                json=filtered_body,
            )
            if request.status >= 400:
                # Capture the error payload so the except-handler below can
                # surface the provider's message instead of a bare status.
                try:
                    err_ct = (request.headers.get("Content-Type") or "").lower()
                    if "json" in err_ct:
                        response_obj = await request.json()
                    else:
                        response_obj = await request.text()
                except Exception:
                    response_obj = await request.text()
                request.raise_for_status()  # will raise for 4xx/5xx
            if wants_stream:
                if __event_emitter__:
                    await __event_emitter__({
                        "type": "status",
                        "data": {
                            "description": "Streaming response from Azure Anthropic...",
                            "done": False,
                        },
                    })
                # Ownership of request/session transfers to the generator,
                # which closes them when streaming finishes (see finally).
                return StreamingResponse(
                    self.anthropic_stream_to_openai_sse(
                        request.content,
                        __event_emitter__=__event_emitter__,
                        response=request,
                        session=session,
                    ),
                    status_code=request.status,
                    headers={"Content-Type": "text/event-stream"},
                )
            content_type_header = (request.headers.get("Content-Type") or "").lower()
            if "json" in content_type_header:
                response_obj = await request.json()
            else:
                # Read the body first, then best-effort parse it as JSON.
                # (Reading before the try also fixes a bug where a failed
                # read left `response_text` unbound in the except path.)
                response_text = await request.text()
                try:
                    response_obj = json.loads(response_text)
                except ValueError:
                    response_obj = response_text
            request.raise_for_status()
            if isinstance(response_obj, dict) and response_obj.get("type") == "message":
                # Flatten Anthropic content blocks into a single text string.
                text_chunks = [
                    c.get("text", "")
                    for c in response_obj.get("content", [])
                    if isinstance(c, dict) and c.get("type") == "text"
                ]
                full_text = "".join(text_chunks)
                oai_like = {
                    "id": response_obj.get("id"),
                    "object": "chat.completion",
                    # Timezone-aware now(): utcnow() is deprecated and its
                    # naive .timestamp() yields a wrong epoch off UTC hosts.
                    "created": int(datetime.now(timezone.utc).timestamp()),
                    "model": response_obj.get("model", filtered_body.get("model")),
                    "choices": [
                        {
                            "index": 0,
                            "finish_reason": response_obj.get("stop_reason"),
                            "message": {
                                "role": response_obj.get("role", "assistant"),
                                "content": full_text
                                + "\n\n<details>\n"
                                "<summary>ℹ️ Provider Info</summary>\n\n"
                                "Response generated by Anthropic Claude on Azure AI.\n"
                                "</details>\n",
                            },
                        }
                    ],
                    "usage": response_obj.get("usage", {}),
                }
                if __event_emitter__:
                    await __event_emitter__({
                        "type": "status",
                        "data": {"description": "Request completed", "done": True},
                    })
                return oai_like
            # Unknown shape: hand back whatever we got, unmodified.
            if __event_emitter__:
                await __event_emitter__({
                    "type": "status",
                    "data": {
                        "description": "Request completed (raw response)",
                        "done": True,
                    },
                })
            return response_obj
        except Exception as e:
            log.exception(f"Error in Azure Anthropic request: {e}")
            # Prefer the provider's error message over the raw exception.
            detail = f"Exception: {str(e)}"
            if isinstance(response_obj, dict) and "error" in response_obj:
                err = response_obj["error"]
                if isinstance(err, dict) and "message" in err:
                    detail = err["message"]
                else:
                    detail = str(err)
            elif isinstance(response_obj, str):
                detail = response_obj
            if __event_emitter__:
                await __event_emitter__({
                    "type": "status",
                    "data": {"description": f"Error: {detail}", "done": True},
                })
            return f"Error: {detail}"
        finally:
            # Streaming responses clean up inside the SSE generator instead.
            if session and not wants_stream:
                try:
                    if request:
                        request.close()
                except Exception:
                    pass
                try:
                    await session.close()
                except Exception:
                    pass
Beta Was this translation helpful? Give feedback.
-
|
Thanks for your example. It's definitely useful for anyone wanting to use Anthropic models. I won't be implementing it in my pipeline because I don't want to make adjustments for every model that has a different URL or format. |
Beta Was this translation helpful? Give feedback.
Uh oh!
There was an error while loading. Please reload this page.
-
Question summary
Anthropic models are now available via Azure too, but the function is unable to communicate with the model
Details
This is because Anthropic uses a different URL format from Chat Completions, i.e. /v1/messages
or
What have you tried?
But when I am using this function, it gives an error like this:
Error: Access denied due to invalid subscription key or wrong API endpoint. Make sure to provide a valid key for an active subscription and use a correct regional API endpoint for your resource. However, when using pure Python and the Anthropic SDK, it works just fine; the cURL command gives the same issue.
Can you please help in implementing this for the Anthropic SDK as well?
Thank you
Beta Was this translation helpful? Give feedback.
All reactions