Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 22 additions & 1 deletion packages/backend/server/src/core/static-files/static.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,16 @@ import { isMobileRequest } from '../utils/user-agent';

const staticPathRegex = /^\/(_plugin|assets|imgs|js|plugins|static)\//;

function isMissingStaticAssetError(error: unknown) {
if (!error || typeof error !== 'object') {
return false;
}

const err = error as { code?: string; status?: number; statusCode?: number };

return err.code === 'ENOENT' || err.status === 404 || err.statusCode === 404;
}

@Injectable()
export class StaticFilesResolver implements OnModuleInit {
constructor(
Expand Down Expand Up @@ -86,7 +96,18 @@ export class StaticFilesResolver implements OnModuleInit {
next();
return;
}
routeByUA(req, res, next, true);
routeByUA(
req,
res,
error => {
if (isMissingStaticAssetError(error)) {
res.status(404).end();
return;
}
next(error);
},
true
);
});

// /
Expand Down
92 changes: 79 additions & 13 deletions packages/backend/server/src/core/sync/gateway.ts
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,9 @@ export class SpaceSyncGateway
private readonly server!: Server;

private connectionCount = 0;
private readonly socketUsers = new Map<string, string>();
private readonly localUserConnectionCounts = new Map<string, number>();
private unresolvedPresenceSockets = 0;
private flushTimer?: NodeJS.Timeout;

constructor(
Expand All @@ -224,7 +227,9 @@ export class SpaceSyncGateway
onModuleInit() {
this.flushTimer = setInterval(() => {
this.flushActiveUsersMinute().catch(error => {
this.logger.warn('Failed to flush active users minute', error as Error);
this.logger.warn(
`Failed to flush active users minute: ${this.formatError(error)}`
);
});
}, 60_000);
this.flushTimer.unref?.();
Expand Down Expand Up @@ -278,8 +283,7 @@ export class SpaceSyncGateway
};
} catch (error) {
this.logger.warn(
'Failed to merge updates for broadcast, falling back to batch',
error as Error
`Failed to merge updates for broadcast, falling back to batch: ${this.formatError(error)}`
);
return {
spaceType,
Expand All @@ -302,36 +306,45 @@ export class SpaceSyncGateway
this.connectionCount++;
this.logger.debug(`New connection, total: ${this.connectionCount}`);
metrics.socketio.gauge('connections').record(this.connectionCount);
this.attachPresenceUserId(client);
this.flushActiveUsersMinute().catch(error => {
this.logger.warn('Failed to flush active users minute', error as Error);
const userId = this.attachPresenceUserId(client);
this.trackConnectedSocket(client.id, userId);
void this.flushActiveUsersMinute({
aggregateAcrossCluster: false,
}).catch(error => {
this.logger.warn(
`Failed to flush active users minute: ${this.formatError(error)}`
);
});
}

handleDisconnect(_client: Socket) {
handleDisconnect(client: Socket) {
this.connectionCount = Math.max(0, this.connectionCount - 1);
this.trackDisconnectedSocket(client.id);
this.logger.debug(
`Connection disconnected, total: ${this.connectionCount}`
);
metrics.socketio.gauge('connections').record(this.connectionCount);
void this.flushActiveUsersMinute({
aggregateAcrossCluster: false,
}).catch(error => {
this.logger.warn('Failed to flush active users minute', error as Error);
this.logger.warn(
`Failed to flush active users minute: ${this.formatError(error)}`
);
});
}

private attachPresenceUserId(client: Socket) {
private attachPresenceUserId(client: Socket): string | null {
const request = client.request as Request;
const userId = request.session?.user.id ?? request.token?.user.id;
if (typeof userId !== 'string' || !userId) {
this.logger.warn(
`Unable to resolve authenticated user id for socket ${client.id}`
);
return;
return null;
}

client.data[SOCKET_PRESENCE_USER_ID_KEY] = userId;
return userId;
}

private resolvePresenceUserId(socket: { data?: unknown }) {
Expand All @@ -345,14 +358,68 @@ export class SpaceSyncGateway
return typeof userId === 'string' && userId ? userId : null;
}

private trackConnectedSocket(socketId: string, userId: string | null) {
if (!userId) {
this.unresolvedPresenceSockets++;
return;
}

this.socketUsers.set(socketId, userId);
const prev = this.localUserConnectionCounts.get(userId) ?? 0;
this.localUserConnectionCounts.set(userId, prev + 1);
}

private trackDisconnectedSocket(socketId: string) {
const userId = this.socketUsers.get(socketId);
if (!userId) {
this.unresolvedPresenceSockets = Math.max(
0,
this.unresolvedPresenceSockets - 1
);
return;
}

this.socketUsers.delete(socketId);
const next = (this.localUserConnectionCounts.get(userId) ?? 1) - 1;
if (next <= 0) {
this.localUserConnectionCounts.delete(userId);
} else {
this.localUserConnectionCounts.set(userId, next);
}
}

private resolveLocalActiveUsers() {
if (this.unresolvedPresenceSockets > 0) {
return Math.max(0, this.connectionCount);
}

return this.localUserConnectionCounts.size;
}

private formatError(error: unknown) {
if (error instanceof Error) {
return error.stack ?? error.message;
}

if (typeof error === 'string') {
return error;
}

try {
return JSON.stringify(error);
} catch {
return String(error);
}
}

private async flushActiveUsersMinute(options?: {
aggregateAcrossCluster?: boolean;
}) {
const minute = new Date();
minute.setSeconds(0, 0);

const aggregateAcrossCluster = options?.aggregateAcrossCluster ?? true;
let activeUsers = Math.max(0, this.connectionCount);
let activeUsers = this.resolveLocalActiveUsers();
if (aggregateAcrossCluster) {
try {
const sockets = await this.server.fetchSockets();
Expand All @@ -377,8 +444,7 @@ export class SpaceSyncGateway
}
} catch (error) {
this.logger.warn(
'Failed to aggregate active users from sockets, using local value',
error as Error
`Failed to aggregate active users from sockets, using local value: ${this.formatError(error)}`
);
}
}
Expand Down
Loading