Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 13 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,15 +65,7 @@ Pipelines are processing functions that extend Open WebUI with **custom AI model

🔗 [Azure AI Pipeline in Open WebUI](https://openwebui.com/f/owndev/azure_ai/)

### **2. [Azure AI Foundry Pipeline for DeepSeek-R1](./pipelines/azure/azure_ai_foundry_deepseek.py)**

- A specialized version of the **Azure AI Foundry Pipeline** for **DeepSeek-R1**.
- Uses Azure’s **DeepSeek-R1** AI model for advanced text processing.
- Includes the same error handling, parameter filtering, and request management as the standard Azure AI Foundry Pipeline.

🔗 [Azure AI Pipeline for DeepSeek-R1 in Open WebUI](https://openwebui.com/f/owndev/azure_ai_deepseek_r1)

### **3. [N8N Pipeline](./pipelines/n8n/n8n.py)**
### **2. [N8N Pipeline](./pipelines/n8n/n8n.py)**

- Integrates **Open WebUI** with **N8N**, an automation and workflow platform.
- Sends messages from Open WebUI to an **N8N webhook**.
Expand All @@ -85,6 +77,18 @@ Pipelines are processing functions that extend Open WebUI with **custom AI model

🔗 [Learn More About N8N](https://n8n.io/)

### **3. [Infomaniak](./pipelines/infomaniak/infomaniak.py)**

- Integrates **Open WebUI** with **Infomaniak**, a Swiss web hosting and cloud services provider.
- Sends messages from Open WebUI to an **Infomaniak AI Tool**.

> **Important**: The function ID in Open WebUI must not contain the name `infomaniak`. Due to a [bug](https://github.com/open-webui/open-webui/discussions/10914) in Open WebUI, the function will not work if its ID contains `infomaniak`.


🔗 [Infomaniak Pipeline in Open WebUI](https://openwebui.com/f/owndev/im_ai_tools/)

🔗 [Learn More About Infomaniak](https://www.infomaniak.com/en/hosting/ai-tools)

---

## Filters
Expand Down
286 changes: 286 additions & 0 deletions pipelines/infomaniak/infomaniak.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,286 @@
"""
title: Infomaniak AI Tools Pipeline
author: owndev
author_url: https://github.com/owndev
project_url: https://github.com/owndev/Open-WebUI-Functions
funding_url: https://github.com/owndev/Open-WebUI-Functions
infomaniak_url: https://www.infomaniak.com/en/hosting/ai-tools
version: 1.0.0
license: MIT
description: A manifold pipeline for interacting with Infomaniak AI Tools.
features:
- Manifold pipeline for Infomaniak AI Tools
- Lists available models for easy access
- Robust error handling and logging
- Handles streaming and non-streaming responses
"""

from typing import List, Union, Generator, Iterator, Optional, Dict, Any
from fastapi.responses import StreamingResponse
from pydantic import BaseModel, Field
from starlette.background import BackgroundTask
from open_webui.env import AIOHTTP_CLIENT_TIMEOUT, SRC_LOG_LEVELS
import aiohttp
import json
import os
import logging

# Helper functions
async def cleanup_response(
    response: Optional[aiohttp.ClientResponse],
    session: Optional[aiohttp.ClientSession],
) -> None:
    """Release an aiohttp response/session pair once it is no longer needed.

    Either argument may be ``None``, in which case it is skipped.

    Args:
        response: Response to close (synchronous close), if any.
        session: Session to close (awaited), if any.
    """
    if response is not None:
        response.close()
    if session is not None:
        await session.close()

class Pipe:
    """
    Manifold pipe connecting Open WebUI to the Infomaniak AI Tools
    OpenAI-compatible chat API.

    Lists the LLM models available on the Infomaniak account (one Open WebUI
    model per Infomaniak LLM) and forwards chat-completion requests, handling
    both streaming (SSE) and non-streaming responses.
    """

    # Environment variables for API key, endpoint, and optional model.
    class Valves(BaseModel):
        # API key for Infomaniak, sent as a Bearer token.
        # "API_KEY" is a placeholder default, not a working credential.
        INFOMANIAK_API_KEY: str = Field(
            default=os.getenv("INFOMANIAK_API_KEY", "API_KEY"),
            description="API key for Infomaniak AI TOOLS API"
        )
        # Product ID for Infomaniak.
        # NOTE(review): os.getenv returns a *string* when the env var is set;
        # pydantic coerces it to int, but a non-numeric value would raise a
        # validation error when Valves() is constructed.
        INFOMANIAK_PRODUCT_ID: int = Field(
            default=os.getenv("INFOMANIAK_PRODUCT_ID", 50070),
            description="Product ID for Infomaniak AI TOOLS API"
        )
        # Base URL for the Infomaniak API.
        INFOMANIAK_BASE_URL: str = Field(
            default=os.getenv("INFOMANIAK_BASE_URL", "https://api.infomaniak.com"),
            description="Base URL for Infomaniak API"
        )
        # Prefix shown before model names in the Open WebUI model picker.
        NAME_PREFIX: str = Field(
            default="Infomaniak: ",
            description="Prefix to be added before model names"
        )

    def __init__(self):
        # "manifold" marks this pipe as exposing multiple models.
        self.type = "manifold"
        self.valves = self.Valves()
        # Display-name prefix applied to the models this pipe exposes.
        self.name: str = self.valves.NAME_PREFIX

    def validate_environment(self) -> None:
        """
        Validates that required configuration values are set.

        Raises:
            ValueError: If any required valve is empty or falsy.
        """
        if not self.valves.INFOMANIAK_API_KEY:
            raise ValueError("INFOMANIAK_API_KEY is not set!")
        if not self.valves.INFOMANIAK_PRODUCT_ID:
            raise ValueError("INFOMANIAK_PRODUCT_ID is not set!")
        if not self.valves.INFOMANIAK_BASE_URL:
            raise ValueError("INFOMANIAK_BASE_URL is not set!")

    def get_headers(self) -> Dict[str, str]:
        """
        Constructs the HTTP headers for Infomaniak API requests.

        Returns:
            Dictionary with Bearer authorization and JSON content type.
        """
        headers = {
            "Authorization": f"Bearer {self.valves.INFOMANIAK_API_KEY}",
            "Content-Type": "application/json"
        }
        return headers

    def get_api_url(self, endpoint: str = "chat/completions") -> str:
        """
        Constructs the API URL for Infomaniak requests.

        Args:
            endpoint: The API endpoint path segment to append
                (defaults to the OpenAI-compatible chat completions endpoint).

        Returns:
            Full API URL, e.g. ``<base>/1/ai/<product_id>/openai/chat/completions``.
        """
        return f"{self.valves.INFOMANIAK_BASE_URL}/1/ai/{self.valves.INFOMANIAK_PRODUCT_ID}/openai/{endpoint}"

    def validate_body(self, body: Dict[str, Any]) -> None:
        """
        Validates the request body to ensure required fields are present.

        Args:
            body: The request body to validate.

        Raises:
            ValueError: If the 'messages' field is missing or not a list.
        """
        if "messages" not in body or not isinstance(body["messages"], list):
            raise ValueError("The 'messages' field is required and must be a list.")

    async def get_infomaniak_models(self) -> List[Dict[str, str]]:
        """
        Fetches the list of Infomaniak AI models and keeps only LLMs.

        Returns:
            List of dictionaries with model ``id``, ``name`` and ``meta``.
            Falls back to a single generic entry if the API call fails.
        """
        log = logging.getLogger("infomaniak_ai_tools.get_models")
        log.setLevel(SRC_LOG_LEVELS["OPENAI"])

        headers = self.get_headers()
        models = []

        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(
                    url=f"{self.valves.INFOMANIAK_BASE_URL}/1/ai/models",
                    headers=headers
                ) as resp:
                    if resp.status == 200:
                        data = await resp.json()
                        if data.get("result") == "success" and "data" in data:
                            models_data = data["data"]
                            if isinstance(models_data, list):
                                for item in models_data:
                                    # Skip malformed entries rather than failing the whole list.
                                    if not isinstance(item, dict):
                                        log.error(f"Expected item to be dict but got: {type(item).__name__}")
                                        continue
                                    if item.get("type") == "llm": # only include llm models
                                        models.append({
                                            "id": item.get("name", ""),
                                            "name": item.get("description", item.get("name", "")),
                                            # Profile image and description are currently not working in Open WebUI
                                            "meta": {
                                                "profile_image_url": item.get("logo_url", ""),
                                                "description": item.get("documentation_link", "")
                                            }
                                        })
                                return models
                            else:
                                log.error("Expected 'data' to be a list but received a non-list value.")
                    # Reached on non-200 status, unexpected payload shape, or non-list data.
                    log.error(f"Failed to get Infomaniak models: {await resp.text()}")
        except Exception as e:
            log.exception(f"Error getting Infomaniak models: {str(e)}")

        # Default model if API call fails
        return [{"id": f"{self.valves.INFOMANIAK_PRODUCT_ID}", "name": "Infomaniak: LLM API"}]

    async def pipes(self) -> List[Dict[str, str]]:
        """
        Returns the list of models this manifold pipe exposes to Open WebUI.

        Raises:
            ValueError: If required configuration is missing
                (propagated from validate_environment).

        Returns:
            List of dictionaries containing pipe id and name.
        """
        self.validate_environment()
        return await self.get_infomaniak_models()

    async def pipe(self, body: Dict[str, Any]) -> Union[str, Generator, Iterator, Dict[str, Any], StreamingResponse]:
        """
        Sends a chat-completion request to the Infomaniak AI endpoint.

        Args:
            body: The request body containing messages and other parameters.

        Returns:
            On success: the parsed JSON response (dict), the raw response text
            (if JSON parsing fails), or a StreamingResponse for SSE replies.
            On failure: an "Error: ..." string describing the problem.
        """
        log = logging.getLogger("infomaniak_ai_tools.pipe")
        log.setLevel(SRC_LOG_LEVELS["OPENAI"])

        # Validate the request body (raises ValueError on bad input).
        self.validate_body(body)

        # Construct headers
        headers = self.get_headers()

        # Filter allowed parameters (https://developer.infomaniak.com/docs/api/post/1/ai/%7Bproduct_id%7D/openai/chat/completions)
        allowed_params = {
            "frequency_penalty",
            "logit_bias",
            "logprobs",
            "max_tokens",
            "messages",
            "model",
            "n",
            "presence_penalty",
            "profile_type",
            "seed",
            "stop",
            "stream",
            "temperature",
            "top_logprobs",
            "top_p"
        }
        filtered_body = {k: v for k, v in body.items() if k in allowed_params}

        # Handle model extraction for Infomaniak.
        if "model" in filtered_body and filtered_body["model"]:
            # Strip everything up to the first "." — presumably the
            # "<function_id>." prefix Open WebUI adds to manifold model ids;
            # TODO(review): confirm against the Open WebUI caller.
            filtered_body["model"] = filtered_body["model"].split(".", 1)[1] if "." in filtered_body["model"] else filtered_body["model"]

        # Convert the modified body back to JSON
        payload = json.dumps(filtered_body)

        request = None
        session = None
        streaming = False
        response = None

        try:
            session = aiohttp.ClientSession(
                trust_env=True,
                timeout=aiohttp.ClientTimeout(total=AIOHTTP_CLIENT_TIMEOUT),
            )

            api_url = self.get_api_url()
            request = await session.request(
                method="POST",
                url=api_url,
                data=payload,
                headers=headers,
            )

            # Check if response is SSE (streaming). In that case the session
            # must stay open until the stream is consumed, so cleanup is
            # deferred to a BackgroundTask instead of the finally block.
            if "text/event-stream" in request.headers.get("Content-Type", ""):
                streaming = True
                return StreamingResponse(
                    request.content,
                    status_code=request.status,
                    headers=dict(request.headers),
                    background=BackgroundTask(
                        cleanup_response, response=request, session=session
                    ),
                )
            else:
                try:
                    response = await request.json()
                except Exception as e:
                    # Non-JSON body: fall back to raw text so error details survive.
                    log.error(f"Error parsing JSON response: {e}")
                    response = await request.text()

                # Raise after reading the body so the except block below can
                # extract the API's error message from `response`.
                request.raise_for_status()
                return response

        except Exception as e:
            log.exception(f"Error in Infomaniak AI request: {e}")

            # Prefer the API's own error message over the exception text.
            detail = f"Exception: {str(e)}"
            if isinstance(response, dict):
                if "error" in response:
                    detail = f"{response['error']['message'] if 'message' in response['error'] else response['error']}"
            elif isinstance(response, str):
                detail = response

            return f"Error: {detail}"
        finally:
            # For streaming responses the BackgroundTask owns cleanup;
            # otherwise close the request and session here.
            if not streaming and session:
                if request:
                    request.close()
                await session.close()