Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Re-introduce the outbound federation proxy #15913

Merged
merged 13 commits into from
Jul 18, 2023
Merged
Prev Previous commit
Next Next commit
Add authentication when proxying outbound federation traffic through …
…another worker
  • Loading branch information
MadLittleMods committed Jul 12, 2023
commit cb7c73e05108c84d5a90d92b137b01b345944e79
2 changes: 1 addition & 1 deletion synapse/app/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@ def listen_http(
version_string,
max_request_body_size=max_request_body_size,
reactor=reactor,
federation_agent=hs.get_federation_http_client().agent,
hs=hs,
)

if isinstance(listener_config, TCPListenerConfig):
Expand Down
20 changes: 19 additions & 1 deletion synapse/http/connectproxyclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import abc
import base64
import logging
from typing import Optional, Union
Expand Down Expand Up @@ -39,8 +40,14 @@ class ProxyConnectError(ConnectError):
pass


@attr.s(auto_attribs=True)
class ProxyCredentials:
@abc.abstractmethod
def as_proxy_authorization_value(self) -> bytes:
raise NotImplementedError()


@attr.s(auto_attribs=True)
class BasicProxyCredentials(ProxyCredentials):
username_password: bytes

def as_proxy_authorization_value(self) -> bytes:
Expand All @@ -55,6 +62,17 @@ def as_proxy_authorization_value(self) -> bytes:
return b"Basic " + base64.encodebytes(self.username_password)


@attr.s(auto_attribs=True)
class BearerProxyCredentials(ProxyCredentials):
access_token: bytes

def as_proxy_authorization_value(self) -> bytes:
"""
Return the value for a Proxy-Authorization header (i.e. 'Bearer xxx').
"""
return b"Bearer " + self.access_token


