Skip to content

Conversation

kickster97
Copy link
Member

@kickster97 kickster97 commented Aug 19, 2025

WHAT is this pull request doing?

Allow AMQP queues and exchanges bind to MQTT exchanges and session and vice versa.

Today the mqtt exchange is an internal exchange, we can consider to not have it this way

Note that AMQP queues will not receive retained messages on bind("subscribe"). But it will receive messages as usual when published.

Fixes #1136

HOW can this pull request be tested?

Run specs

Copy link

claude bot commented Aug 19, 2025

Claude finished @kickster97's task —— View job


Pull Request Review

I've completed a comprehensive review of this PR that implements MQTT/AMQP cross-protocol binding functionality. Here are my findings and recommendations:

Todo List

  • Read and analyze the main changed files
  • Review code quality and best practices
  • Check for potential bugs and issues
  • Assess performance considerations
  • Review security concerns
  • Evaluate test coverage
  • Provide comprehensive feedback

Overall Assessment

This PR successfully implements cross-protocol binding between MQTT and AMQP, allowing AMQP queues/exchanges to bind to MQTT exchanges and vice versa. The implementation is architecturally sound but has several areas that need attention.

Code Quality & Best Practices

Strengths:

  • Well-structured type system with union types for different destination types
  • Good separation of concerns with protocol transformation methods
  • Comprehensive test coverage for various cross-protocol scenarios
  • Proper use of Crystal's type system and pattern matching

⚠️ Areas for Improvement:

1. Code Duplication (High Priority)
The mqtt/exchange.cr:79-112 section contains repetitive code blocks for handling different destination types. Consider extracting common logic:

# Lines 75-79, 87-91, 98-102, 110-112 all follow the same pattern
private def handle_destination_publish(destination, msg, count, msg_body_io, queues_or_exchanges)
  if destination.publish(msg) || queues_or_exchanges.add?(destination)
    count += 1
    msg.body_io.rewind
    queues_or_exchanges.add(destination) if queues_or_exchanges.is_a?(Set(LavinMQ::Queue))
  end
end

2. Debug Statement (src/lavinmq/amqp/exchange/exchange.cr)
Remove the debug print statement as noted in the review comments.

Performance Considerations

Critical Issues:

1. Memory Allocation in Hot Path (mqtt/exchange.cr:50)

def publish(packet : MQTT::Publish, queues : Set(LavinMQ::Queue), exchanges : Set(LavinMQ::Exchange)) : UInt32

Creating new sets on every publish will severely impact performance. The current approach of passing sets from the client level is correct to avoid this allocation.

2. Missing Return Value Handling
Several publish calls don't properly accumulate the count return value:

  • Line 75: destination.publish(amqp_msg) - should increment count
  • Line 105-110: Exchange publish calls should handle return counts properly

⚠️ Moderate Concerns:

  • The find_queues_internal method is only used once but adds complexity. Consider inlining unless future usage is planned.

Security Concerns

Good:

  • Proper permission checks in HTTP controller
  • Internal exchange protection maintained (mqtt.default exception)
  • Type safety preserved throughout

⚠️ Minor:

  • Hard-coded string "mqtt.default" in src/lavinmq/http/controller/bindings.cr:140 should be extracted to a constant or method for maintainability.

Architectural Concerns

Design Issues:

1. Tight Coupling
The current design requires @queues and @exchanges sets to be managed at the client level, creating tight coupling between clients and exchanges. While the PR author explains this is needed for thread safety, this makes the code harder to understand and maintain.

Recommendation: Consider a different approach such as:

  • Thread-local storage for these sets
  • Connection-scoped context objects
  • Pool of reusable set objects

2. Protocol Transformation
The transformation between MQTT topics and AMQP routing keys (mqtt_topic_to_amqp_routing_key) is simplistic:

mqtt_topic.gsub('/', '.').gsub('+', '*')

This may not handle all edge cases. Consider more robust transformation logic or validation.

Test Coverage

Excellent Coverage:

  • Cross-protocol binding scenarios
  • QoS handling and arguments
  • Topic pattern matching
  • Multiple queue binding with different QoS
  • Unbinding operations
  • MQTT wildcard patterns

The test suite demonstrates thorough understanding of the requirements and edge cases.

Bugs Found

  1. Missing count increments in mqtt/exchange.cr lines 75, 105-110
  2. Inconsistent return value handling for exchange publish operations
  3. Debug statement needs removal

Recommendations

High Priority:

  1. Fix the memory allocation issue by ensuring sets are reused properly
  2. Fix missing count increments for proper message routing statistics
  3. Remove debug statements

Medium Priority:

  1. Reduce code duplication in publish method
  2. Extract hard-coded strings to constants
  3. Consider architectural improvements for the queues/exchanges set management

Low Priority:

  1. Inline find_queues_internal if not used elsewhere
  2. Improve protocol transformation robustness

Conclusion

This is a valuable feature that successfully implements cross-protocol binding. The core functionality works well, but performance optimizations and code quality improvements are needed before merging. The memory allocation issue in the hot path is the most critical concern that must be addressed.

The implementation demonstrates good understanding of both MQTT and AMQP protocols and their integration challenges. With the suggested fixes, this will be a solid addition to LavinMQ's capabilities.

@kickster97 kickster97 changed the title Allow cross protocol binding Allow MQTT/AMQP cross protocol binding Aug 19, 2025
@kickster97 kickster97 force-pushed the mqtt-to-amqp-binding branch from 6958bdd to b0dd5bd Compare August 19, 2025 08:42
Copy link
Member

@carlhoerberg carlhoerberg left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool feature! But allocations in the hot path is no good

@kickster97 kickster97 force-pushed the mqtt-to-amqp-binding branch from 2a74f01 to 5260eb4 Compare August 25, 2025 13:58
@kickster97 kickster97 marked this pull request as ready for review August 27, 2025 08:07
@kickster97 kickster97 requested a review from a team as a code owner August 27, 2025 08:07
@kickster97 kickster97 force-pushed the mqtt-to-amqp-binding branch from 4e808b7 to 46c14d6 Compare August 27, 2025 08:27
@snichme
Copy link
Member

snichme commented Aug 28, 2025

I had some ideas that I wrote down in another PR: #1224

Mainly it was setting up @queues and @exchanges in Exchange class instead of sending them from the client all the way down. But this might break some cases, not sure.

Also one more suggestion, not use the find_queues_internal as it's only used one time and it will make the diff for the PR larger. I the method is used more than one time we can extract it to a method.

WDYT?

@kickster97
Copy link
Member Author

kickster97 commented Sep 3, 2025

I had some ideas that I wrote down in another PR: #1224

Mainly it was setting up @queues and @exchanges in Exchange class instead of sending them from the client all the way down. But this might break some cases, not sure.

Also one more suggestion, not use the find_queues_internal as it's only used one time and it will make the diff for the PR larger. I the method is used more than one time we can extract it to a method.

WDYT?

As we discussed, the find_queues_internal method is needed for the MQTT exchange to work together with the SubscriptionTree.

@queues and @exchanges need to be created at the client level, so they are isolated to that client, otherwise multiple clients publishing simultaneously would share the same sets and there would be modification of same sets from different threads.

Copy link
Member

@snichme snichme left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I now understand why we setup @queues and @exchanges in the client, but I dont like it. It makes it really hard to understand and we are tying client together with exchanges.
I think we should find a better solution for this, even if it requires a bigger overhaul of the discovery strategy in exchanges. If we don't, I think it will get more and more difficult as the code grows with more features.

destination.publish(msg, false, queues, exchanges)
msg.body_io.rewind
end
in LavinMQ::Exchange
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the difference between LavinMQ::Exchange and LavinMQ::AMQP::Exchange?

also count here as well.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this part is a wip now, found some bugs yesterday so needed to rewrite this. the difference is that LavinMQ::Exchange counts for both amqp and mqtt exchanges. but i need better code overall in this case section

Comment on lines +75 to +79
if destination.publish(amqp_msg)
count += 1
amqp_msg.body_io.rewind
queues.add(destination)
end
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These four lines are repeated three times, can we restructure to reduce repeating?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above, need to refactor this, just needed the case to work as expected for a while so we had a build that worked.

elsif source.name.empty? || destination.name.empty?
access_refused(context, "Not allowed to bind to the default exchange")
elsif destination.internal?
elsif destination.internal? && destination.name != "mqtt.default"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is mqtt.default an internal or default exchange? the if above checks for default exchange and based on the name this is a default exchange.

Is there a way to not have the string "mqtt.default" here but rather some method on destination to check wether we can bind/unbind to the change? Less hard coded strings is my goal here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, i agree this is unclear. Its been a "good enough" solution up untill now. I think we should have it as a default exchange and let it behave more like an amq.topic exchange. @antondalgren lifts a good issue about bindings not persiting for mqtt exchange bound to amqp exchange/queues. this is because we onyl built it to support the mqtt use case.

tldr; we need to refactor the exchange's place in lavinmq :)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but new features like this might be less prioritized next week/2 weeks to prioritize squashing bugs we found in 2.4.0

@kickster97 kickster97 marked this pull request as draft September 10, 2025 09:35
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Support cross protocol exchange to exchange binding
3 participants