Skip to content

feature: [158] Implement global mqtt message storage service#163

Merged
JavaSaBr merged 15 commits into
developfrom
feature/158-implement-global-mqtt-message-storage-service
May 6, 2026
Merged

feature: [158] Implement global mqtt message storage service#163
JavaSaBr merged 15 commits into
developfrom
feature/158-implement-global-mqtt-message-storage-service

Conversation

@JavaSaBr
Copy link
Copy Markdown
Owner

@JavaSaBr JavaSaBr commented May 2, 2026

Issue

#158

Main point

I introduce a new publish data abstraction layer wich allow us to handle long-term live publish data or big publish data to avoid keeping it in memory all the time.

Pull request overview

This PR refactors publish handling around a new model.publish abstraction and introduces in-memory publish-data storage/routing primitives for the broker’s inbound, outbound, retained, and session-related publish flows. It replaces the older publish handler/service naming with router/processor/dispatcher/sender components and updates tests/configuration to follow the new structure.

Changes:

  • Introduces new publish-domain types (Publish, PublishData, incoming/outgoing variants) and updates network message encoding/decoding to use them.
  • Replaces legacy publish receiving/delivering services with incoming processors, subscriber senders, and dispatcher/router abstractions across core-service.
  • Wires the new services into Spring/test configuration, updates retained-publish/session support, and renames or migrates related tests.

@JavaSaBr JavaSaBr requested a review from crazyrokr May 2, 2026 17:26
@JavaSaBr JavaSaBr self-assigned this May 2, 2026
@github-actions
Copy link
Copy Markdown

github-actions Bot commented May 2, 2026

Test Coverage Report

Overall Project 85.52% -0.81% 🍏
Files changed 86.02% 🍏

File Coverage
ConcurrentRetainedMessageTree.java 100% 🍏
PublishMqtt5OutMessage.java 100% 🍏
PublishProcessingResult.java 100% 🍏
TrackableIncomingPublishProcessor.java 100% 🍏
Qos0SubscriberPublishSender.java 100% 🍏
PublishData.java 100% 🍏
SubscribeMqttInMessageHandler.java 99.77% 🍏
PublishMqtt311OutMessage.java 97.92% 🍏
RetainedPublishNode.java 97.66% -1.87% 🍏
InMemoryNetworkMqttSession.java 94.69% 🍏
InMemoryMqttSessionService.java 92.26% 🍏
Mqtt5MessageOutFactory.java 91.51% 🍏
IncomingPublish.java 91.45% -8.55% 🍏
MqttBrokerSpringConfig.java 88.56% 🍏
DefaultPublishDispatcher.java 84.38% -15.63% 🍏
DefaultIncomingPublishRouter.java 83.68% -16.32% 🍏
PublishMqttInMessageHandler.java 83.57% 🍏
InMemoryPublishData.java 81.48% -18.52% 🍏
Qos1IncomingPublishProcessor.java 80.6% 🍏
InMemoryRetainPublishService.java 80% 🍏
Qos1SubscriberPublishSender.java 78.61% 🍏
InMemoryProcessingPublishes.java 78.53% 🍏
Qos2SubscriberPublishSender.java 75.95% 🍏
PublishRetryer.java 75% 🍏
TrackableSubscriberPublishSender.java 73.08% 🍏
Mqtt311MessageOutFactory.java 69.12% 🍏
TrackableOutgoingPublish.java 68.67% -31.33% 🍏
MqttMessageOutFactory.java 66.89% 🍏
Qos2IncomingPublishProcessor.java 66.46% 🍏
AbstractIncomingPublishProcessor.java 63.14% -1.96% 🍏
OutgoingPublish.java 62.07% -37.93% 🍏
AbstractSubscriberPublishSender.java 60.75% 🍏
Qos0IncomingPublishProcessor.java 52.5% 🍏
InMemoryPublishDataStorage.java 48.15% -51.85%

