Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 21 additions & 52 deletions packages/federation-sdk/src/services/message.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import type { ConfigService } from './config.service';
import { EventService, EventType } from './event.service';
import { FederationService } from './federation.service';
import type { RoomService } from './room.service';
import { StateService } from './state.service';
import { PersistentEventBase, PersistentEventFactory } from '@hs/room';

@singleton()
export class MessageService {
Expand All @@ -36,73 +38,40 @@ export class MessageService {
@inject('FederationService')
private readonly federationService: FederationService,
@inject('RoomService') private readonly roomService: RoomService,
@inject('StateService') private readonly stateService: StateService,
) {}

async sendMessage(
roomId: string,
message: string,
senderUserId: string,
targetServer: string,
): Promise<SignedEvent<RoomMessageEvent>> {
const isTombstoned = await this.roomService.isRoomTombstoned(roomId);
if (isTombstoned) {
this.logger.warn(
`Attempted to react to a message in a tombstoned room: ${roomId}`,
): Promise<PersistentEventBase> {
const roomVersion = await this.stateService.getRoomVersion(roomId);
if (!roomVersion) {
throw new Error(
`Room version not found for room ${roomId} white trying to send message`,
);
throw new ForbiddenError('Cannot send message to a tombstoned room');
}
const serverName = this.configService.getServerConfig().name;
const signingKey = await this.configService.getSigningKey();

const authEvents = await this.eventService.getAuthEventIds(
EventType.MESSAGE,
{ roomId, senderId: senderUserId },
const event = PersistentEventFactory.newMessageEvent(
roomId,
senderUserId,
message,
roomVersion,
);

const latestEventDoc = await this.eventService.getLastEventForRoom(roomId);
const prevEvents = latestEventDoc ? [latestEventDoc._id] : [];
await Promise.all([
this.stateService.addAuthEvents(event),
this.stateService.addPrevEvents(event),
]);

const currentDepth = latestEventDoc?.event?.depth ?? 0;
const newDepth = currentDepth + 1;
await this.stateService.signEvent(event);

const authEventsMap: MessageAuthEvents = {
'm.room.create':
authEvents.find((event) => event.type === EventType.CREATE)?._id || '',
'm.room.power_levels':
authEvents.find((event) => event.type === EventType.POWER_LEVELS)
?._id || '',
'm.room.member':
authEvents.find((event) => event.type === EventType.MEMBER)?._id || '',
};
await this.stateService.saveMessage(event);

const { state_key, ...eventForSigning } = roomMessageEvent({
roomId,
sender: senderUserId,
auth_events: authEventsMap,
prev_events: prevEvents,
depth: newDepth,
content: {
msgtype: 'm.text',
body: message,
'm.mentions': {},
},
origin: serverName,
ts: Date.now(),
});

const signedEvent = await signEvent(
eventForSigning,
Array.isArray(signingKey) ? signingKey[0] : signingKey,
serverName,
);

const eventId = generateId(signedEvent);
await this.federationService.sendEvent(targetServer, signedEvent);
await this.eventService.insertEvent(signedEvent, eventId);

this.logger.info(`Sent message to ${targetServer} - ${eventId}`);
void this.federationService.sendEventToAllServersInRoom(event);

return { ...signedEvent, event_id: eventId };
return event;
}

async sendReaction(
Expand Down
66 changes: 59 additions & 7 deletions packages/federation-sdk/src/services/state.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ import { checkEventAuthWithState } from '@hs/room';
type State = Map<StateMapKey, PersistentEventBase>;

type StrippedRoomState = {
content: PduContent,
sender: string,
state_key: string,
type: PduType
content: PduContent;
sender: string;
state_key: string;
type: PduType;
};

@singleton()
Expand Down Expand Up @@ -214,9 +214,10 @@ export class StateService {

return finalState;
}


public async getStrippedRoomState(roomId: string): Promise<StrippedRoomState[]> {

public async getStrippedRoomState(
roomId: string,
): Promise<StrippedRoomState[]> {
const state = await this.getFullRoomState(roomId);

const strippedState: StrippedRoomState[] = [];
Expand Down Expand Up @@ -551,6 +552,57 @@ export class StateService {
return stateMappings.map((stateMapping) => stateMapping.roomId).toArray();
}

async saveMessage(event: PersistentEventBase) {
const room = await this.getFullRoomState(event.roomId);

const roomVersion = room
.get('m.room.create:')
?.getContent<PduCreateEventContent>().room_version as RoomVersion;

const requiredAuthEventsWeHaveSeen = new Map<string, PersistentEventBase>();
for (const auth of event.getAuthEventStateKeys()) {
const authEvent = room.get(auth);
if (authEvent) {
requiredAuthEventsWeHaveSeen.set(authEvent.eventId, authEvent);
}
}

// auth events referenced in the message
const store = this._getStore(roomVersion);
const authEventsReferencedInMessage = await store.getEvents(
event.event.auth_events as string[],
);
const authEventsReferenced = new Map<string, PersistentEventBase>();
for (const authEvent of authEventsReferencedInMessage) {
authEventsReferenced.set(authEvent.eventId, authEvent);
}

// both auth events set must match
if (requiredAuthEventsWeHaveSeen.size !== authEventsReferenced.size) {
throw new Error('Auth events referenced in message do not match');
}

for (const [eventId] of requiredAuthEventsWeHaveSeen) {
if (!authEventsReferenced.has(eventId)) {
throw new Error('wrong auth event in message');
}
}

// now we validate against auth rules
await checkEventAuthWithState(event, room, store);
if (event.rejected) {
throw new Error(event.rejectedReason);
}

// TODO: save event still but with mark

// now we persist the event
const eventsCollection = await this.eventRepository.getCollection();
await eventsCollection.insertOne(event.event as any);

// transactions not handled here, since we can use this method as part of a "transaction receive"
}

async getAllPublicRoomIdsAndNames() {
const stateCollection = await this.stateRepository.getCollection();

Expand Down
35 changes: 0 additions & 35 deletions packages/homeserver/src/controllers/internal/message.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,41 +21,6 @@ import {
export const internalMessagePlugin = (app: Elysia) => {
const messageService = container.resolve(MessageService);
return app
.post(
'/internal/messages',
async ({
body,
set,
}): Promise<InternalMessageResponse | ErrorResponse> => {
const { roomId, message, senderUserId, targetServer } = body;
try {
return await messageService.sendMessage(
roomId,
message,
senderUserId,
targetServer,
);
} catch (error) {
set.status = 500;
return {
error: `Failed to send message: ${error instanceof Error ? error.message : String(error)}`,
details: {},
};
}
},
{
body: InternalSendMessageBodyDto,
response: {
200: InternalMessageResponseDto,
500: ErrorResponseDto,
},
detail: {
tags: ['Internal'],
summary: 'Send a message to a room',
description: 'Send a text message to a Matrix room',
},
},
)
.patch(
'/internal/messages/:messageId',
async ({
Expand Down
32 changes: 32 additions & 0 deletions packages/room/src/manager/factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import {
PduPowerLevelsEventContent,
Pdu,
PduTypeRoomCanonicalAlias,
PduTypeRoomMessage,
} from '../types/v3-11';

import { PersistentEventV3 } from './v3';
Expand Down Expand Up @@ -291,4 +292,35 @@ export class PersistentEventFactory {

return PersistentEventFactory.createFromRawEvent(eventPartial, roomVersion);
}

static newMessageEvent(
roomId: string,
sender: string,
text: string,
roomVersion: RoomVersion = PersistentEventFactory.defaultRoomVersion,
) {
if (!PersistentEventFactory.isSupportedRoomVersion(roomVersion)) {
throw new Error(`Room version ${roomVersion} is not supported`);
}

const eventPartial: Omit<
PduForType<typeof PduTypeRoomMessage>,
'signatures' | 'hashes'
> = {
type: PduTypeRoomMessage,
content: {
msgtype: 'm.text' as const,
body: text,
},
sender: sender,
origin: sender.split(':').pop(),
origin_server_ts: Date.now(),
room_id: roomId,
prev_events: [],
auth_events: [],
depth: 0,
};

return PersistentEventFactory.createFromRawEvent(eventPartial, roomVersion);
}
}
17 changes: 17 additions & 0 deletions packages/room/src/types/v3-11.ts
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,16 @@ export type PduRoomNameEventContent = z.infer<
typeof PduRoomNameEventContentSchema
>;

export const PduMessageEventContentSchema = z.object({
body: z.string().describe('The body of the message.'),
// TODO: add more types
msgtype: z.enum(['m.text', 'm.image']).describe('The type of the message.'),
});

export type PduMessageEventContent = z.infer<
typeof PduMessageEventContentSchema
>;

export const PduContentSchema = z
.union([
PduMembershipEventContentSchema,
Expand All @@ -343,6 +353,7 @@ export const PduContentSchema = z
PduPowerLevelsEventContentSchema,
PduCanonicalAliasEventContentSchema,
PduRoomNameEventContentSchema,
PduMessageEventContentSchema,
])
.describe(
'The content of the event. This is an object with arbitrary keys and values.',
Expand Down Expand Up @@ -451,6 +462,12 @@ export function generatePduSchemaForBase<T>(base: T) {
type: z.literal(PduTypeRoomAliases),
content: PduCanonicalAliasEventContentSchema,
}),

z.object({
...base,
type: z.literal(PduTypeRoomMessage),
content: PduMessageEventContentSchema,
}),
]);
}

Expand Down