Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WPB-4559 user receives 2 k federation connection removed on anta #3580

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ type DefederationDomain = Domain
defederationQueue :: Text
defederationQueue = "delete-federation"

connectionRemovedQueue :: Text
connectionRemovedQueue = "connection-removed"

-- | If you ever change this function and modify
-- queue parameters, know that it will start failing in the
-- next release! So be prepared to write migrations.
Expand Down
10 changes: 1 addition & 9 deletions libs/wire-api/src/Wire/API/FederationUpdate.hs
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,19 @@ module Wire.API.FederationUpdate
( syncFedDomainConfigs,
SyncFedDomainConfigsCallback (..),
emptySyncFedDomainConfigsCallback,
deleteFederationRemoteGalley,
fetch,
)
where

import Control.Concurrent.Async
import Control.Exception
import Control.Retry qualified as R
import Data.Domain
import Data.Set qualified as Set
import Data.Text
import Data.Typeable (cast)
import Imports
import Network.HTTP.Client (defaultManagerSettings, newManager)
import Servant.Client (BaseUrl (BaseUrl), ClientEnv (ClientEnv), ClientError, ClientM, Scheme (Http), runClientM)
import Servant.Client (BaseUrl (BaseUrl), ClientEnv (ClientEnv), ClientError, Scheme (Http), runClientM)
import Servant.Client.Internal.HttpClient (defaultMakeClientRequest)
import System.Logger qualified as L
import Util.Options
Expand All @@ -34,9 +32,6 @@ syncFedDomainConfigs (Endpoint h p) log' cb = do
updateDomainsThread <- async $ loop log' clientEnv cb ioref
pure (ioref, updateDomainsThread)

deleteFedRemoteGalley :: Domain -> ClientM ()
deleteFedRemoteGalley dom = namedClient @IAPI.API @"delete-federation-remote-from-galley" dom

-- | Initial function for getting the set of domains from brig, and an update interval
initialize :: L.Logger -> ClientEnv -> IO FederationDomainConfigs
initialize logger clientEnv =
Expand All @@ -56,9 +51,6 @@ initialize logger clientEnv =
Just c -> pure c
Nothing -> throwIO $ ErrorCall "*** Failed to reach brig for federation setup, giving up!"

deleteFederationRemoteGalley :: Domain -> ClientEnv -> IO (Either ClientError ())
deleteFederationRemoteGalley dom = runClientM $ deleteFedRemoteGalley dom

loop :: L.Logger -> ClientEnv -> SyncFedDomainConfigsCallback -> IORef FederationDomainConfigs -> IO ()
loop logger clientEnv (SyncFedDomainConfigsCallback callback) env = forever $
catch go $ \(e :: SomeException) -> do
Expand Down
17 changes: 0 additions & 17 deletions libs/wire-api/src/Wire/API/Routes/Internal/Brig.hs
Original file line number Diff line number Diff line change
Expand Up @@ -773,23 +773,6 @@ type FederationRemotesAPI =
:> Capture "domain" Domain
:> Delete '[JSON] ()
)
-- This is nominally similar to delete-federation-remotes,
-- but is called from Galley to delete the one-on-one coversations.
-- This is needed as Galley doesn't have access to the tables
-- that hold these values. We don't want these deletes to happen
-- in delete-federation-remotes as brig might fall over and leave
-- some records hanging around. Galley uses a Rabbit queue to track
-- what is has done and can recover from a service falling over.
:<|> Named
"delete-federation-remote-from-galley"
( Description FederationRemotesAPIDescription
:> Description FederationRemotesAPIDeleteDescription
:> "federation"
:> "remote"
:> Capture "domain" Domain
:> "galley"
:> Delete '[JSON] ()
)

type FederationRemotesAPIDescription =
"See https://docs.wire.com/understand/federation/backend-communication.html#configuring-remote-connections for background. "
Expand Down
9 changes: 7 additions & 2 deletions services/background-worker/background-worker.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -32,20 +32,25 @@ library
, amqp
, async
, base
, bilge
, brig
, bytestring-conversion
, cassandra-util
, containers
, cql-io-tinylog
, errors
, exceptions
, extended
, galley
, HsOpenSSL
, http-client
, http-types
, http2-manager
, imports
, lens
, metrics-core
, metrics-wai
, monad-control
, polysemy
, polysemy-wire-zoo
, prometheus-client
, retry
, servant-client
Expand Down
14 changes: 14 additions & 0 deletions services/background-worker/background-worker.integration.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,17 @@ rabbitmq:
backendNotificationPusher:
pushBackoffMinWait: 1000
pushBackoffMaxWait: 1000000

federationDomain: example.com

