Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix blocking calls #8

Open
wants to merge 2 commits into
base: master
Choose a base branch
from

Conversation

Arooba-git
Copy link

Hi! 👋

Apparently the chat module has some blocking calls (as detected by BlockHound):
Screen Shot 2023-07-15 at 1 51 11 AM
Screen Shot 2023-07-15 at 1 57 33 AM

This PR addresses these blocking code to ensure the pipeline remain reactive.

Copy link
Owner

@RawSanj RawSanj left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please work on the suggested changes?

PS: Sorry for the late reply and thanks for the PR 👍🏼

long activeUserCount = activeUserCounter.decrementAndGet();
log.info("User '{}' Disconnected. Total Active Users: {}", webSocketSession.getId(), activeUserCount);
chatMessageSink.tryEmitNext(new ChatMessage(0, "DISCONNECTED", "DISCONNECTED", activeUserCount));
}).subscribeOn(Schedulers.boundedElastic());
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are missing .subscribe() here. The code inside Mono.fromRunnable won't execute unless explicitly subscribed or blocked.

@@ -23,7 +24,9 @@ public RouterFunction<ServerResponse> htmlRouter(@Value("classpath:/static/index
return route(GET("/"), request -> ok().contentType(MediaType.TEXT_HTML).bodyValue(html))
.andRoute(POST("/message"), request -> request.bodyToMono(Message.class)
.flatMap(message -> redisChatMessagePublisher.publishChatMessage(message.getMessage()))
.flatMap(aLong -> ServerResponse.ok().bodyValue(new Message("Message Sent Successfully!."))));
.flatMap(aLong -> ServerResponse.ok().bodyValue(new Message("Message Sent Successfully!.")))
.subscribeOn(Schedulers.boundedElastic()));
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really need to make this run on Schedulers.boundedElastic() Did the BlockHound detected blocking call over here or inside the RedisChatMessagePublisher?

Copy link
Author

@Arooba-git Arooba-git Nov 29, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the first stack trace (first screenshot in the post), we see that the error propagates to RedisChatMessagePublisher but was triggered by WebHttpHandler, line 26 right? :)

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the first stack trace (first screenshot in the post), we see that the error propagates to RedisChatMessagePublisher but was triggered by WebHttpHandler, line 26 right? :)

The other way round. The actual blocking call is at line - https://github.com/RawSanj/spring-redis-websocket/blob/master/src/main/java/com/github/rawsanj/messaging/RedisChatMessagePublisher.java#L35C11-L35C28 the Atomic Increment is blocking.

I think we should be good if we move that blocking call Integer totalChatMessage = chatMessageCounter.incrementAndGet() inside the Mono.fromCallable() and then add the .subscribeOn(Schedulers.boundedElastic()) in the end.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants