Skip to content

Commit 6f35185

Browse files
Merge pull request #15543 from kim-sung-jee/feature/websockets-manual-ack
feat(websockets): allow manual acknowledgement handling with @ack() decorator
2 parents 91471c9 + 526e68c commit 6f35185

File tree

15 files changed

+218
-13
lines changed

15 files changed

+218
-13
lines changed

integration/websockets/e2e/gateway-ack.spec.ts

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,5 +41,38 @@ describe('WebSocketGateway (ack)', () => {
4141
);
4242
});
4343

44+
it('should handle manual ack for async operations when @Ack() is used (success case)', async () => {
45+
app = await createNestApp(AckGateway);
46+
await app.listen(3000);
47+
48+
ws = io('http://localhost:8080');
49+
const payload = { shouldSucceed: true };
50+
51+
await new Promise<void>(resolve =>
52+
ws.emit('manual-ack', payload, response => {
53+
expect(response).to.eql({ status: 'success', data: payload });
54+
resolve();
55+
}),
56+
);
57+
});
58+
59+
it('should handle manual ack for async operations when @Ack() is used (error case)', async () => {
60+
app = await createNestApp(AckGateway);
61+
await app.listen(3000);
62+
63+
ws = io('http://localhost:8080');
64+
const payload = { shouldSucceed: false };
65+
66+
await new Promise<void>(resolve =>
67+
ws.emit('manual-ack', payload, response => {
68+
expect(response).to.eql({
69+
status: 'error',
70+
message: 'Operation failed',
71+
});
72+
resolve();
73+
}),
74+
);
75+
});
76+
4477
afterEach(() => app.close());
4578
});
Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,29 @@
1-
import { SubscribeMessage, WebSocketGateway } from '@nestjs/websockets';
1+
import {
2+
Ack,
3+
MessageBody,
4+
SubscribeMessage,
5+
WebSocketGateway,
6+
} from '@nestjs/websockets';
27

38
@WebSocketGateway(8080)
49
export class AckGateway {
510
@SubscribeMessage('push')
611
onPush() {
712
return 'pong';
813
}
14+
15+
@SubscribeMessage('manual-ack')
16+
async handleManualAck(
17+
@MessageBody() data: any,
18+
@Ack() ack: (response: any) => void,
19+
) {
20+
await new Promise(resolve => setTimeout(resolve, 20));
21+
22+
if (data.shouldSucceed) {
23+
ack({ status: 'success', data });
24+
} else {
25+
ack({ status: 'error', message: 'Operation failed' });
26+
}
27+
return { status: 'ignored' };
28+
}
929
}

packages/common/enums/route-paramtypes.enum.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,4 +12,5 @@ export enum RouteParamtypes {
1212
HOST = 10,
1313
IP = 11,
1414
RAW_BODY = 12,
15+
ACK = 13,
1516
}

packages/common/interfaces/websockets/web-socket-adapter.interface.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import { Observable } from 'rxjs';
66
export interface WsMessageHandler<T = string> {
77
message: T;
88
callback: (...args: any[]) => Observable<any> | Promise<any>;
9+
isAckHandledManually: boolean;
910
}
1011

1112
/**

packages/platform-socket.io/adapters/io-adapter.ts

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,22 +44,24 @@ export class IoAdapter extends AbstractWsAdapter {
4444
first(),
4545
);
4646

47-
handlers.forEach(({ message, callback }) => {
47+
handlers.forEach(({ message, callback, isAckHandledManually }) => {
4848
const source$ = fromEvent(socket, message).pipe(
4949
mergeMap((payload: any) => {
5050
const { data, ack } = this.mapPayload(payload);
5151
return transform(callback(data, ack)).pipe(
5252
filter((response: any) => !isNil(response)),
53-
map((response: any) => [response, ack]),
53+
map((response: any) => [response, ack, isAckHandledManually]),
5454
);
5555
}),
5656
takeUntil(disconnect$),
5757
);
58-
source$.subscribe(([response, ack]) => {
58+
source$.subscribe(([response, ack, isAckHandledManually]) => {
5959
if (response.event) {
6060
return socket.emit(response.event, response.data);
6161
}
62-
isFunction(ack) && ack(response);
62+
if (!isAckHandledManually && isFunction(ack)) {
63+
ack(response);
64+
}
6365
});
6466
});
6567
}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { WsParamtype } from '../enums/ws-paramtype.enum';
22

33
export const DEFAULT_CALLBACK_METADATA = {
4+
[`${WsParamtype.ACK}:2`]: { index: 2, data: undefined, pipes: [] },
45
[`${WsParamtype.PAYLOAD}:1`]: { index: 1, data: undefined, pipes: [] },
56
[`${WsParamtype.SOCKET}:0`]: { index: 0, data: undefined, pipes: [] },
67
};
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
import { WsParamtype } from '../enums/ws-paramtype.enum';
2+
import { createPipesWsParamDecorator } from '../utils/param.utils';
3+
4+
/**
5+
* WebSockets `ack` parameter decorator.
6+
* Extracts the `ack` callback function from the arguments of a ws event.
7+
*
8+
* This decorator signals to the framework that the `ack` callback will be
9+
* handled manually within the method, preventing the framework from
10+
* automatically sending an acknowledgement based on the return value.
11+
*
12+
* @example
13+
* ```typescript
14+
* @SubscribeMessage('events')
15+
* onEvent(
16+
* @MessageBody() data: string,
17+
* @Ack() ack: (response: any) => void
18+
* ) {
19+
* // Manually call the ack callback
20+
* ack({ status: 'ok' });
21+
* }
22+
* ```
23+
*
24+
* @publicApi
25+
*/
26+
export function Ack(): ParameterDecorator {
27+
return createPipesWsParamDecorator(WsParamtype.ACK)();
28+
}

packages/websockets/decorators/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,3 +3,4 @@ export * from './gateway-server.decorator';
33
export * from './message-body.decorator';
44
export * from './socket-gateway.decorator';
55
export * from './subscribe-message.decorator';
6+
export * from './ack.decorator';

packages/websockets/enums/ws-paramtype.enum.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,5 @@ import { RouteParamtypes } from '@nestjs/common/enums/route-paramtypes.enum';
33
export enum WsParamtype {
44
SOCKET = RouteParamtypes.REQUEST,
55
PAYLOAD = RouteParamtypes.BODY,
6+
ACK = RouteParamtypes.ACK,
67
}

packages/websockets/factories/ws-params-factory.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import { isFunction } from '@nestjs/common/utils/shared.utils';
12
import { WsParamtype } from '../enums/ws-paramtype.enum';
23

34
export class WsParamsFactory {
@@ -14,6 +15,9 @@ export class WsParamsFactory {
1415
return args[0];
1516
case WsParamtype.PAYLOAD:
1617
return data ? args[1]?.[data] : args[1];
18+
case WsParamtype.ACK: {
19+
return args.find(arg => isFunction(arg));
20+
}
1721
default:
1822
return null;
1923
}

0 commit comments

Comments
 (0)