feature: [158] Implement global mqtt message storage service#163
Conversation
…ent-global-mqtt-message-storage-service
…ent-global-mqtt-message-storage-service
…ent-global-mqtt-message-storage-service
There was a problem hiding this comment.
Pull request overview
This PR refactors publish handling around a new model.publish abstraction and introduces in-memory publish-data storage/routing primitives for the broker’s inbound, outbound, retained, and session-related publish flows. It replaces the older publish handler/service naming with router/processor/dispatcher/sender components and updates tests/configuration to follow the new structure.
Changes:
- Introduces new publish-domain types (
Publish,PublishData, incoming/outgoing variants) and updates network message encoding/decoding to use them. - Replaces legacy publish receiving/delivering services with incoming processors, subscriber senders, and dispatcher/router abstractions across
core-service. - Wires the new services into Spring/test configuration, updates retained-publish/session support, and renames or migrates related tests.
Reviewed changes
Copilot reviewed 74 out of 81 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
network/src/testFixtures/groovy/javasabr/mqtt/network/NetworkUnitSpecification.groovy |
Renames/shared publish test fixtures to use PublishData. |
network/src/test/groovy/javasabr/mqtt/network/message/out/PublishMqtt5OutMessageTest.groovy |
Updates MQTT 5 publish serialization test inputs/assertions for PublishData. |
network/src/test/groovy/javasabr/mqtt/network/message/out/PublishMqtt311OutMessageTest.groovy |
Updates MQTT 3.1.1 publish serialization tests for PublishData. |
network/src/test/groovy/javasabr/mqtt/network/message/in/PublishMqttInMessageTest.groovy |
Renames publish payload/correlation fixtures in inbound publish parsing tests. |
network/src/main/java/javasabr/mqtt/network/message/out/PublishMqtt5OutMessage.java |
Switches MQTT 5 outgoing publish encoding to read properties from PublishData. |
network/src/main/java/javasabr/mqtt/network/message/out/PublishMqtt311OutMessage.java |
Switches MQTT 3.1.1 outgoing publish payload handling to PublishData. |
model/src/testFixtures/groovy/javasabr/mqtt/model/subscription/TestPublishFactory.groovy |
Replaces old publish factory helpers with IncomingPublish/PublishData helpers. |
model/src/test/groovy/javasabr/mqtt/model/topic/tree/RetainedMessageTreeTest.groovy |
Adapts retained tree tests to the new publish helpers. |
model/src/main/java/javasabr/mqtt/model/topic/tree/RetainedPublishNode.java |
Renames retained trie node internals from “message” to “publish”. |
model/src/main/java/javasabr/mqtt/model/topic/tree/ConcurrentRetainedMessageTree.java |
Updates retained tree root/node types to new publish naming. |
model/src/main/java/javasabr/mqtt/model/session/PublishRetryer.java |
Updates session retry contract to new Publish package. |
model/src/main/java/javasabr/mqtt/model/session/ProcessingPublishes.java |
Updates processing contract imports to new Publish package. |
model/src/main/java/javasabr/mqtt/model/session/MqttSession.java |
Adds generateDataId() for publish-data identifiers. |
model/src/main/java/javasabr/mqtt/model/publish/package-info.java |
Marks new publish package as @NullMarked. |
model/src/main/java/javasabr/mqtt/model/publish/impl/package-info.java |
Marks publish implementation package as @NullMarked. |
model/src/main/java/javasabr/mqtt/model/publish/impl/InMemoryPublishData.java |
Adds in-memory publish payload/correlation-data container. |
model/src/main/java/javasabr/mqtt/model/publish/TrackableOutgoingPublish.java |
Adds trackable outgoing publish wrapper. |
model/src/main/java/javasabr/mqtt/model/publish/PublishData.java |
Defines the new publish-data abstraction and convenience wrappers. |
model/src/main/java/javasabr/mqtt/model/publish/Publish.java |
Introduces sealed publish interface used across the refactor. |
model/src/main/java/javasabr/mqtt/model/publish/OutgoingPublish.java |
Adds QoS 0 outgoing publish wrapper. |
model/src/main/java/javasabr/mqtt/model/publish/IncomingPublish.java |
Adds inbound publish record plus minimal constructors/helpers. |
gradle/libs.versions.toml |
Bumps rlib dependency version. |
core-service/src/test/groovy/javasabr/mqtt/service/session/impl/OldestSessionCleanerTest.groovy |
Updates session tests for new session constructor signature. |
core-service/src/test/groovy/javasabr/mqtt/service/session/impl/ExpiredSessionCleanerTest.groovy |
Updates expired-session tests for new session constructor signature. |
core-service/src/test/groovy/javasabr/mqtt/service/publish/sender/QosSubscriberPublishSenderTest.groovy |
Adds renamed base test for subscriber senders. |
core-service/src/test/groovy/javasabr/mqtt/service/publish/sender/Qos2SubscriberPublishSenderTest.groovy |
Migrates QoS 2 outbound publish tests to sender naming/types. |
core-service/src/test/groovy/javasabr/mqtt/service/publish/sender/Qos1SubscriberPublishSenderTest.groovy |
Migrates QoS 1 outbound publish tests to sender naming/types. |
core-service/src/test/groovy/javasabr/mqtt/service/publish/sender/Qos0SubscriberPublishSenderTest.groovy |
Migrates QoS 0 outbound publish tests to sender naming/types. |
core-service/src/test/groovy/javasabr/mqtt/service/publish/processor/QosIncomingPublishProcessorTest.groovy |
Adds renamed base test for incoming publish processors. |
core-service/src/test/groovy/javasabr/mqtt/service/publish/processor/Qos2IncomingPublishProcessorTest.groovy |
Migrates QoS 2 inbound publish tests to processor naming/types. |
core-service/src/test/groovy/javasabr/mqtt/service/publish/processor/Qos1IncomingPublishProcessorTest.groovy |
Migrates QoS 1 inbound publish tests to processor naming/types. |
core-service/src/test/groovy/javasabr/mqtt/service/publish/processor/Qos0IncomingPublishProcessorTest.groovy |
Migrates QoS 0 inbound publish tests to processor naming/types. |
core-service/src/test/groovy/javasabr/mqtt/service/publish/handler/impl/QosMqttPublishOutMessageHandlerTest.groovy |
Removes old outbound publish handler base test. |
core-service/src/test/groovy/javasabr/mqtt/service/message/handler/impl/SubscribeMqttInMessageHandlerTest.groovy |
Updates retained-publish helper usage in subscribe handler tests. |
core-service/src/test/groovy/javasabr/mqtt/service/message/handler/impl/PublishMqttInMessageHandlerTest.groovy |
Injects publish-data storage into inbound publish handler tests. |
core-service/src/test/groovy/javasabr/mqtt/service/IntegrationServiceSpecification.groovy |
Rewires shared test setup to new dispatcher/router/storage classes. |
core-service/src/main/java/javasabr/mqtt/service/session/impl/InMemoryProcessingPublishes.java |
Updates processing implementation import to new Publish package. |
core-service/src/main/java/javasabr/mqtt/service/session/impl/InMemoryNetworkMqttSession.java |
Adds internal session ID and data-ID generation. |
core-service/src/main/java/javasabr/mqtt/service/session/impl/InMemoryMqttSessionService.java |
Allocates per-session internal IDs when creating sessions. |
core-service/src/main/java/javasabr/mqtt/service/publish/sender/package-info.java |
Marks sender package as @NullMarked. |
core-service/src/main/java/javasabr/mqtt/service/publish/sender/TrackableSubscriberPublishSender.java |
Renames/refactors trackable outbound publish handler into sender. |
core-service/src/main/java/javasabr/mqtt/service/publish/sender/SubscriberPublishSender.java |
Adds outbound subscriber sender contract. |
core-service/src/main/java/javasabr/mqtt/service/publish/sender/Qos2SubscriberPublishSender.java |
Renames QoS 2 sender implementation. |
core-service/src/main/java/javasabr/mqtt/service/publish/sender/Qos1SubscriberPublishSender.java |
Renames QoS 1 sender implementation. |
core-service/src/main/java/javasabr/mqtt/service/publish/sender/Qos0SubscriberPublishSender.java |
Adds QoS 0 sender implementation. |
core-service/src/main/java/javasabr/mqtt/service/publish/sender/AbstractSubscriberPublishSender.java |
Renames/refactors abstract outbound publish handler base class. |
core-service/src/main/java/javasabr/mqtt/service/publish/processor/package-info.java |
Marks processor package as @NullMarked. |
core-service/src/main/java/javasabr/mqtt/service/publish/processor/TrackableIncomingPublishProcessor.java |
Renames/refactors trackable inbound publish handler base class. |
core-service/src/main/java/javasabr/mqtt/service/publish/processor/Qos2IncomingPublishProcessor.java |
Renames QoS 2 inbound processor and updates dependencies. |
core-service/src/main/java/javasabr/mqtt/service/publish/processor/Qos1IncomingPublishProcessor.java |
Renames QoS 1 inbound processor and updates dependencies. |
core-service/src/main/java/javasabr/mqtt/service/publish/processor/Qos0IncomingPublishProcessor.java |
Renames QoS 0 inbound processor and updates dependencies. |
core-service/src/main/java/javasabr/mqtt/service/publish/processor/PublishProcessingResult.java |
Renames publish handling result enum/package. |
core-service/src/main/java/javasabr/mqtt/service/publish/processor/IncomingPublishProcessor.java |
Adds inbound publish processor contract. |
core-service/src/main/java/javasabr/mqtt/service/publish/processor/AbstractIncomingPublishProcessor.java |
Renames/refactors abstract inbound publish processing flow. |
core-service/src/main/java/javasabr/mqtt/service/publish/package-info.java |
Marks new publish service package as @NullMarked. |
core-service/src/main/java/javasabr/mqtt/service/publish/impl/package-info.java |
Marks publish implementation package as @NullMarked. |
core-service/src/main/java/javasabr/mqtt/service/publish/impl/InMemoryRetainPublishService.java |
Renames retained publish service and updates to PublishData. |
core-service/src/main/java/javasabr/mqtt/service/publish/impl/InMemoryPublishDataStorage.java |
Adds in-memory publish-data storage service. |
core-service/src/main/java/javasabr/mqtt/service/publish/impl/DefaultPublishDispatcher.java |
Adds dispatcher replacing old publish delivering service. |
core-service/src/main/java/javasabr/mqtt/service/publish/impl/DefaultIncomingPublishRouter.java |
Adds router replacing old publish receiving service. |
core-service/src/main/java/javasabr/mqtt/service/publish/handler/package-info.java |
Removes old publish handler package marker. |
core-service/src/main/java/javasabr/mqtt/service/publish/handler/impl/package-info.java |
Removes old publish handler impl package marker. |
core-service/src/main/java/javasabr/mqtt/service/publish/handler/impl/Qos0MqttPublishOutMessageHandler.java |
Deletes old QoS 0 outbound handler implementation. |
core-service/src/main/java/javasabr/mqtt/service/publish/handler/MqttPublishOutMessageHandler.java |
Deletes old outbound publish handler interface. |
core-service/src/main/java/javasabr/mqtt/service/publish/handler/MqttPublishInMessageHandler.java |
Deletes old inbound publish handler interface. |
core-service/src/main/java/javasabr/mqtt/service/publish/RetainPublishService.java |
Adds retained publish service contract. |
core-service/src/main/java/javasabr/mqtt/service/publish/PublishDispatcher.java |
Adds publish dispatching contract. |
core-service/src/main/java/javasabr/mqtt/service/publish/PublishDataStorage.java |
Adds publish-data storage contract. |
core-service/src/main/java/javasabr/mqtt/service/publish/IncomingPublishRouter.java |
Adds inbound publish routing contract. |
core-service/src/main/java/javasabr/mqtt/service/message/out/factory/MqttMessageOutFactory.java |
Updates publish factory API to accept PublishData. |
core-service/src/main/java/javasabr/mqtt/service/message/out/factory/Mqtt5MessageOutFactory.java |
Updates MQTT 5 factory implementation for PublishData. |
core-service/src/main/java/javasabr/mqtt/service/message/out/factory/Mqtt311MessageOutFactory.java |
Updates MQTT 3.1.1 factory implementation for PublishData. |
core-service/src/main/java/javasabr/mqtt/service/message/handler/impl/SubscribeMqttInMessageHandler.java |
Switches subscribe handler to new retain/dispatch services. |
core-service/src/main/java/javasabr/mqtt/service/message/handler/impl/PublishMqttInMessageHandler.java |
Stores payloads as PublishData and routes via new incoming router. |
core-service/src/main/java/javasabr/mqtt/service/impl/DefaultPublishReceivingService.java |
Removes old publish receiving service implementation. |
core-service/src/main/java/javasabr/mqtt/service/impl/DefaultPublishDeliveringService.java |
Removes old publish delivering service implementation. |
core-service/src/main/java/javasabr/mqtt/service/RetainMessageService.java |
Removes old retained-message service contract. |
core-service/src/main/java/javasabr/mqtt/service/PublishReceivingService.java |
Removes old publish receiving service contract. |
core-service/src/main/java/javasabr/mqtt/service/PublishDeliveringService.java |
Removes old publish delivering service contract. |
application/src/test/resources/log4j2-test.xml |
Updates logger names for renamed publish components. |
application/src/main/java/javasabr/mqtt/broker/application/config/MqttBrokerSpringConfig.java |
Rewires Spring beans to new publish router/dispatcher/storage services. |
Comments suppressed due to low confidence (2)
core-service/src/main/java/javasabr/mqtt/service/publish/impl/InMemoryRetainPublishService.java:27
- The retained-message path ignores
messageExpiryInterval. Once a retained publish is added here,findRetainedPublishes()will keep returning it forever, so expired retained messages are never evicted or filtered out.
core-service/src/main/java/javasabr/mqtt/service/publish/sender/TrackableSubscriberPublishSender.java:49 - This sender drops the subscription identifiers by hard-coding
IntArray.EMPTY. MQTT 5 subscribers matched by a subscription with a non-defaultsubscriptionIdwill never receive the requiredSUBSCRIPTION_IDENTIFIERproperty for QoS 1/2 deliveries.
| protected Publish buildOutgoing( | ||
| ExternalNetworkMqttUser user, | ||
| MqttSession session, | ||
| Publish incoming) { | ||
| return new OutgoingPublish(incoming, incoming.retained(), IntArray.EMPTY); |
There was a problem hiding this comment.
currently we don't support resolving this field
| PublishData storedPublishData = publishDataStorage.store( | ||
| session.generateDataId(), | ||
| message.contentType(), | ||
| message.payloadFormat(), | ||
| message.payload(), | ||
| message.correlationData()); |
There was a problem hiding this comment.
it will be implemented in next part
| import javasabr.mqtt.model.topic.TopicFilter; | ||
| import javasabr.rlib.collections.array.Array; | ||
|
|
||
| public interface RetainPublishService { |
There was a problem hiding this comment.
Maybe RetainPublishRetryer 🤔
There was a problem hiding this comment.
Just thinking out loud against the backdrop of other renamings. The comment can be regarded as nitpicking and ignored.
|
|
||
| @Nullable | ||
| protected abstract Publish reconstruct(U user, MqttSession session, Publish original); | ||
| protected abstract Publish buildOutgoing(U user, MqttSession session, Publish incoming); |
There was a problem hiding this comment.
Let's rename the method to buildOutgoingPublish
|
|
||
| import javasabr.mqtt.service.IntegrationServiceSpecification | ||
|
|
||
| abstract class QosSubscriberPublishSenderTest extends IntegrationServiceSpecification { |
There was a problem hiding this comment.
@crazyrokr it's for a case if you want to add extra common preparatios for such kind of tests
| try { | ||
| PublishData exist = idToPublishData.putIfAbsent(publishData.id(), publishData); | ||
| if (exist != null) { | ||
| throw new IllegalArgumentException("Publish data:%s already exists".formatted(publishData)); |
There was a problem hiding this comment.
Could we consider adding tests of this logic? It would be good to guard this logic from double call of the store() method
| PublishReceivingService publishReceivingService( | ||
| Collection<? extends MqttPublishInMessageHandler> knownPublishInHandlers) { | ||
| return new DefaultPublishReceivingService(knownPublishInHandlers); | ||
| IncomingPublishRouter IncomingPublishRouter( |
There was a problem hiding this comment.
Looks like there is a typo in the first letter of the method name: it should be a small i instead of a capital I
Issue
#158
Main point
I introduce a new publish data abstraction layer wich allow us to handle long-term live publish data or big publish data to avoid keeping it in memory all the time.
Pull request overview
This PR refactors publish handling around a new
model.publishabstraction and introduces in-memory publish-data storage/routing primitives for the broker’s inbound, outbound, retained, and session-related publish flows. It replaces the older publish handler/service naming with router/processor/dispatcher/sender components and updates tests/configuration to follow the new structure.Changes:
Publish,PublishData, incoming/outgoing variants) and updates network message encoding/decoding to use them.core-service.