@implementer(IStreamClientEndpoint)
class HTTPConnectProxyEndpoint:
"""An Endpoint implementation which will send a CONNECT request to an http proxy
Expand Down
14 changes: 12 additions & 2 deletions synapse/http/matrixfederationclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
encode_query_args,
read_body_with_max_size,
)
from synapse.http.connectproxyclient import BearerProxyCredentials
from synapse.http.federation.matrix_federation_agent import MatrixFederationAgent
from synapse.http.proxyagent import ProxyAgent
from synapse.http.types import QueryParams
Expand Down Expand Up @@ -407,14 +408,23 @@ def __init__(
hs.config.server.federation_ip_range_blocklist,
)
else:
proxy_authorization_secret = hs.config.worker.worker_replication_secret
assert (
proxy_authorization_secret is not None
), "`worker_replication_secret` must be set when using `outbound_federation_restricted_to` (used to authenticate requests across workers)"
federation_proxy_credentials = BearerProxyCredentials(
proxy_authorization_secret.encode("ascii")
)

# We need to talk to federation via the proxy via one of the configured
# locations
federation_proxies = outbound_federation_restricted_to.locations
federation_proxy_locations = outbound_federation_restricted_to.locations
federation_agent = ProxyAgent(
self.reactor,
self.reactor,
tls_client_options_factory,
federation_proxies=federation_proxies,
federation_proxy_locations=federation_proxy_locations,
federation_proxy_credentials=federation_proxy_credentials,
)

# Use a BlocklistingAgentWrapper to prevent circumventing the IP
Expand Down
43 changes: 37 additions & 6 deletions synapse/http/proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@
from twisted.python.failure import Failure
from twisted.web.client import ResponseDone
from twisted.web.http_headers import Headers
from twisted.web.iweb import IAgent, IResponse
from twisted.web.iweb import IResponse
from twisted.web.resource import IResource
from twisted.web.server import Site
from twisted.web.server import Request, Site

from synapse.api.errors import Codes
from synapse.http import QuieterFileBodyProducer
Expand All @@ -38,6 +38,7 @@

if TYPE_CHECKING:
from synapse.http.site import SynapseRequest
from synapse.server import HomeServer

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -98,16 +99,46 @@ class ProxyResource(_AsyncResource):

isLeaf = True

def __init__(self, reactor: ISynapseReactor, federation_agent: IAgent):
def __init__(self, reactor: ISynapseReactor, hs: "HomeServer"):
super().__init__(True)

self.reactor = reactor
self.agent = federation_agent
self.agent = hs.get_federation_http_client().agent

self._proxy_authorization_secret = None
if hs.config.worker.worker_replication_secret:
self._proxy_authorization_secret = (
hs.config.worker.worker_replication_secret
)

def _check_auth(self, request: Request) -> None:
# The `matrix-federation://` proxy functionality can only be used with auth.
# Protect from people forgetting to set a secret.
assert self._proxy_authorization_secret is not None

# Get the authorization header.
auth_headers = request.requestHeaders.getRawHeaders(b"Proxy-Authorization")

if not auth_headers:
raise RuntimeError("Missing Proxy-Authorization header.")
if len(auth_headers) > 1:
raise RuntimeError("Too many Proxy-Authorization headers.")
parts = auth_headers[0].split(b" ")
if parts[0] == b"Bearer" and len(parts) == 2:
received_secret = parts[1].decode("ascii")
if self._proxy_authorization_secret == received_secret:
# Success!
return

raise RuntimeError("Invalid Proxy-Authorization header.")

async def _async_render(self, request: "SynapseRequest") -> Tuple[int, Any]:
uri = urllib.parse.urlparse(request.uri)
assert uri.scheme == b"matrix-federation"

# Check the authorization headers before handling the request.
self._check_auth(request)

headers = Headers()
for header_name in (b"User-Agent", b"Authorization", b"Content-Type"):
header_value = request.getHeader(header_name)
Expand Down Expand Up @@ -235,11 +266,11 @@ def __init__(
self,
resource: IResource,
reactor: ISynapseReactor,
federation_agent: IAgent,
hs: "HomeServer",
):
super().__init__(resource, reactor=reactor)

self._proxy_resource = ProxyResource(reactor, federation_agent)
self._proxy_resource = ProxyResource(reactor, hs=hs)

def getResourceFor(self, request: "SynapseRequest") -> IResource:
uri = urllib.parse.urlparse(request.uri)
Expand Down
71 changes: 52 additions & 19 deletions synapse/http/proxyagent.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,11 @@
InstanceUnixLocationConfig,
)
from synapse.http import redact_uri
from synapse.http.connectproxyclient import HTTPConnectProxyEndpoint, ProxyCredentials
from synapse.http.connectproxyclient import (
BasicProxyCredentials,
HTTPConnectProxyEndpoint,
ProxyCredentials,
)
from synapse.logging.context import run_in_background

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -90,10 +94,14 @@ class ProxyAgent(_AgentBase):
use_proxy: Whether proxy settings should be discovered and used
from conventional environment variables.

federation_proxies: An optional list of locations to proxy outbound federation
federation_proxy_locations: An optional list of locations to proxy outbound federation
traffic through (only requests that use the `matrix-federation://` scheme
will be proxied).

federation_proxy_credentials: Required if `federation_proxy_locations` is set. The
credentials to use when proxying outbound federation traffic through another
worker.

Raises:
ValueError if use_proxy is set and the environment variables
contain an invalid proxy specification.
Expand All @@ -109,7 +117,8 @@ def __init__(
bindAddress: Optional[bytes] = None,
pool: Optional[HTTPConnectionPool] = None,
use_proxy: bool = False,
federation_proxies: Collection[InstanceLocationConfig] = (),
federation_proxy_locations: Collection[InstanceLocationConfig] = (),
federation_proxy_credentials: Optional[ProxyCredentials] = None,
):
contextFactory = contextFactory or BrowserLikePolicyForHTTPS()

Expand Down Expand Up @@ -149,39 +158,45 @@ def __init__(
self._reactor = reactor

self._federation_proxy_endpoint: Optional[IStreamClientEndpoint] = None
if federation_proxies:
self._federation_proxy_credentials: Optional[ProxyCredentials] = None
if federation_proxy_locations:
assert (
federation_proxy_credentials is not None
), "`federation_proxy_credentials` are required when using `federation_proxy_locations`"

endpoints: List[IStreamClientEndpoint] = []
for federation_proxy in federation_proxies:
for federation_proxy_location in federation_proxy_locations:
endpoint: IStreamClientEndpoint
if isinstance(federation_proxy, InstanceTcpLocationConfig):
if isinstance(federation_proxy_location, InstanceTcpLocationConfig):
endpoint = HostnameEndpoint(
self.proxy_reactor,
federation_proxy.host,
federation_proxy.port,
federation_proxy_location.host,
federation_proxy_location.port,
)
if federation_proxy.tls:
if federation_proxy_location.tls:
tls_connection_creator = (
self._policy_for_https.creatorForNetloc(
federation_proxy.host,
federation_proxy.port,
federation_proxy_location.host.encode("utf-8"),
federation_proxy_location.port,
)
)
endpoint = wrapClientTLS(tls_connection_creator, endpoint)

elif isinstance(federation_proxy, InstanceUnixLocationConfig):
elif isinstance(federation_proxy_location, InstanceUnixLocationConfig):
endpoint = UNIXClientEndpoint(
self.proxy_reactor, federation_proxy.path
self.proxy_reactor, federation_proxy_location.path
)

else:
# It is supremely unlikely we ever hit this
raise SchemeNotSupported(
f"Unknown type of Endpoint requested, check {federation_proxy}"
f"Unknown type of Endpoint requested, check {federation_proxy_location}"
)

endpoints.append(endpoint)

self._federation_proxy_endpoint = _ProxyEndpoints(endpoints)
self._federation_proxy_endpoint = _RandomSampleEndpoints(endpoints)
self._federation_proxy_credentials = federation_proxy_credentials

def request(
self,
Expand Down Expand Up @@ -274,6 +289,19 @@ def request(
parsed_uri.scheme == b"matrix-federation"
and self._federation_proxy_endpoint
):
assert (
self._federation_proxy_credentials is not None
), "`federation_proxy_credentials` are required when using `federation_proxy_locations`"

# Set a Proxy-Authorization header
if headers is None:
headers = Headers()
# We always need authentication for the outbound federation proxy
headers.addRawHeader(
b"Proxy-Authorization",
self._federation_proxy_credentials.as_proxy_authorization_value(),
)

# Cache *all* connections under the same key, since we are only
# connecting to a single destination, the proxy:
endpoint = self._federation_proxy_endpoint
Expand Down Expand Up @@ -403,23 +431,28 @@ def parse_proxy(

credentials = None
if url.username and url.password:
credentials = ProxyCredentials(b"".join([url.username, b":", url.password]))
credentials = BasicProxyCredentials(
b"".join([url.username, b":", url.password])
)

return url.scheme, url.hostname, url.port or default_port, credentials


@implementer(IStreamClientEndpoint)
class _ProxyEndpoints:
class _RandomSampleEndpoints:
"""An endpoint that randomly iterates through a given list of endpoints at
each connection attempt.
"""

def __init__(self, endpoints: Sequence[IStreamClientEndpoint]) -> None:
def __init__(
self,
endpoints: Sequence[IStreamClientEndpoint],
) -> None:
assert endpoints
self._endpoints = endpoints

def __repr__(self) -> str:
return f"<_ProxyEndpoints endpoints={self._endpoints}>"
return f"<_RandomSampleEndpoints endpoints={self._endpoints}>"

def connect(
self, protocol_factory: IProtocolFactory
Expand Down
7 changes: 4 additions & 3 deletions synapse/http/site.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
from twisted.internet.interfaces import IAddress
from twisted.python.failure import Failure
from twisted.web.http import HTTPChannel
from twisted.web.iweb import IAgent
from twisted.web.resource import IResource, Resource
from twisted.web.server import Request

Expand All @@ -42,6 +41,8 @@
if TYPE_CHECKING:
import opentracing

from synapse.server import HomeServer


logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -627,7 +628,7 @@ def __init__(
server_version_string: str,
max_request_body_size: int,
reactor: ISynapseReactor,
federation_agent: IAgent,
hs: "HomeServer",
):
"""

