Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 28 additions & 3 deletions apps/meteor/ee/server/hooks/federation/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,30 @@
// import { FederationMatrix } from '@rocket.chat/core-services';
import { FederationMatrix } from '@rocket.chat/core-services';
import type { IMessage, IUser } from '@rocket.chat/core-typings';

// import { callbacks } from '../../../../lib/callbacks';
import { callbacks } from '../../../../lib/callbacks';

// callbacks.add('federation-event-example', async () => FederationMatrix.handleExample(), callbacks.priority.MEDIUM, 'federation-event-example-handler');
callbacks.add(
'afterSetReaction',
async (message: IMessage, params: { user: IUser; reaction: string }): Promise<void> => {
// Don't federate reactions that came from Matrix
if (params.user.username?.includes(':')) {
return;
}
await FederationMatrix.sendReaction(message._id, params.reaction, params.user);
},
callbacks.priority.HIGH,
'federation-matrix-after-set-reaction',
);

callbacks.add(
'afterUnsetReaction',
async (_message: IMessage, params: { user: IUser; reaction: string; oldMessage: IMessage }): Promise<void> => {
// Don't federate reactions that came from Matrix
if (params.user.username?.includes(':')) {
return;
}
await FederationMatrix.removeReaction(params.oldMessage._id, params.reaction, params.user, params.oldMessage);
},
callbacks.priority.HIGH,
'federation-matrix-after-unset-reaction',
);
2 changes: 1 addition & 1 deletion apps/meteor/ee/server/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import './configuration/index';
import './local-services/ldap/service';
import './methods/getReadReceipts';
import './patches';
// import './hooks/federation';
import './hooks/federation';

export * from './apps/startup';
export { registerEEBroker } from './startup';
10 changes: 9 additions & 1 deletion apps/meteor/server/services/messages/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import { executeSetReaction } from '../../../app/reactions/server/setReaction';
import { settings } from '../../../app/settings/server';
import { getUserAvatarURL } from '../../../app/utils/server/getUserAvatarURL';
import { BeforeSaveCannedResponse } from '../../../ee/server/hooks/messages/BeforeSaveCannedResponse';
import { FederationMatrixInvalidConfigurationError } from '../federation/utils';
import { FederationMatrixInvalidConfigurationError, getFederationVersion } from '../federation/utils';
import { FederationActions } from './hooks/BeforeFederationActions';
import { BeforeSaveBadWords } from './hooks/BeforeSaveBadWords';
import { BeforeSaveCheckMAC } from './hooks/BeforeSaveCheckMAC';
Expand Down Expand Up @@ -265,6 +265,14 @@ export class MessageService extends ServiceClassInternal implements IMessageServ
// }

