Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 74 additions & 0 deletions apps/meteor/server/services/upload/service.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
import fs from 'fs';
import type Stream from 'stream';

import type { IUploadDetails } from '@rocket.chat/apps-engine/definition/uploads/IUploadDetails';
import { ServiceClassInternal } from '@rocket.chat/core-services';
import type { ISendFileLivechatMessageParams, ISendFileMessageParams, IUploadFileParams, IUploadService } from '@rocket.chat/core-services';
import type { IUpload, IUser, FilesAndAttachments, IMessage } from '@rocket.chat/core-typings';
import { isFileAttachment } from '@rocket.chat/core-typings';
import { Logger } from '@rocket.chat/logger';
import { Uploads } from '@rocket.chat/models';
import { Random } from '@rocket.chat/random';
import sharp from 'sharp';

import { canAccessRoomIdAsync } from '../../../app/authorization/server/functions/canAccessRoom';
import { canDeleteMessageAsync } from '../../../app/authorization/server/functions/canDeleteMessage';
Expand All @@ -13,6 +19,7 @@ import { updateMessage } from '../../../app/lib/server/functions/updateMessage';
import { sendFileLivechatMessage } from '../../../app/livechat/server/methods/sendFileLivechatMessage';
import { NOTIFICATION_ATTACHMENT_COLOR } from '../../../lib/constants';
import { i18n } from '../../lib/i18n';
import { UploadFS } from '../../ufs';

const logger = new Logger('UploadService');

Expand Down Expand Up @@ -133,4 +140,71 @@ export class UploadService extends ServiceClassInternal implements IUploadServic

await updateMessage(editedMessage, user, msg);
}

async streamUploadedFile({
file,
imageResizeOpts,
}: {
file: IUpload;
imageResizeOpts?: { width: number; height: number };
}): Promise<Stream.Readable> {
const stream = await FileUpload.getStore('Uploads')._store.getReadStream(file._id, file);
if (!stream) {
throw new Error('error-file-not-found');
}

if (file?.type?.includes('image') && imageResizeOpts) {
const { width, height } = imageResizeOpts;

const transformer = sharp().resize({ width, height, fit: 'contain' });

stream.on('error', (err) => transformer.destroy(err));

return stream.pipe(transformer);
}

return stream;
}

async uploadFileFromStream({
streamParam,
details,
}: {
streamParam: Stream.Readable;
details: Omit<IUploadDetails, 'size'>;
}): Promise<IUpload> {
const resolver = Promise.withResolvers<IUpload>();

const tempFilePath = UploadFS.getTempFilePath(Random.id());

const writeStream = fs.createWriteStream(tempFilePath);
streamParam.pipe(writeStream);

const cleanup = (err: unknown) => {
fs.promises.unlink(tempFilePath).catch(() => undefined);
resolver.reject(err);
};

writeStream.on('finish', async () => {
FileUpload.getStore('Uploads')
.insert(
{
...details,
size: writeStream.bytesWritten,
},
tempFilePath,
)
.then(resolver.resolve)
.catch(cleanup);
});

streamParam.on('error', async (err) => {
writeStream.destroy();
cleanup(err);
});

writeStream.on('error', cleanup);

return resolver.promise;
}
Comment on lines +169 to +209
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Sharing a single Readable across multiple consumers will fail.

This method is called from OmnichannelTranscript.uploadFiles (line 420-432 of OmnichannelTranscript.ts), which maps over roomIds and calls uploadFileFromStream for each room using the same streamParam. A Readable stream can only be consumed once — the first .pipe() will drain it, and subsequent calls will receive an already-ended (or empty) stream.

In OmnichannelTranscript.ts lines 370-375, a single Readable.from(stream) is created and passed to uploadFiles, which then fans it out via Promise.all to multiple rooms. The second upload will get no data.

🐛 Possible approaches

Option A: Buffer the stream once, then upload from the buffer to each room:

 private async uploadFiles({
-    streamParam,
+    buffer,
     roomIds,
     ...
 }) {
     return Promise.all(
         roomIds.map((roomId) => {
-            return uploadService.uploadFileFromStream({ streamParam, details: { ... } });
+            return uploadService.uploadFile({ buffer, details: { ..., size: buffer.length } });
         }),
     );
 }

Option B: Clone/tee the stream for each consumer (e.g., using stream.PassThrough or buffering once then wrapping with Readable.from()).

Option C: Upload once, then copy the upload record for the second room.

