This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Logcontexts for replication command handlers #3709

Merged 2 commits on Aug 17, 2018
1 change: 1 addition & 0 deletions changelog.d/3709.misc
@@ -0,0 +1 @@
+Logcontexts for replication command handlers
3 changes: 2 additions & 1 deletion synapse/app/appservice.py
@@ -117,8 +117,9 @@ def __init__(self, hs):
         super(ASReplicationHandler, self).__init__(hs.get_datastore())
         self.appservice_handler = hs.get_application_service_handler()

+    @defer.inlineCallbacks
     def on_rdata(self, stream_name, token, rows):
-        super(ASReplicationHandler, self).on_rdata(stream_name, token, rows)
+        yield super(ASReplicationHandler, self).on_rdata(stream_name, token, rows)

         if stream_name == "events":
             max_stream_id = self.store.get_room_max_stream_ordering()
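The same change is applied to each of the worker apps below: `on_rdata` becomes
an `inlineCallbacks` generator and yields the superclass call, since the base
implementation now returns a Deferred from the store. A minimal sketch (toy
code, not from this PR) of why the yield matters:

```python
from twisted.internet import defer


class BaseHandler(object):
    def on_rdata(self, stream_name, token, rows):
        # stands in for store.process_replication_rows, which may return a
        # Deferred that only fires once the rows have been persisted
        return defer.succeed(None)


class WorkerHandler(BaseHandler):
    @defer.inlineCallbacks
    def on_rdata(self, stream_name, token, rows):
        # wait for the store to catch up before acting on the new token;
        # without the yield, the code below could race with the store write
        yield super(WorkerHandler, self).on_rdata(stream_name, token, rows)
        # ... safe to read the newly-replicated data here ...
```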
3 changes: 2 additions & 1 deletion synapse/app/federation_sender.py
@@ -144,8 +144,9 @@ def __init__(self, hs):
         super(FederationSenderReplicationHandler, self).__init__(hs.get_datastore())
         self.send_handler = FederationSenderHandler(hs, self)

+    @defer.inlineCallbacks
     def on_rdata(self, stream_name, token, rows):
-        super(FederationSenderReplicationHandler, self).on_rdata(
+        yield super(FederationSenderReplicationHandler, self).on_rdata(
             stream_name, token, rows
         )
         self.send_handler.process_replication_rows(stream_name, token, rows)
3 changes: 2 additions & 1 deletion synapse/app/pusher.py
@@ -148,8 +148,9 @@ def __init__(self, hs):

         self.pusher_pool = hs.get_pusherpool()

+    @defer.inlineCallbacks
     def on_rdata(self, stream_name, token, rows):
-        super(PusherReplicationHandler, self).on_rdata(stream_name, token, rows)
+        yield super(PusherReplicationHandler, self).on_rdata(stream_name, token, rows)
         run_in_background(self.poke_pushers, stream_name, token, rows)

     @defer.inlineCallbacks
3 changes: 2 additions & 1 deletion synapse/app/synchrotron.py
@@ -332,8 +332,9 @@ def __init__(self, hs):
         self.presence_handler = hs.get_presence_handler()
         self.notifier = hs.get_notifier()

+    @defer.inlineCallbacks
     def on_rdata(self, stream_name, token, rows):
-        super(SyncReplicationHandler, self).on_rdata(stream_name, token, rows)
+        yield super(SyncReplicationHandler, self).on_rdata(stream_name, token, rows)
         run_in_background(self.process_and_notify, stream_name, token, rows)

     def get_streams_to_replicate(self):
3 changes: 2 additions & 1 deletion synapse/app/user_dir.py
@@ -169,8 +169,9 @@ def __init__(self, hs):
         super(UserDirectoryReplicationHandler, self).__init__(hs.get_datastore())
         self.user_directory = hs.get_user_directory_handler()

+    @defer.inlineCallbacks
     def on_rdata(self, stream_name, token, rows):
-        super(UserDirectoryReplicationHandler, self).on_rdata(
+        yield super(UserDirectoryReplicationHandler, self).on_rdata(
             stream_name, token, rows
         )
         if stream_name == "current_state_deltas":
4 changes: 2 additions & 2 deletions synapse/replication/tcp/client.py
@@ -107,15 +107,15 @@ def on_rdata(self, stream_name, token, rows):
         Can be overriden in subclasses to handle more.
         """
         logger.info("Received rdata %s -> %s", stream_name, token)
-        self.store.process_replication_rows(stream_name, token, rows)
+        return self.store.process_replication_rows(stream_name, token, rows)

     def on_position(self, stream_name, token):
         """Called when we get new position data. By default this just pokes
         the slave store.

         Can be overriden in subclasses to handle more.
         """
-        self.store.process_replication_rows(stream_name, token, [])
+        return self.store.process_replication_rows(stream_name, token, [])

     def on_sync(self, data):
         """When we received a SYNC we wake up any deferreds that were waiting
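Returning the Deferred from `process_replication_rows`, instead of dropping it,
lets the caller in `protocol.py` wait for completion and observe failures. A
toy illustration (not PR code) of the difference:

```python
import logging

from twisted.internet import defer

logger = logging.getLogger(__name__)


def process_replication_rows(stream_name, token, rows):
    # pretend the store hit an error while applying the rows
    return defer.fail(RuntimeError("database unavailable"))


def on_rdata_old(stream_name, token, rows):
    process_replication_rows(stream_name, token, rows)  # failure is dropped


def on_rdata_new(stream_name, token, rows):
    return process_replication_rows(stream_name, token, rows)


# the caller can now attach an errback and actually see the failure
d = on_rdata_new("events", 42, [])
d.addErrback(lambda f: logger.warning("replication failed: %s", f.value))
```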
12 changes: 12 additions & 0 deletions synapse/replication/tcp/commands.py
@@ -59,6 +59,12 @@ def to_line(self):
         """
         return self.data

+    def get_logcontext_id(self):
+        """Get a suitable string for the logcontext when processing this command"""
+
+        # by default, we just use the command name.
+        return self.NAME
+

 class ServerCommand(Command):
     """Sent by the server on new connection and includes the server_name.
@@ -116,6 +122,9 @@ def to_line(self):
             _json_encoder.encode(self.row),
         ))

+    def get_logcontext_id(self):
+        return "RDATA-" + self.stream_name
+

 class PositionCommand(Command):
     """Sent by the client to tell the client the stream postition without
@@ -190,6 +199,9 @@ def from_line(cls, line):
     def to_line(self):
         return " ".join((self.stream_name, str(self.token),))

+    def get_logcontext_id(self):
+        return "REPLICATE-" + self.stream_name
+

 class UserSyncCommand(Command):
     """Sent by the client to inform the server that a user has started or
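`get_logcontext_id` gives each command type a descriptive name for the
logcontext under which it will be processed; stream-scoped commands append the
stream name so that, say, RDATA on different streams can be told apart in the
logs. A hypothetical new command (illustration only, not part of this PR)
would follow the same pattern:

```python
from synapse.replication.tcp.commands import Command


class PingUserCommand(Command):
    """Hypothetical command, for illustration only."""

    NAME = "PING_USER"

    def __init__(self, user_id):
        self.user_id = user_id

    def get_logcontext_id(self):
        # scope the logcontext name to the user this command concerns
        return "PING_USER-" + self.user_id
```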
42 changes: 29 additions & 13 deletions synapse/replication/tcp/protocol.py
@@ -63,6 +63,8 @@
 from twisted.python.failure import Failure

 from synapse.metrics import LaterGauge
+from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.util.logcontext import make_deferred_yieldable, run_in_background
 from synapse.util.stringutils import random_string

 from .commands import (
@@ -222,7 +224,11 @@ def lineReceived(self, line):

         # Now lets try and call on_<CMD_NAME> function
         try:
-            getattr(self, "on_%s" % (cmd_name,))(cmd)
+            run_as_background_process(
+                "replication-" + cmd.get_logcontext_id(),
+                getattr(self, "on_%s" % (cmd_name,)),
+                cmd,
+            )
         except Exception:
             logger.exception("[%s] Failed to handle line: %r", self.id(), line)

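Previously the `on_*` handler ran in whatever logcontext `lineReceived`
happened to be in, and any Deferred it returned was discarded. Wrapping it in
`run_as_background_process` runs it under its own named logcontext, so
`lineReceived` no longer blocks on slow handlers. A very rough stand-in for
what the helper does here (a simplification, not the real implementation,
which also manages logcontexts and records metrics):

```python
import logging

from twisted.internet import defer

logger = logging.getLogger(__name__)


def run_named_background_task(desc, func, *args):
    """Start func(*args) without waiting for it; report failures as desc."""
    d = defer.maybeDeferred(func, *args)
    d.addErrback(lambda f: logger.error("%s failed: %s", desc, f))
    return d
```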
@@ -387,7 +393,7 @@ def on_NAME(self, cmd):
         self.name = cmd.data

     def on_USER_SYNC(self, cmd):
-        self.streamer.on_user_sync(
+        return self.streamer.on_user_sync(
             self.conn_id, cmd.user_id, cmd.is_syncing, cmd.last_sync_ms,
         )

@@ -397,22 +403,33 @@ def on_REPLICATE(self, cmd):

         if stream_name == "ALL":
             # Subscribe to all streams we're publishing to.
-            for stream in iterkeys(self.streamer.streams_by_name):
-                self.subscribe_to_stream(stream, token)
+            deferreds = [
+                run_in_background(
+                    self.subscribe_to_stream,
+                    stream, token,
+                )
+                for stream in iterkeys(self.streamer.streams_by_name)
+            ]
+
+            return make_deferred_yieldable(
+                defer.gatherResults(deferreds, consumeErrors=True)
+            )
         else:
-            self.subscribe_to_stream(stream_name, token)
+            return self.subscribe_to_stream(stream_name, token)

     def on_FEDERATION_ACK(self, cmd):
-        self.streamer.federation_ack(cmd.token)
+        return self.streamer.federation_ack(cmd.token)

     def on_REMOVE_PUSHER(self, cmd):
-        self.streamer.on_remove_pusher(cmd.app_id, cmd.push_key, cmd.user_id)
+        return self.streamer.on_remove_pusher(
+            cmd.app_id, cmd.push_key, cmd.user_id,
+        )

     def on_INVALIDATE_CACHE(self, cmd):
-        self.streamer.on_invalidate_cache(cmd.cache_func, cmd.keys)
+        return self.streamer.on_invalidate_cache(cmd.cache_func, cmd.keys)

     def on_USER_IP(self, cmd):
-        self.streamer.on_user_ip(
+        return self.streamer.on_user_ip(
             cmd.user_id, cmd.access_token, cmd.ip, cmd.user_agent, cmd.device_id,
             cmd.last_seen,
         )
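The `ALL` branch uses the standard Synapse idiom for running several
Deferred-returning calls in parallel without leaking logcontexts:
`run_in_background` starts each call, `defer.gatherResults` combines the
resulting Deferreds, and `make_deferred_yieldable` makes the combined Deferred
safe to yield. The same idiom in miniature (a sketch, assuming `work` returns
a Deferred):

```python
from twisted.internet import defer

from synapse.util.logcontext import make_deferred_yieldable, run_in_background


@defer.inlineCallbacks
def do_all(work, items):
    # kick everything off in parallel...
    deferreds = [run_in_background(work, item) for item in items]
    # ...then wait for all of them; consumeErrors=True prevents one failure
    # from leaving the others' errors unhandled
    yield make_deferred_yieldable(
        defer.gatherResults(deferreds, consumeErrors=True)
    )
```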
@@ -542,14 +559,13 @@ def on_RDATA(self, cmd):
         # Check if this is the last of a batch of updates
         rows = self.pending_batches.pop(stream_name, [])
         rows.append(row)
-
-        self.handler.on_rdata(stream_name, cmd.token, rows)
+        return self.handler.on_rdata(stream_name, cmd.token, rows)

     def on_POSITION(self, cmd):
-        self.handler.on_position(cmd.stream_name, cmd.token)
+        return self.handler.on_position(cmd.stream_name, cmd.token)

     def on_SYNC(self, cmd):
-        self.handler.on_sync(cmd.data)
+        return self.handler.on_sync(cmd.data)

     def replicate(self, stream_name, token):
         """Send the subscription request to the server