async beforeReacted(message: IMessage, room: AtLeast<IRoom, 'federated'>) {
const federationVersion = getFederationVersion();

// If we are running in native mode (FederationMatrix service), we should skip this check
// because reactions will be handled using callbacks
if (federationVersion === 'native') {
return;
}

if (!FederationActions.shouldPerformAction(message, room)) {
throw new FederationMatrixInvalidConfigurationError('Unable to react to message');
}
Expand Down
2 changes: 1 addition & 1 deletion ee/apps/federation-service/src/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ function handleHealthCheck(app: Hono) {

app.mount('/_matrix', matrix.getHonoRouter().fetch);
app.mount('/.well-known', wellKnown.getHonoRouter().fetch);

handleHealthCheck(app);

serve({
Expand Down
2 changes: 2 additions & 0 deletions ee/packages/federation-matrix/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
"@babel/preset-env": "~7.26.0",
"@babel/preset-typescript": "~7.26.0",
"@rocket.chat/eslint-config": "workspace:^",
"@types/emojione": "^2.2.9",
"@types/node": "~22.14.0",
"babel-jest": "~30.0.0",
"eslint": "~8.45.0",
Expand Down Expand Up @@ -40,6 +41,7 @@
"@rocket.chat/models": "workspace:^",
"@rocket.chat/network-broker": "workspace:^",
"@rocket.chat/rest-typings": "workspace:^",
"emojione": "^4.5.0",
"mongodb": "6.10.0",
"pino": "8.21.0",
"reflect-metadata": "^0.2.2"
Expand Down
135 changes: 116 additions & 19 deletions ee/packages/federation-matrix/src/FederationMatrix.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ import type { IMessage, IRoom, IUser } from '@rocket.chat/core-typings';
import { Emitter } from '@rocket.chat/emitter';
import { Router } from '@rocket.chat/http-router';
import { Logger } from '@rocket.chat/logger';
import { MatrixBridgedUser, MatrixBridgedRoom, Users } from '@rocket.chat/models';
import { MatrixBridgedUser, MatrixBridgedRoom, Users, Messages } from '@rocket.chat/models';
import emojione from 'emojione';

import { getWellKnownRoutes } from './api/.well-known/server';
import { getMatrixInviteRoutes } from './api/_matrix/invite';
Expand Down Expand Up @@ -59,7 +60,7 @@ export class FederationMatrix extends ServiceClass implements IFederationMatrixS
emitter: instance.eventHandler,
};

await createFederationContainer(containerOptions, config);
createFederationContainer(containerOptions, config);
instance.homeserverServices = getAllServices();
instance.buildMatrixHTTPRoutes();

Expand Down Expand Up @@ -125,11 +126,7 @@ export class FederationMatrix extends ServiceClass implements IFederationMatrixS
const roomName = room.name || room.fname || 'Untitled Room';

// canonical alias computed from name
const matrixRoomResult = await this.homeserverServices.room.createRoom(
matrixUserId,
roomName,
room.t === 'c' ? 'public' : 'invite',
);
const matrixRoomResult = await this.homeserverServices.room.createRoom(matrixUserId, roomName, room.t === 'c' ? 'public' : 'invite');

this.logger.debug('Matrix room created:', matrixRoomResult);

Expand All @@ -152,7 +149,6 @@ export class FederationMatrix extends ServiceClass implements IFederationMatrixS
} catch (error) {
this.logger.error('Error creating or updating bridged user:', error);
}

// We are not generating bridged users for members outside of the current workspace
// They will be created when the invite is accepted

Expand All @@ -178,30 +174,131 @@ export class FederationMatrix extends ServiceClass implements IFederationMatrixS
const matrixUserId = `@${user.username}:${matrixDomain}`;
const existingMatrixUserId = await MatrixBridgedUser.getExternalUserIdByLocalUserId(user._id);
if (!existingMatrixUserId) {
const port = await Settings.get<number>('Federation_Service_Matrix_Port');
const domain = await Settings.get<string>('Federation_Service_Matrix_Domain');
const matrixDomain = port === 443 || port === 80 ? domain : `${domain}:${port}`;
await MatrixBridgedUser.createOrUpdateByLocalId(user._id, matrixUserId, true, matrixDomain);
}

// TODO: We should fix this to not hardcode neither inform the target server
// This is on the homeserver mandate to track all the eligible servers in the federated room
const targetServer = 'hs1-garim.tunnel.dev.rocket.chat';

if (!this.homeserverServices) {
this.logger.warn('Homeserver services not available, skipping message send');
return;
}

const result = await this.homeserverServices.message.sendMessage(matrixRoomId, message.msg, matrixUserId, targetServer);
const actualMatrixUserId = existingMatrixUserId || matrixUserId;

// TODO: Store the event ID mapping for future reference (edits, deletions, etc.)
// This would allow us to map between Rocket.Chat message IDs and Matrix event IDs
const result = await this.homeserverServices.message.sendMessage(matrixRoomId, message.msg, actualMatrixUserId);

this.logger.debug('Message sent to Matrix successfully:', result.event_id);
await Messages.setFederationEventIdById(message._id, result.eventId);

this.logger.debug('Message sent to Matrix successfully:', result.eventId);
} catch (error) {
this.logger.error('Failed to send message to Matrix:', error);
throw error;
}
}