@JavaSaBr JavaSaBr marked this pull request as ready for review May 2, 2026 17:57
Copilot AI review requested due to automatic review settings May 2, 2026 17:57
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR refactors publish handling around a new model.publish abstraction and introduces in-memory publish-data storage/routing primitives for the broker’s inbound, outbound, retained, and session-related publish flows. It replaces the older publish handler/service naming with router/processor/dispatcher/sender components and updates tests/configuration to follow the new structure.

Changes:

  • Introduces new publish-domain types (Publish, PublishData, incoming/outgoing variants) and updates network message encoding/decoding to use them.
  • Replaces legacy publish receiving/delivering services with incoming processors, subscriber senders, and dispatcher/router abstractions across core-service.
  • Wires the new services into Spring/test configuration, updates retained-publish/session support, and renames or migrates related tests.

Reviewed changes

Copilot reviewed 74 out of 81 changed files in this pull request and generated 3 comments.

Show a summary per file
File Description
network/src/testFixtures/groovy/javasabr/mqtt/network/NetworkUnitSpecification.groovy Renames/shared publish test fixtures to use PublishData.
network/src/test/groovy/javasabr/mqtt/network/message/out/PublishMqtt5OutMessageTest.groovy Updates MQTT 5 publish serialization test inputs/assertions for PublishData.
network/src/test/groovy/javasabr/mqtt/network/message/out/PublishMqtt311OutMessageTest.groovy Updates MQTT 3.1.1 publish serialization tests for PublishData.
network/src/test/groovy/javasabr/mqtt/network/message/in/PublishMqttInMessageTest.groovy Renames publish payload/correlation fixtures in inbound publish parsing tests.
network/src/main/java/javasabr/mqtt/network/message/out/PublishMqtt5OutMessage.java Switches MQTT 5 outgoing publish encoding to read properties from PublishData.
network/src/main/java/javasabr/mqtt/network/message/out/PublishMqtt311OutMessage.java Switches MQTT 3.1.1 outgoing publish payload handling to PublishData.
model/src/testFixtures/groovy/javasabr/mqtt/model/subscription/TestPublishFactory.groovy Replaces old publish factory helpers with IncomingPublish/PublishData helpers.
model/src/test/groovy/javasabr/mqtt/model/topic/tree/RetainedMessageTreeTest.groovy Adapts retained tree tests to the new publish helpers.
model/src/main/java/javasabr/mqtt/model/topic/tree/RetainedPublishNode.java Renames retained trie node internals from “message” to “publish”.
model/src/main/java/javasabr/mqtt/model/topic/tree/ConcurrentRetainedMessageTree.java Updates retained tree root/node types to new publish naming.
model/src/main/java/javasabr/mqtt/model/session/PublishRetryer.java Updates session retry contract to new Publish package.
model/src/main/java/javasabr/mqtt/model/session/ProcessingPublishes.java Updates processing contract imports to new Publish package.
model/src/main/java/javasabr/mqtt/model/session/MqttSession.java Adds generateDataId() for publish-data identifiers.
model/src/main/java/javasabr/mqtt/model/publish/package-info.java Marks new publish package as @NullMarked.
model/src/main/java/javasabr/mqtt/model/publish/impl/package-info.java Marks publish implementation package as @NullMarked.
model/src/main/java/javasabr/mqtt/model/publish/impl/InMemoryPublishData.java Adds in-memory publish payload/correlation-data container.
model/src/main/java/javasabr/mqtt/model/publish/TrackableOutgoingPublish.java Adds trackable outgoing publish wrapper.
model/src/main/java/javasabr/mqtt/model/publish/PublishData.java Defines the new publish-data abstraction and convenience wrappers.
model/src/main/java/javasabr/mqtt/model/publish/Publish.java Introduces sealed publish interface used across the refactor.
model/src/main/java/javasabr/mqtt/model/publish/OutgoingPublish.java Adds QoS 0 outgoing publish wrapper.
model/src/main/java/javasabr/mqtt/model/publish/IncomingPublish.java Adds inbound publish record plus minimal constructors/helpers.
gradle/libs.versions.toml Bumps rlib dependency version.
core-service/src/test/groovy/javasabr/mqtt/service/session/impl/OldestSessionCleanerTest.groovy Updates session tests for new session constructor signature.
core-service/src/test/groovy/javasabr/mqtt/service/session/impl/ExpiredSessionCleanerTest.groovy Updates expired-session tests for new session constructor signature.
core-service/src/test/groovy/javasabr/mqtt/service/publish/sender/QosSubscriberPublishSenderTest.groovy Adds renamed base test for subscriber senders.
core-service/src/test/groovy/javasabr/mqtt/service/publish/sender/Qos2SubscriberPublishSenderTest.groovy Migrates QoS 2 outbound publish tests to sender naming/types.
core-service/src/test/groovy/javasabr/mqtt/service/publish/sender/Qos1SubscriberPublishSenderTest.groovy Migrates QoS 1 outbound publish tests to sender naming/types.
core-service/src/test/groovy/javasabr/mqtt/service/publish/sender/Qos0SubscriberPublishSenderTest.groovy Migrates QoS 0 outbound publish tests to sender naming/types.
core-service/src/test/groovy/javasabr/mqtt/service/publish/processor/QosIncomingPublishProcessorTest.groovy Adds renamed base test for incoming publish processors.
core-service/src/test/groovy/javasabr/mqtt/service/publish/processor/Qos2IncomingPublishProcessorTest.groovy Migrates QoS 2 inbound publish tests to processor naming/types.
core-service/src/test/groovy/javasabr/mqtt/service/publish/processor/Qos1IncomingPublishProcessorTest.groovy Migrates QoS 1 inbound publish tests to processor naming/types.
core-service/src/test/groovy/javasabr/mqtt/service/publish/processor/Qos0IncomingPublishProcessorTest.groovy Migrates QoS 0 inbound publish tests to processor naming/types.
core-service/src/test/groovy/javasabr/mqtt/service/publish/handler/impl/QosMqttPublishOutMessageHandlerTest.groovy Removes old outbound publish handler base test.
core-service/src/test/groovy/javasabr/mqtt/service/message/handler/impl/SubscribeMqttInMessageHandlerTest.groovy Updates retained-publish helper usage in subscribe handler tests.
core-service/src/test/groovy/javasabr/mqtt/service/message/handler/impl/PublishMqttInMessageHandlerTest.groovy Injects publish-data storage into inbound publish handler tests.
core-service/src/test/groovy/javasabr/mqtt/service/IntegrationServiceSpecification.groovy Rewires shared test setup to new dispatcher/router/storage classes.
core-service/src/main/java/javasabr/mqtt/service/session/impl/InMemoryProcessingPublishes.java Updates processing implementation import to new Publish package.
core-service/src/main/java/javasabr/mqtt/service/session/impl/InMemoryNetworkMqttSession.java Adds internal session ID and data-ID generation.
core-service/src/main/java/javasabr/mqtt/service/session/impl/InMemoryMqttSessionService.java Allocates per-session internal IDs when creating sessions.
core-service/src/main/java/javasabr/mqtt/service/publish/sender/package-info.java Marks sender package as @NullMarked.
core-service/src/main/java/javasabr/mqtt/service/publish/sender/TrackableSubscriberPublishSender.java Renames/refactors trackable outbound publish handler into sender.
core-service/src/main/java/javasabr/mqtt/service/publish/sender/SubscriberPublishSender.java Adds outbound subscriber sender contract.
core-service/src/main/java/javasabr/mqtt/service/publish/sender/Qos2SubscriberPublishSender.java Renames QoS 2 sender implementation.
core-service/src/main/java/javasabr/mqtt/service/publish/sender/Qos1SubscriberPublishSender.java Renames QoS 1 sender implementation.
core-service/src/main/java/javasabr/mqtt/service/publish/sender/Qos0SubscriberPublishSender.java Adds QoS 0 sender implementation.
core-service/src/main/java/javasabr/mqtt/service/publish/sender/AbstractSubscriberPublishSender.java Renames/refactors abstract outbound publish handler base class.
core-service/src/main/java/javasabr/mqtt/service/publish/processor/package-info.java Marks processor package as @NullMarked.
core-service/src/main/java/javasabr/mqtt/service/publish/processor/TrackableIncomingPublishProcessor.java Renames/refactors trackable inbound publish handler base class.
core-service/src/main/java/javasabr/mqtt/service/publish/processor/Qos2IncomingPublishProcessor.java Renames QoS 2 inbound processor and updates dependencies.
core-service/src/main/java/javasabr/mqtt/service/publish/processor/Qos1IncomingPublishProcessor.java Renames QoS 1 inbound processor and updates dependencies.
core-service/src/main/java/javasabr/mqtt/service/publish/processor/Qos0IncomingPublishProcessor.java Renames QoS 0 inbound processor and updates dependencies.
core-service/src/main/java/javasabr/mqtt/service/publish/processor/PublishProcessingResult.java Renames publish handling result enum/package.
core-service/src/main/java/javasabr/mqtt/service/publish/processor/IncomingPublishProcessor.java Adds inbound publish processor contract.
core-service/src/main/java/javasabr/mqtt/service/publish/processor/AbstractIncomingPublishProcessor.java Renames/refactors abstract inbound publish processing flow.
core-service/src/main/java/javasabr/mqtt/service/publish/package-info.java Marks new publish service package as @NullMarked.
core-service/src/main/java/javasabr/mqtt/service/publish/impl/package-info.java Marks publish implementation package as @NullMarked.
core-service/src/main/java/javasabr/mqtt/service/publish/impl/InMemoryRetainPublishService.java Renames retained publish service and updates to PublishData.
core-service/src/main/java/javasabr/mqtt/service/publish/impl/InMemoryPublishDataStorage.java Adds in-memory publish-data storage service.
core-service/src/main/java/javasabr/mqtt/service/publish/impl/DefaultPublishDispatcher.java Adds dispatcher replacing old publish delivering service.
core-service/src/main/java/javasabr/mqtt/service/publish/impl/DefaultIncomingPublishRouter.java Adds router replacing old publish receiving service.
core-service/src/main/java/javasabr/mqtt/service/publish/handler/package-info.java Removes old publish handler package marker.
core-service/src/main/java/javasabr/mqtt/service/publish/handler/impl/package-info.java Removes old publish handler impl package marker.
core-service/src/main/java/javasabr/mqtt/service/publish/handler/impl/Qos0MqttPublishOutMessageHandler.java Deletes old QoS 0 outbound handler implementation.
core-service/src/main/java/javasabr/mqtt/service/publish/handler/MqttPublishOutMessageHandler.java Deletes old outbound publish handler interface.
core-service/src/main/java/javasabr/mqtt/service/publish/handler/MqttPublishInMessageHandler.java Deletes old inbound publish handler interface.
core-service/src/main/java/javasabr/mqtt/service/publish/RetainPublishService.java Adds retained publish service contract.
core-service/src/main/java/javasabr/mqtt/service/publish/PublishDispatcher.java Adds publish dispatching contract.
core-service/src/main/java/javasabr/mqtt/service/publish/PublishDataStorage.java Adds publish-data storage contract.
core-service/src/main/java/javasabr/mqtt/service/publish/IncomingPublishRouter.java Adds inbound publish routing contract.
core-service/src/main/java/javasabr/mqtt/service/message/out/factory/MqttMessageOutFactory.java Updates publish factory API to accept PublishData.
core-service/src/main/java/javasabr/mqtt/service/message/out/factory/Mqtt5MessageOutFactory.java Updates MQTT 5 factory implementation for PublishData.
core-service/src/main/java/javasabr/mqtt/service/message/out/factory/Mqtt311MessageOutFactory.java Updates MQTT 3.1.1 factory implementation for PublishData.
core-service/src/main/java/javasabr/mqtt/service/message/handler/impl/SubscribeMqttInMessageHandler.java Switches subscribe handler to new retain/dispatch services.
core-service/src/main/java/javasabr/mqtt/service/message/handler/impl/PublishMqttInMessageHandler.java Stores payloads as PublishData and routes via new incoming router.
core-service/src/main/java/javasabr/mqtt/service/impl/DefaultPublishReceivingService.java Removes old publish receiving service implementation.
core-service/src/main/java/javasabr/mqtt/service/impl/DefaultPublishDeliveringService.java Removes old publish delivering service implementation.
core-service/src/main/java/javasabr/mqtt/service/RetainMessageService.java Removes old retained-message service contract.
core-service/src/main/java/javasabr/mqtt/service/PublishReceivingService.java Removes old publish receiving service contract.
core-service/src/main/java/javasabr/mqtt/service/PublishDeliveringService.java Removes old publish delivering service contract.
application/src/test/resources/log4j2-test.xml Updates logger names for renamed publish components.
application/src/main/java/javasabr/mqtt/broker/application/config/MqttBrokerSpringConfig.java Rewires Spring beans to new publish router/dispatcher/storage services.
Comments suppressed due to low confidence (2)

