Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
6b19a03
refactor: move dot-env dep to the callers (out of fed-sdk) (#97)
Jul 29, 2025
00c5bf0
commit before sleeping :p
debdutdeb Jun 25, 2025
a09e926
commit everything to not lose
debdutdeb Jun 27, 2025
c1bd54b
with invites
debdutdeb Jul 2, 2025
52f8068
ci
debdutdeb Jul 2, 2025
12d6833
ci 2
debdutdeb Jul 2, 2025
8c54b57
more small
debdutdeb Jul 2, 2025
b448a7e
ci 3
debdutdeb Jul 2, 2025
988d601
ci: do not lint js or generated files
Jul 2, 2025
aed9037
ci: no emit files during build
ggazzo Jul 2, 2025
be236af
better typings
debdutdeb Jul 3, 2025
93bd701
migrate test cases
debdutdeb Jul 3, 2025
1061b89
Revert "conflict -n"
debdutdeb Jul 4, 2025
8c73270
Revert "Revert "conflict -n""
debdutdeb Jul 4, 2025
dcd8d57
maybe
debdutdeb Jul 4, 2025
f765d55
this is why i don't like magic
debdutdeb Jul 7, 2025
b143681
chore: better zod typings and misc fixes on PersistentEventBase wrapp…
debdutdeb Jul 10, 2025
b059e1c
chore!: remove v1 and v2 room version support (#76)
debdutdeb Jul 10, 2025
5830c7d
less diff
debdutdeb Jul 17, 2025
b07d1a7
reset lint changes
debdutdeb Jul 17, 2025
0bbf9c9
no generator use
debdutdeb Jul 17, 2025
45eb0e4
lost changes
debdutdeb Jul 17, 2025
8f9d099
forgotten
debdutdeb Jul 17, 2025
fb1c4f2
...
debdutdeb Jul 21, 2025
c3cecec
send stripped state too
debdutdeb Jul 24, 2025
2985649
send message yeah (#94)
debdutdeb Jul 28, 2025
5f7360e
feat: adds set and unset reactions events
ricardogarim Jul 15, 2025
e68f220
chore: fixes biome format
ricardogarim Jul 15, 2025
6c3601f
fix: general fixes where the _id references were wrong
Jul 23, 2025
88c3770
fix: prev_events should now look to eventId instead of _id
ricardogarim Jul 24, 2025
fd588e8
feat: makes set and unset reactions compliant with new state approach
ricardogarim Jul 28, 2025
1d1019f
x
ricardogarim Jul 28, 2025
0a4452f
reverts files
ricardogarim Jul 28, 2025
5d21af3
removes eventId mentions by _id
ricardogarim Jul 28, 2025
bb9e533
reverts files
ricardogarim Jul 28, 2025
90b747f
reverts files
ricardogarim Jul 28, 2025
c0b898e
reverts files
ricardogarim Jul 28, 2025
e8eb0a9
reverts files
ricardogarim Jul 28, 2025
2849db1
reverts files
ricardogarim Jul 28, 2025
ae984f5
reverts files
ricardogarim Jul 28, 2025
b5c65c3
adds reaction event publishing via emitter
ricardogarim Jul 29, 2025
764b359
chore: simplifies configs attrs and keep just one config service
ricardogarim Jul 30, 2025
5edd6d3
chore: adjusts unseting reactions from rc
ricardogarim Jul 30, 2025
54b15e3
fix: fixes queue processing
ricardogarim Jul 31, 2025
6cb6b07
chore: makes persistTimelineEvent sendEventToAllServersInRoom void calls
ricardogarim Jul 31, 2025
6ff46e3
Merge branch 'main' into feat/emitter-reactions
debdutdeb Jul 31, 2025
57f787b
makes persistTimelineEvent to be awaited
ricardogarim Jul 31, 2025
b7b3e2f
Merge branch 'main' into feat/emitter-reactions
debdutdeb Jul 31, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions .env.template
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
MONGODB_URI=
DATABASE_NAME=
DATABASE_POOL_SIZE=

SERVER_NAME=
SERVER_VERSION=
SERVER_PORT=
SERVER_BASE_URL=
SERVER_HOST=

MATRIX_SERVER_NAME=
MATRIX_DOMAIN=
MATRIX_KEY_REFRESH_INTERVAL=

CONFIG_FOLDER=

HOMESERVER_CONFIG_DNS_SERVERS=

LOG_LEVEL=
NODE_ENV=

DEBUG=

NODE_TLS_REJECT_UNAUTHORIZED=
23 changes: 23 additions & 0 deletions packages/federation-sdk/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,29 @@ export type HomeserverEventSignatures = {
membership: Membership;
};
};
'homeserver.matrix.reaction': {
event_id: string;
room_id: string;
sender: string;
origin_server_ts: number;
content: {
'm.relates_to': {
rel_type: 'm.annotation';
event_id: string;
key: string;
};
};
};
'homeserver.matrix.redaction': {
event_id: string;
room_id: string;
sender: string;
origin_server_ts: number;
redacts: string;
content: {
reason?: string;
};
};
};

export function getAllServices(): HomeserverServices {
Expand Down
23 changes: 19 additions & 4 deletions packages/federation-sdk/src/queues/staging-area.queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,20 @@ type QueueHandler = (item: StagingAreaEventType) => Promise<void>;
@singleton()
export class StagingAreaQueue {
private queue: StagingAreaEventType[] = [];
private priorityQueue: StagingAreaEventType[] = [];
private handlers: QueueHandler[] = [];
private processing = false;

enqueue(item: StagingAreaEventType): void {
this.queue.push(item);
// If this is a continuation of processing (has metadata.state), add to priority queue
if (
item.metadata?.state &&
item.metadata.state !== 'pending_dependencies'
) {
this.priorityQueue.push(item);
} else {
this.queue.push(item);
}
this.processQueue();
}

Expand All @@ -32,15 +41,16 @@ export class StagingAreaQueue {
}

private async processQueue(): Promise<void> {
if (this.processing || this.queue.length === 0) {
if (this.processing) {
return;
}

this.processing = true;

try {
while (this.queue.length > 0) {
const item = this.queue.shift();
while (this.priorityQueue.length > 0 || this.queue.length > 0) {
// Process priority queue first (events in mid-processing)
const item = this.priorityQueue.shift() || this.queue.shift();
if (!item) continue;

for (const handler of this.handlers) {
Expand All @@ -49,6 +59,11 @@ export class StagingAreaQueue {
}
} finally {
this.processing = false;

// Check if new items were added while processing
if (this.priorityQueue.length > 0 || this.queue.length > 0) {
this.processQueue();
}
}
}
}
1 change: 0 additions & 1 deletion packages/federation-sdk/src/services/config.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ export const AppConfigSchema = z.object({

export class ConfigService {
private config: AppConfig;
private fileConfig: Partial<AppConfig> = {};
private logger = createLogger('ConfigService');

constructor(values: AppConfig) {
Expand Down
11 changes: 9 additions & 2 deletions packages/federation-sdk/src/services/federation.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -284,15 +284,22 @@ export class FederationService {
}

const txn: Transaction = {
origin: 'rc1.tunnel.dev.rocket.chat', //this.configService.serverName,
origin: this.configService.serverName,
origin_server_ts: Date.now(),
pdus: [event.event],
edus: [],
};

this.logger.info(`Sending event ${event.eventId} to server: ${server}`);

void this.sendTransaction(server, txn);
try {
await this.sendTransaction(server, txn);
} catch (error) {
this.logger.error(
`Failed to send event ${event.eventId} to server: ${server}`,
error,
);
}
}
}
}
106 changes: 49 additions & 57 deletions packages/federation-sdk/src/services/message.service.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,3 @@
import {
type ReactionAuthEvents,
type ReactionEvent,
reactionEvent,
} from '@hs/core';
import {
type MessageAuthEvents,
type RoomMessageEvent,
Expand All @@ -17,16 +12,20 @@ import {
redactionEvent,
} from '@hs/core';
import { createLogger } from '@hs/core';
import { generateId } from '@hs/core';
import { signEvent } from '@hs/core';
import {
type PersistentEventBase,
PersistentEventFactory,
type RoomVersion,
} from '@hs/room';
import { inject } from 'tsyringe';
import { singleton } from 'tsyringe';
import type { EventRepository } from '../repositories/event.repository';
import type { ConfigService } from './config.service';
import { EventService, EventType } from './event.service';
import { FederationService } from './federation.service';
import type { RoomService } from './room.service';
import { StateService } from './state.service';
import { PersistentEventBase, PersistentEventFactory } from '@hs/room';
import type { StateService } from './state.service';

@singleton()
export class MessageService {
Expand All @@ -39,6 +38,8 @@ export class MessageService {
private readonly federationService: FederationService,
@inject('RoomService') private readonly roomService: RoomService,
@inject('StateService') private readonly stateService: StateService,
@inject('EventRepository')
private readonly eventRepository: EventRepository,
) {}

async sendMessage(
Expand Down Expand Up @@ -82,76 +83,67 @@ export class MessageService {
eventId: string,
emoji: string,
senderUserId: string,
targetServer: string,
): Promise<SignedEvent<ReactionEvent>> {
): Promise<string> {
const isTombstoned = await this.roomService.isRoomTombstoned(roomId);
if (isTombstoned) {
this.logger.warn(
`Attempted to send message to a tombstoned room: ${roomId}`,
`Attempted to react to a message in a tombstoned room: ${roomId}`,
);
throw new ForbiddenError(
'Cannot react to a message in a tombstoned room',
);
}

const serverName = this.configService.getServerConfig().name;
const signingKey = await this.configService.getSigningKey();
const roomInfo = await this.stateService.getRoomInformation(roomId);

const latestEventDoc = await this.eventService.getLastEventForRoom(roomId);
const prevEvents = latestEventDoc ? [latestEventDoc._id] : [];

const authEvents = await this.eventService.getAuthEventIds(
EventType.REACTION,
{ roomId, senderId: senderUserId },
const reactionEvent = PersistentEventFactory.newReactionEvent(
roomId,
senderUserId,
eventId,
emoji,
roomInfo.room_version as RoomVersion,
);

const currentDepth = latestEventDoc?.event?.depth ?? 0;
const newDepth = currentDepth + 1;
await this.stateService.addAuthEvents(reactionEvent);

const authEventsMap: ReactionAuthEvents = {
'm.room.create':
authEvents.find((event) => event.type === EventType.CREATE)?._id || '',
'm.room.power_levels':
authEvents.find((event) => event.type === EventType.POWER_LEVELS)
?._id || '',
'm.room.member':
authEvents.find((event) => event.type === EventType.MEMBER)?._id || '',
};
await this.stateService.addPrevEvents(reactionEvent);

const { state_key, ...eventForSigning } = reactionEvent({
roomId,
sender: senderUserId,
auth_events: authEventsMap,
prev_events: prevEvents,
depth: newDepth,
content: {
'm.relates_to': {
rel_type: 'm.annotation',
event_id: eventId,
key: emoji,
},
},
origin: serverName,
ts: Date.now(),
});
await this.stateService.signEvent(reactionEvent);

const signedEvent = await signEvent(
eventForSigning,
Array.isArray(signingKey) ? signingKey[0] : signingKey,
serverName,
);
await this.stateService.persistTimelineEvent(reactionEvent);

this.logger.debug(signedEvent);
void this.federationService.sendEventToAllServersInRoom(reactionEvent);

await this.federationService.sendEvent(targetServer, signedEvent);
return reactionEvent.eventId;
}

await this.eventService.insertEvent(signedEvent, eventId);
async unsetReaction(
roomId: string,
eventIdReactedTo: string,
_emoji: string,
senderUserId: string,
): Promise<string> {
const roomInfo = await this.stateService.getRoomInformation(roomId);

this.logger.info(
`Sent reaction $emojito $targetServerfor event $eventId- $generateId(${signedEvent})`,
const redactionEvent = PersistentEventFactory.newRedactionEvent(
roomId,
senderUserId,
eventIdReactedTo,
'Unsetting reaction',
roomInfo.room_version as RoomVersion,
);

return signedEvent;
await this.stateService.addAuthEvents(redactionEvent);

await this.stateService.addPrevEvents(redactionEvent);

await this.stateService.signEvent(redactionEvent);

await this.stateService.persistTimelineEvent(redactionEvent);

void this.federationService.sendEventToAllServersInRoom(redactionEvent);

return redactionEvent.eventId;
}

async updateMessage(
Expand Down
52 changes: 36 additions & 16 deletions packages/federation-sdk/src/services/staging-area.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@ import type { StagingAreaEventType } from '../queues/staging-area.queue';
import { StagingAreaQueue } from '../queues/staging-area.queue';

import { createLogger } from '@hs/core';
import { Pdu, PersistentEventFactory } from '@hs/room';
import { Lock } from '../utils/lock.decorator';
import { EventAuthorizationService } from './event-authorization.service';
import { EventEmitterService } from './event-emitter.service';
import { EventStateService } from './event-state.service';
import { EventService } from './event.service';
import { EventType } from './event.service';
import { MissingEventService } from './missing-event.service';
import { EventEmitterService } from './event-emitter.service';
import { Pdu, PersistentEventFactory } from '@hs/room';
import { StateService } from './state.service';

// ProcessingState indicates where in the flow an event is
Expand Down Expand Up @@ -47,9 +47,7 @@ export class StagingAreaService {
private readonly eventStateService: EventStateService,
private readonly eventEmitterService: EventEmitterService,
private readonly stateService: StateService,
) {
this.processQueue();
}
) {}

addEventToQueue(event: StagingAreaEventType) {
const extendedEvent: ExtendedStagingEvent = {
Expand All @@ -70,22 +68,13 @@ export class StagingAreaService {
this.logger.debug(`Added event ${event.eventId} to processing queue`);
}

private async processQueue() {
setInterval(async () => {
const event = this.stagingAreaQueue.dequeue();
if (event) {
await this.processEvent(event);
}
}, 100);
}

extractEventsFromIncomingPDU(pdu: StagingAreaEventType) {
const authEvents = pdu.event.auth_events || [];
const prevEvents = pdu.event.prev_events || [];
return [...authEvents, ...prevEvents];
}

@Lock({ timeout: 10000, keyPath: 'event.room_id' })
// @Lock({ timeout: 10000, keyPath: 'event.room_id' })
async processEvent(event: StagingAreaEventType & { metadata?: any }) {
const eventId = event.eventId;
const trackedEvent = this.processingEvents.get(eventId);
Expand Down Expand Up @@ -347,7 +336,9 @@ export class StagingAreaService {
private async processNotificationStage(event: StagingAreaEventType) {
const eventId = event.eventId;
const trackedEvent = this.processingEvents.get(eventId);
if (!trackedEvent) return;
if (!trackedEvent) {
return;
}

try {
this.logger.debug(`Notifying clients about event ${eventId}`);
Expand All @@ -365,6 +356,35 @@ export class StagingAreaService {
},
});
break;
case EventType.REACTION: {
this.eventEmitterService.emit('homeserver.matrix.reaction', {
event_id: event.eventId,
room_id: event.roomId,
sender: event.event.sender,
origin_server_ts: event.event.origin_server_ts,
content: event.event.content as {
'm.relates_to': {
rel_type: 'm.annotation';
event_id: string;
key: string;
};
},
});
break;
}
case EventType.REDACTION: {
this.eventEmitterService.emit('homeserver.matrix.redaction', {
event_id: event.eventId,
room_id: event.roomId,
sender: event.event.sender,
origin_server_ts: event.event.origin_server_ts,
redacts: (event.event as any).redacts,
content: {
reason: event.event.content?.reason as string | undefined,
},
});
break;
}
default:
this.logger.warn(
`Unknown event type: ${event.event.type} for emitterService for now`,
Expand Down
Loading