async sendReaction(messageId: string, reaction: string, user: IUser): Promise<void> {
try {
const message = await Messages.findOneById(messageId);
if (!message) {
throw new Error(`Message ${messageId} not found`);
}

const matrixRoomId = await MatrixBridgedRoom.getExternalRoomId(message.rid);
if (!matrixRoomId) {
throw new Error(`No Matrix room mapping found for room ${message.rid}`);
}

const matrixEventId = message.federation?.eventId;
if (!matrixEventId) {
throw new Error(`No Matrix event ID mapping found for message ${messageId}`);
}

const reactionKey = emojione.shortnameToUnicode(reaction);

const existingMatrixUserId = await MatrixBridgedUser.getExternalUserIdByLocalUserId(user._id);
if (!existingMatrixUserId) {
this.logger.error(`No Matrix user ID mapping found for user ${user._id}`);
return;
}

const eventId = await this.homeserverServices.message.sendReaction(matrixRoomId, matrixEventId, reactionKey, existingMatrixUserId);

await Messages.setFederationReactionEventId(user.username || '', messageId, reaction, eventId);

this.logger.debug('Reaction sent to Matrix successfully:', eventId);
} catch (error) {
this.logger.error('Failed to send reaction to Matrix:', error);
throw error;
}
}

async removeReaction(messageId: string, reaction: string, user: IUser, oldMessage: IMessage): Promise<void> {
try {
const message = await Messages.findOneById(messageId);
if (!message) {
this.logger.error(`Message ${messageId} not found`);
return;
}

const targetEventId = message.federation?.eventId;
if (!targetEventId) {
this.logger.warn(`No federation event ID found for message ${messageId}`);
return;
}

const matrixRoomId = await MatrixBridgedRoom.getExternalRoomId(message.rid);
if (!matrixRoomId) {
this.logger.error(`No Matrix room mapping found for room ${message.rid}`);
return;
}

const reactionKey = emojione.shortnameToUnicode(reaction);
const existingMatrixUserId = await MatrixBridgedUser.getExternalUserIdByLocalUserId(user._id);
if (!existingMatrixUserId) {
this.logger.error(`No Matrix user ID mapping found for user ${user._id}`);
return;
}

const reactionData = oldMessage.reactions?.[reaction];
if (!reactionData?.federationReactionEventIds) {
return;
}

for await (const [eventId, username] of Object.entries(reactionData.federationReactionEventIds)) {
if (username !== user.username) {
continue;
}

const redactionEventId = await this.homeserverServices.message.unsetReaction(
matrixRoomId,
eventId,
reactionKey,
existingMatrixUserId,
);
if (!redactionEventId) {
this.logger.warn('No reaction event found to remove in Matrix');
return;
}

await Messages.unsetFederationReactionEventId(eventId, messageId, reaction);
break;
}
} catch (error) {
this.logger.error('Failed to remove reaction from Matrix:', error);
throw error;
}
}

async getEventById(eventId: string): Promise<any | null> {
if (!this.homeserverServices) {
this.logger.warn('Homeserver services not available');
return null;
}

try {
return await this.homeserverServices.event.getEventById(eventId);
} catch (error) {
this.logger.error('Failed to get event by ID:', error);
throw error;
}
}
}
2 changes: 2 additions & 0 deletions ee/packages/federation-matrix/src/events/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@ import type { Emitter } from '@rocket.chat/emitter';
import { invite } from './invite';
import { message } from './message';
import { ping } from './ping';
import { reaction } from './reaction';

