|
18 | 18 |
|
19 | 19 | from twisted.internet import defer |
20 | 20 | from twisted.internet.defer import Deferred |
21 | | -from twisted.internet.interfaces import IAddress, IConnector |
22 | | -from twisted.internet.protocol import ReconnectingClientFactory |
23 | | -from twisted.python.failure import Failure |
24 | 21 |
|
25 | 22 | from synapse.api.constants import EventTypes, Membership, ReceiptTypes |
26 | 23 | from synapse.federation import send_queue |
27 | 24 | from synapse.federation.sender import FederationSender |
28 | 25 | from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable |
29 | 26 | from synapse.metrics.background_process_metrics import run_as_background_process |
30 | | -from synapse.replication.tcp.protocol import ClientReplicationStreamProtocol |
31 | 27 | from synapse.replication.tcp.streams import ( |
32 | 28 | AccountDataStream, |
33 | 29 | DeviceListsStream, |
|
53 | 49 | from synapse.util.metrics import Measure |
54 | 50 |
|
55 | 51 | if TYPE_CHECKING: |
56 | | - from synapse.replication.tcp.handler import ReplicationCommandHandler |
57 | 52 | from synapse.server import HomeServer |
58 | 53 |
|
59 | 54 | logger = logging.getLogger(__name__) |
|
62 | 57 | _WAIT_FOR_REPLICATION_TIMEOUT_SECONDS = 5 |
63 | 58 |
|
64 | 59 |
|
65 | | -class DirectTcpReplicationClientFactory(ReconnectingClientFactory): |
66 | | - """Factory for building connections to the master. Will reconnect if the |
67 | | - connection is lost. |
68 | | -
|
69 | | - Accepts a handler that is passed to `ClientReplicationStreamProtocol`. |
70 | | - """ |
71 | | - |
72 | | - initialDelay = 0.1 |
73 | | - maxDelay = 1 # Try at least once every N seconds |
74 | | - |
75 | | - def __init__( |
76 | | - self, |
77 | | - hs: "HomeServer", |
78 | | - client_name: str, |
79 | | - command_handler: "ReplicationCommandHandler", |
80 | | - ): |
81 | | - self.client_name = client_name |
82 | | - self.command_handler = command_handler |
83 | | - self.server_name = hs.config.server.server_name |
84 | | - self.hs = hs |
85 | | - self._clock = hs.get_clock() # As self.clock is defined in super class |
86 | | - |
87 | | - hs.get_reactor().addSystemEventTrigger("before", "shutdown", self.stopTrying) |
88 | | - |
89 | | - def startedConnecting(self, connector: IConnector) -> None: |
90 | | - logger.info("Connecting to replication: %r", connector.getDestination()) |
91 | | - |
92 | | - def buildProtocol(self, addr: IAddress) -> ClientReplicationStreamProtocol: |
93 | | - logger.info("Connected to replication: %r", addr) |
94 | | - return ClientReplicationStreamProtocol( |
95 | | - self.hs, |
96 | | - self.client_name, |
97 | | - self.server_name, |
98 | | - self._clock, |
99 | | - self.command_handler, |
100 | | - ) |
101 | | - |
102 | | - def clientConnectionLost(self, connector: IConnector, reason: Failure) -> None: |
103 | | - logger.error("Lost replication conn: %r", reason) |
104 | | - ReconnectingClientFactory.clientConnectionLost(self, connector, reason) |
105 | | - |
106 | | - def clientConnectionFailed(self, connector: IConnector, reason: Failure) -> None: |
107 | | - logger.error("Failed to connect to replication: %r", reason) |
108 | | - ReconnectingClientFactory.clientConnectionFailed(self, connector, reason) |
109 | | - |
110 | | - |
111 | 60 | class ReplicationDataHandler: |
112 | 61 | """Handles incoming stream updates from replication. |
113 | 62 |
|
|
0 commit comments