Expand All @@ -645,7 +646,7 @@ def __init__(
super().__init__(
resource=resource,
reactor=reactor,
federation_agent=federation_agent,
hs=hs,
)

self.site_tag = site_tag
Expand Down
21 changes: 18 additions & 3 deletions tests/http/test_matrixfederationclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -672,7 +672,12 @@ def default_config(self) -> Dict[str, Any]:
}
return conf

@override_config({"outbound_federation_restricted_to": ["federation_sender"]})
@override_config(
{
"outbound_federation_restricted_to": ["federation_sender"],
"worker_replication_secret": "secret",
}
)
def test_proxy_requests_through_federation_sender_worker(self) -> None:
"""
Test that all outbound federation requests go through the `federation_sender`
Expand Down Expand Up @@ -723,7 +728,12 @@ def test_proxy_requests_through_federation_sender_worker(self) -> None:
res = self.successResultOf(test_request_from_main_process_d)
self.assertEqual(res, {"foo": "bar"})

@override_config({"outbound_federation_restricted_to": ["federation_sender"]})
@override_config(
{
"outbound_federation_restricted_to": ["federation_sender"],
"worker_replication_secret": "secret",
}
)
def test_proxy_request_with_network_error_through_federation_sender_worker(
self,
) -> None:
Expand Down Expand Up @@ -773,7 +783,12 @@ def test_proxy_request_with_network_error_through_federation_sender_worker(
self.assertIsInstance(failure_res.value, RequestSendFailed)
self.assertIsInstance(failure_res.value.inner_exception, HttpResponseException)

@override_config({"outbound_federation_restricted_to": ["federation_sender"]})
@override_config(
{
"outbound_federation_restricted_to": ["federation_sender"],
"worker_replication_secret": "secret",
}
)
def test_proxy_requests_and_discards_hop_by_hop_headers(self) -> None:
"""
Test to make sure hop-by-hop headers and addional headers defined in the
Expand Down
Loading