Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 21 additions & 10 deletions libs/wire-subsystems/src/Wire/FederationAPIAccess.hs
Original file line number Diff line number Diff line change
Expand Up @@ -33,20 +33,16 @@ data FederationAPIAccess (fedM :: Component -> Type -> Type) m a where
Remote x ->
fedM c a ->
FederationAPIAccess fedM m (Either FederationError a)
RunFederatedConcurrently ::
forall (c :: Component) f a m x fedM.
(KnownComponent c, Foldable f) =>
f (Remote x) ->
(Remote x -> fedM c a) ->
FederationAPIAccess fedM m [Either (Remote x, FederationError) (Remote a)]
-- | An action similar to 'RunFederatedConcurrently', but the input is
-- bucketed by domain before the RPCs are sent to the remote backends.
RunFederatedBucketed ::
forall (c :: Component) f a m x fedM.
RunFederatedConcurrentlyEither ::
(KnownComponent c, Foldable f, Functor f) =>
f (Remote x) ->
(Remote [x] -> fedM c a) ->
FederationAPIAccess fedM m [Either (Remote [x], FederationError) (Remote a)]
RunFederatedConcurrentlyBucketsEither ::
(KnownComponent c, Foldable f) =>
f (Remote x) ->
(Remote x -> fedM c a) ->
FederationAPIAccess fedM m [Either (Remote x, FederationError) (Remote a)]
IsFederationConfigured :: FederationAPIAccess fedM m Bool

makeSem ''FederationAPIAccess
Expand All @@ -61,3 +57,18 @@ runFederated ::
fedM c a ->
Sem r a
runFederated rx c = runFederatedEither rx c >>= fromEither

runFederatedConcurrently ::
forall c fedM f x a r.
( Member (FederationAPIAccess fedM) r,
Member (Error FederationError) r,
KnownComponent c,
Foldable f,
Functor f
) =>
f (Remote x) ->
(Remote [x] -> fedM c a) ->
Sem r [Remote a]
runFederatedConcurrently rx c = do
results <- runFederatedConcurrentlyEither rx c
fromEither $ mapLeft snd $ sequence results
20 changes: 10 additions & 10 deletions libs/wire-subsystems/src/Wire/FederationAPIAccess/Interpreter.hs
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,8 @@ interpretFederationAPIAccessGeneral runFedM isFederationConfigured =
interpret $
\case
RunFederatedEither remote rpc -> runFederatedEither runFedM remote rpc
RunFederatedConcurrently remotes rpc -> runFederatedConcurrently runFedM remotes rpc
RunFederatedBucketed remotes rpc -> runFederatedBucketed runFedM remotes rpc
RunFederatedConcurrentlyEither remotes rpc -> runFederatedConcurrently runFedM remotes rpc
RunFederatedConcurrentlyBucketsEither remotes rpc -> runFederatedBucketed runFedM remotes rpc
IsFederationConfigured -> isFederationConfigured

runFederatedEither ::
Expand All @@ -95,25 +95,25 @@ runFederatedEither runFedM (tDomain -> remoteDomain) rpc =