cassandra:
brig:
endpoint:
host: 127.0.0.1
port: 9042
keyspace: brig_test
galley:
endpoint:
host: 127.0.0.1
port: 9042
keyspace: galley_test
15 changes: 10 additions & 5 deletions services/background-worker/default.nix
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@
, amqp
, async
, base
, bilge
, bytestring
, bytestring-conversion
, cassandra-util
, containers
, errors
, exceptions
, extended
, federator
, galley
, gitignoreSource
, HsOpenSSL
, hspec
Expand All @@ -23,11 +25,12 @@
, http2-manager
, HUnit
, imports
, lens
, lib
, metrics-core
, metrics-wai
, monad-control
, polysemy
, polysemy-wire-zoo
, prometheus-client
, QuickCheck
, retry
Expand Down Expand Up @@ -57,20 +60,22 @@ mkDerivation {
amqp
async
base
bilge
bytestring-conversion
cassandra-util
containers
errors
exceptions
extended
galley
HsOpenSSL
http-client
http-types
http2-manager
imports
lens
metrics-core
metrics-wai
monad-control
polysemy
polysemy-wire-zoo
prometheus-client
retry
servant-client
Expand Down
25 changes: 22 additions & 3 deletions services/background-worker/src/Wire/BackgroundWorker/Env.hs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,21 @@

module Wire.BackgroundWorker.Env where

import Cassandra (Keyspace (Keyspace))
import Cassandra.Settings hiding (Logger)
import Control.Concurrent.Async
import Control.Concurrent.Chan
import Control.Lens ((^.))
import Control.Monad.Base
import Control.Monad.Catch
import Control.Monad.Trans.Control
import Data.Domain
import Data.Map.Strict qualified as Map
import Data.Metrics qualified as Metrics
import Data.Text (unpack)
import Database.CQL.IO.Tinylog qualified as CT
import HTTP2.Client.Manager
import Imports
import Imports hiding (init)
import Network.AMQP (Channel)
import Network.AMQP.Extended
import Network.HTTP.Client
Expand All @@ -23,7 +29,7 @@ import Servant.Client qualified as Servant
import System.Logger qualified as Log
import System.Logger.Class (Logger, MonadLogger (..))
import System.Logger.Extended qualified as Log
import Util.Options
import Util.Options as Opts
import Wire.API.FederationUpdate
import Wire.API.Routes.FederationDomainConfig
import Wire.BackgroundWorker.Options
Expand Down Expand Up @@ -55,7 +61,9 @@ data Env = Env
-- This allows us to reuse existing code. This only pushes.
notificationChannel :: MVar Channel,
backendNotificationsConfig :: BackendNotificationsConfig,
statuses :: IORef (Map Worker IsWorking)
statuses :: IORef (Map Worker IsWorking),
federationDomain :: Domain,
brigDB :: ClientState
}

data BackendNotificationMetrics = BackendNotificationMetrics
Expand Down Expand Up @@ -103,6 +111,17 @@ mkEnv opts = do
backendNotificationMetrics <- mkBackendNotificationMetrics
notificationChannel <- mkRabbitMqChannelMVar logger $ demoteOpts opts.rabbitmq
let backendNotificationsConfig = opts.backendNotificationPusher
federationDomain = opts.federationDomain
brigDB <- do
let h = opts.cassandra.brig ^. endpoint . Opts.host
p = opts.cassandra.brig ^. endpoint . Opts.port
ks = opts.cassandra.brig ^. keyspace
init
$ setLogger (CT.mkLogger logger)
. setPortNumber (fromIntegral p)
. setContacts (unpack h) []
. setKeyspace (Keyspace ks)
$ defSettings
pure (Env {..}, syncThread)

initHttp2Manager :: IO Http2Manager
Expand Down
13 changes: 12 additions & 1 deletion services/background-worker/src/Wire/BackgroundWorker/Options.hs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
module Wire.BackgroundWorker.Options where

import Data.Aeson
import Data.Domain
import Imports
import Network.AMQP.Extended
import System.Logger.Extended
Expand All @@ -16,10 +17,20 @@ data Opts = Opts
brig :: !Endpoint,
-- | Seconds, Nothing for no timeout
defederationTimeout :: Maybe Int,
backendNotificationPusher :: BackendNotificationsConfig
backendNotificationPusher :: BackendNotificationsConfig,
federationDomain :: !Domain,
cassandra :: !DBSettings
}
deriving (Show, Generic)

data DBSettings = DBSettings
{ brig :: !CassandraOpts,
galley :: !CassandraOpts
}
deriving (Show, Generic)

instance FromJSON DBSettings

instance FromJSON Opts

data BackendNotificationsConfig = BackendNotificationsConfig
Expand Down
Loading
Loading