🤖 Prompt for AI Agents
In `@apps/meteor/server/services/upload/service.ts` around lines 169 - 209, The
current uploadFileFromStream consumes the provided Readable (streamParam) once,
so when OmnichannelTranscript.uploadFiles fans the same stream to multiple calls
it fails; change the implementation so the stream is buffered once and then
re-used for each consumer: e.g., in uploadFileFromStream (or upstream in
OmnichannelTranscript.uploadFiles) read the incoming stream to a temporary
buffer/file first, then for each target room create a fresh Readable from that
buffer (or call FileUpload.getStore('Uploads').insert multiple times using the
same temp file path) instead of piping the original stream multiple times;
update references to streamParam, tempFilePath, and the write/cleanup logic to
ensure the temp file is cleaned after all inserts complete or on error.

}
6 changes: 3 additions & 3 deletions ee/packages/federation-matrix/src/events/message.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ async function handleMediaMessage(

const fileRefId = await MatrixMediaService.downloadAndStoreRemoteFile(url, matrixRoomId, {
name: messageBody,
size: fileInfo?.size,
type: mimeType,
roomId: room._id,
size: fileInfo?.size || 0,
type: mimeType || 'application/octet-stream',
rid: room._id,
userId: user._id,
});

Expand Down
19 changes: 3 additions & 16 deletions ee/packages/federation-matrix/src/services/MatrixMediaService.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import type { IUploadDetails } from '@rocket.chat/apps-engine/definition/uploads/IUploadDetails';
import { Upload } from '@rocket.chat/core-services';
import type { IUpload } from '@rocket.chat/core-typings';
import { federationSDK } from '@rocket.chat/federation-sdk';
Expand Down Expand Up @@ -79,18 +80,7 @@ export class MatrixMediaService {
}
}