runFederatedConcurrently ::
( Foldable f,
Member (Concurrency 'Unsafe) r
Member (Concurrency 'Unsafe) r,
Functor f
) =>
FederatedActionRunner fedM r ->
f (Remote a) ->
(Remote a -> fedM c b) ->
Sem r [Either (Remote a, FederationError) (Remote b)]
(Remote [a] -> fedM c b) ->
Sem r [Either (Remote [a], FederationError) (Remote b)]
runFederatedConcurrently runFedM xs rpc =
unsafePooledForConcurrentlyN 8 (toList xs) $ \r ->
unsafePooledForConcurrentlyN 8 (bucketRemote xs) $ \r ->
bimap (r,) (qualifyAs r) <$> runFederatedEither runFedM r (rpc r)

runFederatedBucketed ::
( Foldable f,
Functor f,
Member (Concurrency 'Unsafe) r
) =>
FederatedActionRunner fedM r ->
f (Remote a) ->
(Remote [a] -> fedM c b) ->
Sem r [Either (Remote [a], FederationError) (Remote b)]
(Remote a -> fedM c b) ->
Sem r [Either (Remote a, FederationError) (Remote b)]
runFederatedBucketed runFedM xs rpc =
unsafePooledForConcurrentlyN 8 (bucketRemote xs) $ \r ->
unsafePooledForConcurrentlyN 8 (toList xs) $ \r ->
bimap (r,) (qualifyAs r) <$> runFederatedEither runFedM r (rpc r)
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
-- You should have received a copy of the GNU Affero General Public License along
-- with this program. If not, see <https://www.gnu.org/licenses/>.

module Galley.Effects.ProposalStore where
module Wire.ProposalStore where

import Imports
import Polysemy
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,55 +15,52 @@
-- You should have received a copy of the GNU Affero General Public License along
-- with this program. If not, see <https://www.gnu.org/licenses/>.

module Galley.Cassandra.Proposal
module Wire.ProposalStore.Cassandra
( interpretProposalStoreToCassandra,
ProposalOrigin (..),
)
where

import Cassandra
import Data.Timeout
import Galley.Cassandra.Store
import Galley.Cassandra.Util
import Galley.Effects.ProposalStore
import Imports
import Polysemy
import Polysemy.Input
import Polysemy.TinyLog
import Wire.API.MLS.Epoch
import Wire.API.MLS.Group
import Wire.API.MLS.Proposal
import Wire.API.MLS.Serialisation
import Wire.ConversationStore.Cassandra.Instances ()
import Wire.ProposalStore
import Wire.Util (embedClient)

-- | Proposals in the database expire after this timeout
defaultTTL :: Timeout
defaultTTL = 28 # Day

interpretProposalStoreToCassandra ::
( Member (Embed IO) r,
Member (Input ClientState) r,
Member TinyLog r
Member (Input ClientState) r
) =>
Sem (ProposalStore ': r) a ->
Sem r a
interpretProposalStoreToCassandra = interpret $ \case
StoreProposal groupId epoch ref origin raw -> do
logEffect "ProposalStore.StoreProposal"
embedClient . retry x5 $
client <- input
embedClient client . retry x5 $
write (storeQuery defaultTTL) (params LocalQuorum (groupId, epoch, ref, origin, raw))
GetProposal groupId epoch ref -> do
logEffect "ProposalStore.GetProposal"
embedClient (runIdentity <$$> retry x1 (query1 getQuery (params LocalQuorum (groupId, epoch, ref))))
client <- input
embedClient client (runIdentity <$$> retry x1 (query1 getQuery (params LocalQuorum (groupId, epoch, ref))))
GetAllPendingProposalRefs groupId epoch -> do
logEffect "ProposalStore.GetAllPendingProposalRefs"
embedClient (runIdentity <$$> retry x1 (query getAllPendingRef (params LocalQuorum (groupId, epoch))))
client <- input
embedClient client (runIdentity <$$> retry x1 (query getAllPendingRef (params LocalQuorum (groupId, epoch))))
GetAllPendingProposals groupId epoch -> do
logEffect "ProposalStore.GetAllPendingProposals"
embedClient $ retry x1 (query getAllPending (params LocalQuorum (groupId, epoch)))
client <- input
embedClient client $ retry x1 (query getAllPending (params LocalQuorum (groupId, epoch)))
DeleteAllProposals groupId -> do
logEffect "ProposalStore.DeleteAllProposals"
embedClient $ retry x5 (write deleteAllProposalsForGroup (params LocalQuorum (Identity groupId)))
client <- input
embedClient client $ retry x5 (write deleteAllProposalsForGroup (params LocalQuorum (Identity groupId)))

storeQuery :: Timeout -> PrepQuery W (GroupId, Epoch, ProposalRef, ProposalOrigin, RawMLS Proposal) ()
storeQuery ttl =
Expand Down
4 changes: 2 additions & 2 deletions libs/wire-subsystems/test/unit/Wire/MiniBackend.hs
Original file line number Diff line number Diff line change
Expand Up @@ -739,6 +739,6 @@ miniFederationAPIAccess online = do
if isJust (M.lookup (qDomain $ tUntagged remote) online)
then FI.runFederatedEither runner remote rpc
else pure $ Left do FederationUnexpectedError "RunFederatedEither"
RunFederatedConcurrently _remotes _rpc -> error "unimplemented: RunFederatedConcurrently"
RunFederatedBucketed _domain _rpc -> error "unimplemented: RunFederatedBucketed"
RunFederatedConcurrentlyEither _remotes _rpc -> error "unimplemented: RunFederatedConcurrently"
RunFederatedConcurrentlyBucketsEither _domain _rpc -> error "unimplemented: RunFederatedBucketed"
IsFederationConfigured -> pure True
2 changes: 2 additions & 0 deletions libs/wire-subsystems/wire-subsystems.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,8 @@ library
Wire.PropertyStore.Cassandra
Wire.PropertySubsystem
Wire.PropertySubsystem.Interpreter
Wire.ProposalStore
Wire.ProposalStore.Cassandra
Wire.RateLimit
Wire.RateLimit.Interpreter
Wire.Rpc
Expand Down
8 changes: 2 additions & 6 deletions services/galley/galley.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,6 @@ library
Galley.Cassandra.Client
Galley.Cassandra.Code
Galley.Cassandra.CustomBackend
Galley.Cassandra.Proposal
Galley.Cassandra.Queries
Galley.Cassandra.SearchVisibility
Galley.Cassandra.Store
Expand All @@ -155,8 +154,6 @@ library
Galley.Effects.ClientStore
Galley.Effects.CodeStore
Galley.Effects.CustomBackendStore
Galley.Effects.FederatorAccess
Galley.Effects.ProposalStore
Galley.Effects.Queue
Galley.Effects.SearchVisibilityStore
Galley.Effects.TeamFeatureStore
Expand All @@ -165,7 +162,6 @@ library
Galley.Env
Galley.External.LegalHoldService
Galley.External.LegalHoldService.Internal
Galley.Intra.Federator
Galley.Intra.Util
Galley.Keys
Galley.Monad
Expand Down Expand Up @@ -257,7 +253,7 @@ library
Galley.Types.Clients
Galley.Validation

ghc-options:
ghc-options: -fplugin=Polysemy.Plugin
other-modules: Paths_galley
hs-source-dirs: src
build-depends:
Expand All @@ -280,7 +276,6 @@ library
, crypton
, crypton-x509
, data-default
, data-timeout
, errors >=2.0
, exceptions >=0.4
, extended
Expand All @@ -305,6 +300,7 @@ library
, pem
, polysemy
, polysemy-conc
, polysemy-plugin
, polysemy-wire-zoo
, prometheus-client
, raw-strings-qq >=1.0
Expand Down
34 changes: 20 additions & 14 deletions services/galley/src/Galley/API/Action.hs
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,6 @@ import Galley.API.Util
import Galley.Data.Scope (Scope (ReusableCode))
import Galley.Effects
import Galley.Effects.CodeStore qualified as E
import Galley.Effects.FederatorAccess qualified as E
import Galley.Effects.ProposalStore qualified as E
import Galley.Env (Env)
import Galley.Options
import Galley.Validation
Expand All @@ -105,6 +103,7 @@ import Wire.API.Federation.API
import Wire.API.Federation.API.Brig
import Wire.API.Federation.API.Galley
import Wire.API.Federation.API.Galley qualified as F
import Wire.API.Federation.Client (FederatorClient)
import Wire.API.Federation.Error
import Wire.API.FederationStatus
import Wire.API.MLS.Group.Serialisation qualified as Serialisation
Expand All @@ -119,8 +118,10 @@ import Wire.API.User as User
import Wire.BrigAPIAccess qualified as E
import Wire.ConversationStore qualified as E
import Wire.ConversationSubsystem
import Wire.FederationAPIAccess qualified as E
import Wire.FireAndForget qualified as E
import Wire.NotificationSubsystem
import Wire.ProposalStore qualified as E
import Wire.Sem.Now (Now)
import Wire.Sem.Now qualified as Now
import Wire.StoredConversation
Expand All @@ -132,7 +133,8 @@ import Wire.UserList

type family HasConversationActionEffects (tag :: ConversationActionTag) r :: Constraint where
HasConversationActionEffects 'ConversationJoinTag r =
( Member BrigAPIAccess r,
( -- TODO: Replace with subsystems
Member BrigAPIAccess r,
Member (Error FederationError) r,
Member (Error InternalError) r,
Member (ErrorS 'NotATeamMember) r,
Expand All @@ -148,13 +150,17 @@ type family HasConversationActionEffects (tag :: ConversationActionTag) r :: Con
Member (Error NonFederatingBackends) r,
Member (Error UnreachableBackends) r,
Member ExternalAccess r,
Member FederatorAccess r,
-- TODO: Move to subsystems
Member (FederationAPIAccess FederatorClient) r,
Member NotificationSubsystem r,
-- TODO: Replace with ConversationSubsystemConfig
Member (Input Env) r,
-- TODO: Replace with ConversationSubsystemConfig
Member (Input Opts) r,
Member Now r,
Member LegalHoldStore r,
Member ConversationStore r,
-- TODO: Move to subsystems
Member ProposalStore r,
Member Random r,
Member TeamStore r,
Expand All @@ -166,7 +172,7 @@ type family HasConversationActionEffects (tag :: ConversationActionTag) r :: Con
( Member (Error InternalError) r,
Member (Error NoChanges) r,
Member ExternalAccess r,
Member FederatorAccess r,
Member (FederationAPIAccess FederatorClient) r,
Member NotificationSubsystem r,
Member Now r,
Member (Input Env) r,
Expand All @@ -182,7 +188,7 @@ type family HasConversationActionEffects (tag :: ConversationActionTag) r :: Con
Member (Input Env) r,
Member Now r,
Member ExternalAccess r,
Member FederatorAccess r,
Member (FederationAPIAccess FederatorClient) r,
Member NotificationSubsystem r,
Member (Error InternalError) r,
Member Random r,
Expand All @@ -199,7 +205,7 @@ type family HasConversationActionEffects (tag :: ConversationActionTag) r :: Con
Member ConversationStore r,
Member (Error FederationError) r,
Member (ErrorS 'NotATeamMember) r,
Member FederatorAccess r,
Member (FederationAPIAccess FederatorClient) r,
Member ProposalStore r,
Member TeamStore r
)
Expand All @@ -218,7 +224,7 @@ type family HasConversationActionEffects (tag :: ConversationActionTag) r :: Con
Member (ErrorS 'InvalidTargetAccess) r,
Member (ErrorS ('ActionDenied 'RemoveConversationMember)) r,
Member ExternalAccess r,
Member FederatorAccess r,
Member (FederationAPIAccess FederatorClient) r,
Member FireAndForget r,
Member NotificationSubsystem r,
Member (Input Env) r,
Expand All @@ -245,7 +251,7 @@ type family HasConversationActionEffects (tag :: ConversationActionTag) r :: Con
Member (Error NoChanges) r,
Member BrigAPIAccess r,
Member ExternalAccess r,
Member FederatorAccess r,
Member (FederationAPIAccess FederatorClient) r,
Member NotificationSubsystem r,
Member (Input Env) r,
Member (Input Opts) r,
Expand All @@ -267,7 +273,7 @@ type family HasConversationActionEffects (tag :: ConversationActionTag) r :: Con
Member (ErrorS InvalidOperation) r,
Member ConversationStore r,
Member ExternalAccess r,
Member FederatorAccess r,
Member (FederationAPIAccess FederatorClient) r,
Member NotificationSubsystem r,
Member ProposalStore r,
Member Random r,
Expand Down Expand Up @@ -377,7 +383,7 @@ enforceFederationProtocol proto domains = do
checkFederationStatus ::
( Member (Error UnreachableBackends) r,
Member (Error NonFederatingBackends) r,
Member FederatorAccess r
Member (FederationAPIAccess FederatorClient) r
) =>
RemoteDomains ->
Sem r ()
Expand All @@ -389,7 +395,7 @@ checkFederationStatus req = do

getFederationStatus ::
( Member (Error UnreachableBackends) r,
Member FederatorAccess r
Member (FederationAPIAccess FederatorClient) r
) =>
RemoteDomains ->
Sem r FederationStatus
Expand Down Expand Up @@ -655,7 +661,7 @@ performConversationJoin qusr lconv (ConversationJoin invited role joinType) = do
then checkFederationStatus (RemoteDomains (invitedRemoteDomains <> existingRemoteDomains))
else -- even if there are no new remotes, we still need to check they are reachable
void . (ensureNoUnreachableBackends =<<) $
E.runFederatedConcurrentlyEither @_ @'Brig invitedRemoteUsers $ \_ ->
E.runFederatedConcurrentlyEither @_ @_ @'Brig invitedRemoteUsers $ \_ ->
pure ()

conv :: StoredConversation
Expand Down Expand Up @@ -1119,7 +1125,7 @@ notifyTypingIndicator ::
( Member Now r,
Member (Input (Local ())) r,
Member NotificationSubsystem r,
Member FederatorAccess r
Member (FederationAPIAccess FederatorClient) r
) =>
StoredConversation ->
Qualified UserId ->
Expand Down
Loading