core-service/src/main/java/javasabr/mqtt/service/publish/impl/InMemoryRetainPublishService.java:27

  • The retained-message path ignores messageExpiryInterval. Once a retained publish is added here, findRetainedPublishes() will keep returning it forever, so expired retained messages are never evicted or filtered out.
    core-service/src/main/java/javasabr/mqtt/service/publish/sender/TrackableSubscriberPublishSender.java:49
  • This sender drops the subscription identifiers by hard-coding IntArray.EMPTY. MQTT 5 subscribers matched by a subscription with a non-default subscriptionId will never receive the required SUBSCRIPTION_IDENTIFIER property for QoS 1/2 deliveries.

Comment on lines +25 to +29
protected Publish buildOutgoing(
ExternalNetworkMqttUser user,
MqttSession session,
Publish incoming) {
return new OutgoingPublish(incoming, incoming.retained(), IntArray.EMPTY);
Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

currently we don't support resolving this field

Comment on lines +92 to +97
PublishData storedPublishData = publishDataStorage.store(
session.generateDataId(),
message.contentType(),
message.payloadFormat(),
message.payload(),
message.correlationData());
Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it will be implemented in next part

import javasabr.mqtt.model.topic.TopicFilter;
import javasabr.rlib.collections.array.Array;

public interface RetainPublishService {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe RetainPublishRetryer 🤔

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@crazyrokr but why re-tryer?

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just thinking out loud against the backdrop of other renamings. The comment can be regarded as nitpicking and ignored.


@Nullable
protected abstract Publish reconstruct(U user, MqttSession session, Publish original);
protected abstract Publish buildOutgoing(U user, MqttSession session, Publish incoming);
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's rename the method to buildOutgoingPublish

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@crazyrokr will rename it in the next part


import javasabr.mqtt.service.IntegrationServiceSpecification

abstract class QosSubscriberPublishSenderTest extends IntegrationServiceSpecification {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it required?

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@crazyrokr it's for a case if you want to add extra common preparatios for such kind of tests

try {
PublishData exist = idToPublishData.putIfAbsent(publishData.id(), publishData);
if (exist != null) {
throw new IllegalArgumentException("Publish data:%s already exists".formatted(publishData));
Copy link
Copy Markdown
Collaborator

@crazyrokr crazyrokr May 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we consider adding tests of this logic? It would be good to guard this logic from double call of the store() method

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@crazyrokr added

PublishReceivingService publishReceivingService(
Collection<? extends MqttPublishInMessageHandler> knownPublishInHandlers) {
return new DefaultPublishReceivingService(knownPublishInHandlers);
IncomingPublishRouter IncomingPublishRouter(
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like there is a typo in the first letter of the method name: it should be a small i instead of a capital I

@JavaSaBr JavaSaBr merged commit 01ee90d into develop May 6, 2026
6 checks passed
@JavaSaBr JavaSaBr deleted the feature/158-implement-global-mqtt-message-storage-service branch May 6, 2026 03:52
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants