Skip to content

Enhanced Gateway Connection Manager#83

Draft
JeremyWurbs wants to merge 6 commits intoservices/feature/gatewayfrom
services/feature/gateway-enhanced-cm
Draft

Enhanced Gateway Connection Manager#83
JeremyWurbs wants to merge 6 commits intoservices/feature/gatewayfrom
services/feature/gateway-enhanced-cm

Conversation

@JeremyWurbs
Copy link
Contributor

@JeremyWurbs JeremyWurbs commented Jul 2, 2025

This PR implements and closes #82.

It enables all connection managers generated from connecting to a Gateway-derived class the ability to use registered app endpoints as attributes on the returned connection manager.

Namely,

  1. All connection managers generated by a Gateway-derived Service (i.e. including children classes from Gateway) will recognize all registered apps that supplied their own connection manager.
from mindtrace.services import Gateway
from mindtrace.services.sample.echo_service import EchoService

# Launch a custom Service alongside a Gateway, and register the Service to it
echo_cm = EchoService.launch(url=echo_url)
gateway_cm1 = Gateway.launch(url=gateway_url)
gateway_cm1.register_app("echoer", echo_url, echo_cm)

# The Gateway connection manager may now use the EchoService custom endpoints
gateway_cm1.echoer.echo(message="Hello, world!")

# NEW: Newly created Gateway connection managers may now also use the same endpoints
gateway_cm2 = Gateway.connect(url=gateway_url)  # Connecting to a pre-existing Gateway will transfer registered app endpoints
gateway_cm2.echoer.echo(message="Hello world, too!")
  1. The same works (with a little extra magic) if both connection managers were made before the custom service was registered:
from mindtrace.services import Gateway
from mindtrace.services.sample.echo_service import EchoService

# Connect the second gateway connection manager first
echo_cm = EchoService.launch(url=echo_url)
gateway_cm1 = Gateway.launch(url=gateway_url)
gateway_cm2 = Gateway.connect(url=gateway_url)

# Then register the custom service
gateway_cm1.register_app("echoer", echo_url, echo_cm)

# And the second connection manager will still work
gateway_cm2.echoer.echo(message="Yet another good day, world!")  # Still works

This PR is being raise as a draft PR for now, for a few reasons, which I will be unlikely to have time to fix before next week, but would like to make this code available before then:

  1. There are a number of dead loose ends which were generated while producing the code which need to be cleaned up;
  2. The implemented logic is likely far too complicated for what is needed; equivalently there are logical errors & bugs are still likely;
  3. There are a number of additional design decisions, which should likely be made with MCP-compliance in mind, with discussions happening next week;
  4. The test coverage needs to be greatly improved;

Implementation Details

New Service.detailed_endpoints() method

A new detailed_endpoints endpoint/method has been added to the Service base class, which will return all of the endpoints with their schema information.

import json
from mindtrace.services import Service

cm = Service.launch()
print(json.dumps(cm.detailed_endpoints().endpoints, indent=4))
"""
{
    "endpoints": {
        "name": "endpoints",
        "input_schema": null,
        "output_schema": {
            "properties": {
                "endpoints": {
                    "items": {
                        "type": "string"
                    },
                    "title": "Endpoints",
                    "type": "array"
                }
            },
            "required": [
                "endpoints"
            ],
            "title": "EndpointsOutput",
            "type": "object"
        }
    },
...
"""

New Gateway._create_synthetic_connection_manager

The Gateway class has a new method, similar to generate_connection_manager, but uses the provided endpoint information from the above to create a ConnectionManager. The Gateway only collects and tracks this information for apps that provide it (in self.registered_app_info).

TODO: Note that in the current implementation, it is the act of providing a connection manager that triggers the tracking of the service's schemas, but actually non-Service-derived classes may also already have said schemas (especially if we are considering it to tie in with MCP-compliance). In the future it should be possible to enable the described behavior for any FastAPI / MCP service that provides associated schemas.

class Gateway(Service):
    def __init__(self)
        self.registered_routers = {}
        self.registered_app_info = {}

   @classmethod
    def _create_synthetic_connection_manager(cls, service_url: str, endpoints: Dict[str, Dict[str, Any]]):
        """Create a synthetic connection manager based on endpoint schema information."""
        from mindtrace.services.core.connection_manager import ConnectionManager
        
        # Create a basic connection manager instance
        synthetic_cm = ConnectionManager(url=service_url)
        
        # Add methods based on the endpoint information
        for endpoint_name, endpoint_info in endpoints.items():
            # Create a method that matches the endpoint signature
            def make_endpoint_method(name):
                def endpoint_method(*args, **kwargs):
                    # This method won't actually be called since the proxy will intercept
                    # But it needs to exist for the proxy to detect it
                    return {"endpoint": name, "args": args, "kwargs": kwargs}
                endpoint_method.__name__ = name
                return endpoint_method
            
            # Add the method to the synthetic connection manager
            setattr(synthetic_cm, endpoint_name, make_endpoint_method(endpoint_name))
        
        return synthetic_cm

As this behavior is required by MCP, we may decide to simply make this endpoint match the MCP version. Refer here for a working example.

Update Gateway.connect

The above is used in the updated Gateway.connect method. The current logic is quite long, and ties in to the even-longer ProxyConnectionManager class, which still exists.

New Gateway.reconnect

There is a new Gateway.reconnect method. It might make sense to do a simpler "reconnect" call, where the original connect kwargs are stored and used again, but instead in the current implementation reconnect doesn't attempt to do that, but rather just refreshes the endpoint information.

In theory it sounded like the simpler implementation, in practice it is just as complicated as the original connect.

Catching AttributeError with a reconnect + retry

Consider the following:

from mindtrace.services import Gateway
from mindtrace.services.sample.echo_service import EchoService

echo_url = "http://localhost:8080/"
gateway_url = "http://localhost:8081/"

echo_cm = EchoService.launch(url=echo_url)
gateway_cm = Gateway.launch(url=gateway_url)
gateway_cm2 = Gateway.connect(url=gateway_url)

gateway_cm.register_app("echoer", echo_url, echo_cm) 

At this point, if you check if echoer is an attribute on each connection manager:

'echoer' in gateway_cm.__dict__  # True
'echoer' in gateway_cm2.__dict__  # False

It would appear that the second connection manager doesn't know about echoer, but if you reach for it, you find it's there:

type(gateway_cm2.echoer)  # <class 'mindtrace.services.gateway.proxy_connection_manager.ProxyConnectionManager'>

The main logic that makes the above possible is simply:

  1. Catch any AttributeError thrown by __getattribute__, which occurs when "echoer" is not found as an attribute in gateway_cm2;
  2. Automatically try to reconnect once, refreshing the connection manager attributes, ideally to now contain the updated property;
  3. Try calling the attribute one more time, returning any errors this time as usual.

Buried in the updated connect method:

class Gateway:
    def connect():
        ...
        # Store original __getattribute__ method
        original_getattribute = base_cm.__class__.__getattribute__
        
        def enhanced_getattribute(self, name):
            """Enhanced __getattribute__ that reconnects and retries on AttributeError for connection manager attributes."""
            try:
                return original_getattribute(self, name)
            except AttributeError as e:
                if (not name.startswith('_') and 
                    not hasattr(type(self), name) and 
                    not hasattr(self.__class__, name)):
                    
                    # Check if this app was registered by this connection manager without a proxy
                    # If so, don't create a synthetic proxy (respect the original registration choice)
                    if name in self._registered_apps and self._registered_apps[name] is None:
                        # This app was registered by this CM without a connection manager, don't create proxy
                        raise e
                    
                    # Try reconnecting to refresh the app list
                    try:
                        self.reconnect()
                        # Try accessing the attribute again after reconnect
                        return original_getattribute(self, name)
                    except Exception:
                        # If reconnect fails or attribute still doesn't exist, raise original error
                        pass
                
                # Re-raise the original AttributeError
                raise e
        
        # Overwrite __getattribute__ with the enhanced version
        base_cm.__class__.__getattribute__ = enhanced_getattribute

Documentation & Testing

Unit and integration tests have been added covering the new use case, but many more tests, including the myriad of corner cases, should be added before considered complete. Tests should still all pass with ~70% coverage on the Gateway and ProxyConnectionManager files.