static async downloadAndStoreRemoteFile(
mxcUri: string,
matrixRoomId: string,
metadata: {
name: string;
size?: number;
type?: string;
messageId?: string;
roomId?: string;
userId?: string;
},
): Promise<string> {
static async downloadAndStoreRemoteFile(mxcUri: string, matrixRoomId: string, metadata: IUploadDetails): Promise<string> {
try {
const parts = this.parseMXCUri(mxcUri);
if (!parts) {
Expand All @@ -113,11 +103,8 @@ export class MatrixMediaService {
userId: metadata.userId || 'federation',
buffer,
details: {
name: metadata.name || 'unnamed',
...metadata,
size: buffer.length,
type: metadata.type || 'application/octet-stream',
rid: metadata.roomId,
userId: metadata.userId || 'federation',
},
});

Expand Down
22 changes: 17 additions & 5 deletions ee/packages/network-broker/src/NetworkBroker.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import Stream from 'stream';

import { asyncLocalStorage } from '@rocket.chat/core-services';
import type { CallingOptions, IBroker, IBrokerNode, IServiceMetrics, IServiceClass, EventSignatures } from '@rocket.chat/core-services';
import { injectCurrentContext, tracerSpan } from '@rocket.chat/tracing';
Expand Down Expand Up @@ -39,8 +41,14 @@ export class NetworkBroker implements IBroker {

const context = asyncLocalStorage.getStore();

const stream = data?.[0]?.streamParam;
const isStreamingCall = !!stream;

if (context?.ctx?.call) {
return context.ctx.call(method, data, options);
return context.ctx.call(method, isStreamingCall ? stream : data, {
...options,
...(isStreamingCall && { meta: { ...((options as any)?.meta || {}), details: data[0].details } }),
});
}

const services: { name: string }[] = await this.broker.call('$node.services', {
Expand All @@ -51,10 +59,11 @@ export class NetworkBroker implements IBroker {
return new Error('method-not-available');
}

return this.broker.call(method, data, {
return this.broker.call(method, isStreamingCall ? stream : data, {
...options,
meta: {
optl: injectCurrentContext(),
...(isStreamingCall && { details: data?.[0]?.details }),
},
});
}
Expand Down Expand Up @@ -134,9 +143,12 @@ export class NetworkBroker implements IBroker {
continue;
}

service.actions[method] = async (ctx: Context<[], { optl?: unknown }>): Promise<any> => {
service.actions[method] = async (ctx: Context<[], { optl?: unknown; details?: unknown }>): Promise<any> => {
const isStreamingCall = Stream.isReadable(ctx.params as any);
const params = isStreamingCall ? [{ streamParam: ctx.params, details: ctx.meta?.details }] : ctx.params;

return tracerSpan(
`action ${name}:${method}`,
`action ${name}:${method}${isStreamingCall ? ' (streaming)' : ''}`,
{},
() => {
return asyncLocalStorage.run(
Expand All @@ -147,7 +159,7 @@ export class NetworkBroker implements IBroker {
broker: this,
ctx,
},
() => serviceInstance[method](...ctx.params),
() => serviceInstance[method](...params),
);
},
ctx.meta?.optl,
Expand Down
23 changes: 9 additions & 14 deletions ee/packages/omnichannel-services/src/OmnichannelTranscript.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { Readable } from 'stream';
import { Readable } from 'stream';

import {
ServiceClass,
Expand Down Expand Up @@ -241,8 +241,9 @@ export class OmnichannelTranscript extends ServiceClass implements IOmnichannelT
}

try {
const fileBuffer = await uploadService.getFileBuffer({ file: uploadedFile });
files.push({ name: file.name, buffer: fileBuffer, extension: uploadedFile.extension });
const stream = await uploadService.streamUploadedFile({ file: uploadedFile, imageResizeOpts: { width: 400, height: 240 } });

files.push({ name: file.name, buffer: await streamToBuffer(stream), extension: uploadedFile.extension });
} catch (err: unknown) {
this.log.error({ msg: 'Failed to fetch file buffer', err });
// Push empty buffer so parser processes this as "unsupported file"
Expand Down Expand Up @@ -363,13 +364,11 @@ export class OmnichannelTranscript extends ServiceClass implements IOmnichannelT
const transcriptText = i18n.t('Transcript');

const stream = await this.worker.renderToStream({ data, i18n });
const outBuff = await streamToBuffer(stream as Readable);

try {
const { rid } = await roomService.createDirectMessage({ to: details.userId, from: 'rocket.cat' });
const [rocketCatFile, transcriptFile] = await this.uploadFiles({
details,
buffer: outBuff,
streamParam: Readable.from(stream),
roomIds: [rid, details.rid],
data,
transcriptText,
Expand Down Expand Up @@ -406,23 +405,20 @@ export class OmnichannelTranscript extends ServiceClass implements IOmnichannelT
}

private async uploadFiles({
details,
buffer,
streamParam,
roomIds,
data,
transcriptText,
}: {
details: WorkDetailsWithSource;
buffer: Buffer;
streamParam: Readable;
roomIds: string[];
data: Pick<WorkerData, 'siteName' | 'visitor'>;
transcriptText: string;
}): Promise<IUpload[]> {
return Promise.all(
roomIds.map((roomId) => {
return uploadService.uploadFile({
userId: details.userId,
buffer,
return uploadService.uploadFileFromStream({
Copy link
Contributor

@cubic-dev-ai cubic-dev-ai bot Feb 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1: The usage of uploadFileFromStream (which relies on .pipe()) risks causing the process to hang if the PDF generation stream emits an error. Standard .pipe() does not forward errors to the destination, and uploadFileFromStream does not appear to handle source stream errors. If the source stream fails, the upload promise will likely never resolve.

Ensure uploadFileFromStream handles the source stream's 'error' event (e.g., using stream.pipeline or explicit listeners) to reject the promise appropriately.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At ee/packages/omnichannel-services/src/OmnichannelTranscript.ts, line 420:

<comment>The usage of `uploadFileFromStream` (which relies on `.pipe()`) risks causing the process to hang if the PDF generation stream emits an error. Standard `.pipe()` does not forward errors to the destination, and `uploadFileFromStream` does not appear to handle source stream errors. If the source stream fails, the upload promise will likely never resolve.

Ensure `uploadFileFromStream` handles the source stream's 'error' event (e.g., using `stream.pipeline` or explicit listeners) to reject the promise appropriately.</comment>

<file context>
@@ -406,23 +405,20 @@ export class OmnichannelTranscript extends ServiceClass implements IOmnichannelT
-				return uploadService.uploadFile({
-					userId: details.userId,
-					buffer,
+				return uploadService.uploadFileFromStream({
+					streamParam,
 					details: {
</file context>
Fix with Cubic

streamParam,
details: {
// transcript_{company-name}_{date}_{hour}.pdf
name: `${transcriptText}_${data.siteName}_${new Intl.DateTimeFormat('en-US').format(new Date()).replace(/\//g, '-')}_${
Expand All @@ -432,7 +428,6 @@ export class OmnichannelTranscript extends ServiceClass implements IOmnichannelT
rid: roomId,
// Rocket.cat is the goat
userId: 'rocket.cat',
size: buffer.length,
},
});
}),
Expand Down
12 changes: 11 additions & 1 deletion packages/core-services/src/types/IUploadService.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import type Stream from 'stream';

import type { IUploadDetails } from '@rocket.chat/apps-engine/definition/uploads/IUploadDetails';
import type { IMessage, IUpload, IUser, FilesAndAttachments } from '@rocket.chat/core-typings';

export interface IUploadFileParams {
userId: string;
buffer: Buffer;
details: Partial<IUploadDetails>;
details: IUploadDetails;
}
export interface ISendFileMessageParams {
roomId: string;
Expand All @@ -29,4 +31,12 @@ export interface IUploadService {
parseFileIntoMessageAttachments(file: Partial<IUpload>, roomId: string, user: IUser): Promise<FilesAndAttachments>;
canDeleteFile(user: IUser, file: IUpload, msg: IMessage | null): Promise<boolean>;
deleteFile(user: IUser, fileId: IUpload['_id'], msg: IMessage | null): Promise<{ deletedFiles: IUpload['_id'][] }>;
streamUploadedFile({
file,
imageResizeOpts,
}: {
file: IUpload;
imageResizeOpts?: { width: number; height: number };
}): Promise<Stream.Readable>;
uploadFileFromStream({ streamParam, details }: { streamParam: Stream.Readable; details: Omit<IUploadDetails, 'size'> }): Promise<IUpload>;
}
Loading