export function registerEvents(emitter: Emitter<HomeserverEventSignatures>) {
ping(emitter);
message(emitter);
invite(emitter);
reaction(emitter);
}
91 changes: 91 additions & 0 deletions ee/packages/federation-matrix/src/events/reaction.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
import type { HomeserverEventSignatures } from '@hs/federation-sdk';
import { Message, FederationMatrix } from '@rocket.chat/core-services';
import type { Emitter } from '@rocket.chat/emitter';
import { Logger } from '@rocket.chat/logger';
import { Users, Messages } from '@rocket.chat/models'; // Rooms
import emojione from 'emojione';

const logger = new Logger('federation-matrix:reaction');

export function reaction(emitter: Emitter<HomeserverEventSignatures>) {
emitter.on('homeserver.matrix.reaction', async (data) => {
try {
const isSetReaction = data.content?.['m.relates_to'];

const reactionTargetEventId = isSetReaction?.event_id;
const reactionKey = isSetReaction?.key;

const [userPart, domain] = data.sender.split(':');
if (!userPart || !domain) {
logger.error('Invalid Matrix sender ID format:', data.sender);
return;
}

const user = await Users.findOneByUsername(data.sender);
if (!user) {
logger.error(`No RC user mapping found for Matrix event ${reactionTargetEventId} ${data.sender}`);
return;
}

if (!isSetReaction) {
logger.debug(`No relates_to content in reaction event`);
return;
}

const rcMessage = await Messages.findOneByFederationId(reactionTargetEventId);
if (!rcMessage) {
logger.debug(`No RC message mapping found for Matrix event ${reactionTargetEventId}`);
return;
}

const reactionEmoji = emojione.toShort(reactionKey);
await Message.reactToMessage(user._id, reactionEmoji, rcMessage._id, true);
await Messages.setFederationReactionEventId(data.sender, rcMessage._id, reactionEmoji, data.event_id);
} catch (error) {
logger.error('Failed to process Matrix reaction:', error);
}
});

emitter.on('homeserver.matrix.redaction', async (data) => {
try {
const redactedEventId = data.redacts;
if (!redactedEventId) {
logger.debug('No redacts field in redaction event');
return;
}

const reactionEvent = await FederationMatrix.getEventById(redactedEventId);
if (!reactionEvent || reactionEvent.type !== 'm.reaction') {
logger.debug(`Event ${redactedEventId} is not a reaction event`);
return;
}

const reactionContent = reactionEvent.content?.['m.relates_to'];
if (!reactionContent) {
logger.debug('No relates_to content in reaction event');
return;
}

const targetMessageEventId = reactionContent.event_id;
const reactionKey = reactionContent.key;

const rcMessage = await Messages.findOneByFederationId(targetMessageEventId);
if (!rcMessage) {
logger.debug(`No RC message found for event ${targetMessageEventId}`);
return;
}

const user = await Users.findOneByUsername(data.sender);
if (!user) {
logger.debug(`User not found: ${data.sender}`);
return;
}

const reactionEmoji = emojione.toShort(reactionKey);
await Message.reactToMessage(user._id, reactionEmoji, rcMessage._id, false);
await Messages.unsetFederationReactionEventId(redactedEventId, rcMessage._id, reactionEmoji);
} catch (error) {
logger.error('Failed to process Matrix reaction redaction:', error);
}
});
}
18 changes: 18 additions & 0 deletions ee/packages/federation-matrix/src/types/ICallbacks.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import type { IMessage, IUser } from '@rocket.chat/core-typings';

export interface ICallbackPriority {
HIGH: number;
MEDIUM: number;
LOW: number;
}

export interface ICallbacks {
priority: ICallbackPriority;
add(hook: string, callback: (...args: any[]) => any, priority?: number, id?: string): void;
remove(hook: string, id: string): void;
}

export interface IFederationCallbackHandlers {
afterSetReaction?: (message: IMessage, params: { user: IUser; reaction: string }) => Promise<void>;
afterUnsetReaction?: (message: IMessage, params: { user: IUser; reaction: string; oldMessage: IMessage }) => Promise<void>;
}
Loading
Loading