Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions cmd/thv-operator/pkg/vmcpconfig/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"

"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -132,6 +133,24 @@ func (c *Converter) Convert(
config.Audit.Component = vmcp.Name
}

// Convert optimizer config - resolve embeddingService to embeddingURL if needed
if vmcp.Spec.Config.Optimizer != nil {
optimizerConfig := vmcp.Spec.Config.Optimizer.DeepCopy()

// If embeddingService is set, resolve it to embeddingURL
if optimizerConfig.EmbeddingService != "" && optimizerConfig.EmbeddingURL == "" {
embeddingURL, err := c.resolveEmbeddingService(ctx, vmcp.Namespace, optimizerConfig.EmbeddingService)
if err != nil {
return nil, fmt.Errorf("failed to resolve embedding service %s: %w", optimizerConfig.EmbeddingService, err)
}
optimizerConfig.EmbeddingURL = embeddingURL
// Clear embeddingService since we've resolved it to URL
optimizerConfig.EmbeddingService = ""
}

config.Optimizer = optimizerConfig
}

// Apply operational defaults (fills missing values)
config.EnsureOperationalDefaults()

Expand Down Expand Up @@ -597,3 +616,31 @@ func validateCompositeToolNames(tools []vmcpconfig.CompositeToolConfig) error {
}
return nil
}

// resolveEmbeddingService resolves a Kubernetes service name to its URL by querying the service.
// Returns the service URL in format: http://<service-name>.<namespace>.svc.cluster.local:<port>
func (c *Converter) resolveEmbeddingService(ctx context.Context, namespace, serviceName string) (string, error) {
// Get the service
svc := &corev1.Service{}
key := types.NamespacedName{
Name: serviceName,
Namespace: namespace,
}
if err := c.k8sClient.Get(ctx, key, svc); err != nil {
return "", fmt.Errorf("failed to get service %s/%s: %w", namespace, serviceName, err)
}

// Find the first port (typically there's only one for embedding services)
if len(svc.Spec.Ports) == 0 {
return "", fmt.Errorf("service %s/%s has no ports", namespace, serviceName)
}

port := svc.Spec.Ports[0].Port
if port == 0 {
return "", fmt.Errorf("service %s/%s has invalid port", namespace, serviceName)
}

// Construct URL using full DNS name
url := fmt.Sprintf("http://%s.%s.svc.cluster.local:%d", serviceName, namespace, port)
return url, nil
}
15 changes: 15 additions & 0 deletions cmd/vmcp/app/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,20 @@ func runServe(cmd *cobra.Command, _ []string) error {
if cfg.Optimizer.HybridSearchRatio != nil {
hybridRatio = *cfg.Optimizer.HybridSearchRatio
}

// embeddingURL should already be resolved from embeddingService by the operator
// If embeddingService is still set (CLI mode), log a warning
if cfg.Optimizer.EmbeddingService != "" {
logger.Warnf("embeddingService is set but not resolved to embeddingURL. This should be handled by the operator. Falling back to default port 11434")
// Simple fallback for CLI/testing scenarios
namespace := os.Getenv("POD_NAMESPACE")
if namespace != "" {
cfg.Optimizer.EmbeddingURL = fmt.Sprintf("http://%s.%s.svc.cluster.local:11434", cfg.Optimizer.EmbeddingService, namespace)
} else {
cfg.Optimizer.EmbeddingURL = fmt.Sprintf("http://%s:11434", cfg.Optimizer.EmbeddingService)
}
}

serverCfg.OptimizerConfig = &vmcpserver.OptimizerConfig{
Enabled: cfg.Optimizer.Enabled,
PersistPath: cfg.Optimizer.PersistPath,
Expand Down Expand Up @@ -522,3 +536,4 @@ func aggregateCapabilities(
},
}, nil
}

8 changes: 6 additions & 2 deletions pkg/optimizer/embeddings/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package embeddings

import (
"fmt"
"strings"
"sync"

"github.com/stacklok/toolhive/pkg/logger"
Expand All @@ -24,7 +25,7 @@ type Config struct {
BackendType string

// BaseURL is the base URL for the embedding service
// - Ollama: http://localhost:11434
// - Ollama: http://127.0.0.1:11434 (or http://localhost:11434, will be normalized to 127.0.0.1)
// - vLLM: http://localhost:8000
BaseURL string

Expand Down Expand Up @@ -84,7 +85,10 @@ func NewManager(config *Config) (*Manager, error) {
// Use Ollama native API (requires ollama serve)
baseURL := config.BaseURL
if baseURL == "" {
baseURL = "http://localhost:11434"
baseURL = "http://127.0.0.1:11434"
} else {
// Normalize localhost to 127.0.0.1 to avoid IPv6 resolution issues
baseURL = strings.ReplaceAll(baseURL, "localhost", "127.0.0.1")
}
model := config.Model
if model == "" {
Expand Down
13 changes: 12 additions & 1 deletion pkg/optimizer/embeddings/ollama.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"io"
"net/http"
"strings"

"github.com/stacklok/toolhive/pkg/logger"
)
Expand All @@ -29,12 +30,22 @@ type ollamaEmbedResponse struct {
Embedding []float64 `json:"embedding"`
}

// normalizeLocalhostURL converts localhost to 127.0.0.1 to avoid IPv6 resolution issues
func normalizeLocalhostURL(url string) string {
// Replace localhost with 127.0.0.1 to ensure IPv4 connection
// This prevents connection refused errors when Ollama only listens on IPv4
return strings.ReplaceAll(url, "localhost", "127.0.0.1")
}

// NewOllamaBackend creates a new Ollama backend
// Requires Ollama to be running locally: ollama serve
// Default model: all-minilm (all-MiniLM-L6-v2, 384 dimensions)
func NewOllamaBackend(baseURL, model string) (*OllamaBackend, error) {
if baseURL == "" {
baseURL = "http://localhost:11434"
baseURL = "http://127.0.0.1:11434"
} else {
// Normalize localhost to 127.0.0.1 to avoid IPv6 resolution issues
baseURL = normalizeLocalhostURL(baseURL)
}
if model == "" {
model = "all-minilm" // Default embedding model (all-MiniLM-L6-v2)
Expand Down
6 changes: 4 additions & 2 deletions pkg/vmcp/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"io"
"net"
"net/http"
"time"

"github.com/mark3labs/mcp-go/client"
"github.com/mark3labs/mcp-go/client/transport"
Expand Down Expand Up @@ -198,8 +199,10 @@ func (h *httpBackendClient) defaultClientFactory(ctx context.Context, target *vm
})

// Create HTTP client with configured transport chain
// Set timeouts to prevent long-lived connections that require continuous listening
httpClient := &http.Client{
Transport: sizeLimitedTransport,
Timeout: 30 * time.Second, // Prevent hanging on connections
}

var c *client.Client
Expand All @@ -208,8 +211,7 @@ func (h *httpBackendClient) defaultClientFactory(ctx context.Context, target *vm
case "streamable-http", "streamable":
c, err = client.NewStreamableHttpClient(
target.BaseURL,
transport.WithHTTPTimeout(0),
transport.WithContinuousListening(),
transport.WithHTTPTimeout(30*time.Second), // Set timeout instead of 0
transport.WithHTTPBasicClient(httpClient),
)
if err != nil {
Expand Down