git clone git@github.com:mindtrace/mindtrace enhanced_cm && cd enhanced_cm
git checkout services/feature/gateway-enhanced-cm
uv sync
uv tool install ds-run
ds test

@JeremyWurbs JeremyWurbs added this to the 0.2.0 (July 2025 Release) milestone Jul 2, 2025
@JeremyWurbs JeremyWurbs self-assigned this Jul 2, 2025
@JeremyWurbs JeremyWurbs added the mindtrace-services Issues raised from services module in mindtrace package label Jul 2, 2025
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR extends the Gateway to propagate registered app endpoints (with schema-based proxies) across all connection manager instances—both existing and newly created—by introducing a detailed schema endpoint, synthetic CMs, and a reconnect mechanism.

  • Adds a /detailed_endpoints route and get_detailed_endpoints in Service for full schema introspection
  • Refactors ProxyConnectionManager to generate schema-aware sync/async proxy methods and property routing
  • Enhances Gateway.connect/reconnect to auto‐sync existing registrations via synthetic CMs and track manual registrations

Reviewed Changes

Copilot reviewed 8 out of 8 changed files in this pull request and generated 2 comments.

Show a summary per file
File Description
mindtrace/services/mindtrace/services/core/service.py Adds detailed_endpoints route and schema for full endpoint schemas
mindtrace/services/mindtrace/services/core/types.py Introduces DetailedEndpointsSchema for the new route
mindtrace/services/mindtrace/services/core/utils.py Ensures service endpoints are stored on generated CM classes
mindtrace/services/mindtrace/services/gateway/gateway.py Implements list_apps[_with_schemas], enhanced register_app, connect, and reconnect
mindtrace/services/mindtrace/services/gateway/proxy_connection_manager.py Refactors proxy logic to use TaskSchema, sync/async methods, and property forwarding
tests/unit/mindtrace/services/gateway/test_proxy_connection_manager.py Updates and expands tests for proxy initialization, property/method routing
tests/unit/mindtrace/services/gateway/test_gateway.py Adds tests for enhanced registered_apps behavior
tests/integration/mindtrace/services/test_gateway_integration.py Adds end‐to‐end scenarios for multi‐CM sync, URL object support, and discovery
Comments suppressed due to low confidence (4)

mindtrace/services/mindtrace/services/gateway/gateway.py:276

  • [nitpick] Using raw app_name (which may contain hyphens or other invalid identifier characters) as an attribute risks invalid attribute access. Consider sanitizing or normalizing names (e.g., replace '-' with '_').
                        setattr(base_cm, app_name, proxy_cm)

mindtrace/services/mindtrace/services/gateway/gateway.py:366

  • The new reconnect method is critical for syncing late registrations but lacks dedicated unit tests. Please add tests that verify attribute refresh and synthetic proxy creation on attribute access failure.
            def reconnect():

mindtrace/services/mindtrace/services/gateway/gateway.py:69

  • [nitpick] The list_apps endpoint is using POST; a read-only resource may be better served by GET for consistency with RESTful conventions.
        self.add_endpoint("/list_apps", func=self.list_apps, schema=ListAppsTaskSchema(), methods=["POST"])

mindtrace/services/mindtrace/services/core/utils.py:129

  • This line reassigns _service_endpoints twice in a row. You can remove the redundant assignment to keep the code concise.
    ServiceConnectionManager._service_endpoints = temp_service._endpoints

Comment on lines 9 to 10
from fastapi import HTTPException

Copy link

Copilot AI Jul 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Duplicate import of HTTPException. Please remove one of the two identical imports to reduce redundancy.

Suggested change
from fastapi import HTTPException

Copilot uses AI. Check for mistakes.
return service_class._endpoints

# Last resort: infer from methods (less reliable)
return self._infer_endpoints_from_methods(original_cm)
Copy link

Copilot AI Jul 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The instance‐level _service_endpoints on original_cm is never checked. If endpoints are stored on the instance rather than the class, they will be lost—consider adding elif hasattr(original_cm, '_service_endpoints') before falling back.

Copilot uses AI. Check for mistakes.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

mindtrace-services Issues raised from services module in mindtrace package

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants