Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions src/SuperSocket.MQTT.Server/Command/DISCONNECT.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using SuperSocket.Command;
using SuperSocket.MQTT.Packets;
using SuperSocket.Server.Abstractions.Session;

namespace SuperSocket.MQTT.Server.Command
{
/// <summary>
/// Handles DISCONNECT control packets from clients.
/// When a client sends DISCONNECT, the server should close the connection cleanly.
/// </summary>
[Command(Key = ControlPacketType.DISCONNECT)]
public class DISCONNECT : IAsyncCommand<MQTTPacket>
{
public async ValueTask ExecuteAsync(IAppSession session, MQTTPacket package, CancellationToken cancellationToken)
{
// Close the session gracefully when client sends DISCONNECT
// The session cleanup (unsubscribing from topics) is handled by TopicMiddleware.UnRegisterSession
await session.CloseAsync(SuperSocket.Connection.CloseReason.RemoteClosing);
}
}
}
8 changes: 6 additions & 2 deletions src/SuperSocket.MQTT.Server/Command/PUBREC.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@

namespace SuperSocket.MQTT.Server.Command
{
/// <summary>
/// Handles PUBREC (Publish Received) control packets for QoS 2 publish flow.
/// When server receives PUBREC, it responds with PUBREL (Publish Release).
/// </summary>
[Command(Key = ControlPacketType.PUBREC)]
public class PUBREC : IAsyncCommand<MQTTPacket>
{
Expand All @@ -17,10 +21,10 @@ public async ValueTask ExecuteAsync(IAppSession session, MQTTPacket package, Can
{
var pubRecPacket = package as PubRecPacket;

// Create a response with the same packet identifier
// Respond with PUBREL (Publish Release) - fixed header byte: 0x62 (type 6 with reserved bits 0010)
var buffer = _memoryPool.Rent(4);

buffer[0] = 0x50; // PUBREC packet type
buffer[0] = 0x62; // PUBREL packet type (0110 0010)
buffer[1] = 2; // Remaining length
buffer[2] = (byte)(pubRecPacket.PacketIdentifier >> 8);
buffer[3] = (byte)(pubRecPacket.PacketIdentifier & 0xFF);
Expand Down
8 changes: 6 additions & 2 deletions src/SuperSocket.MQTT.Server/Command/PUBREL.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@

namespace SuperSocket.MQTT.Server.Command
{
/// <summary>
/// Handles PUBREL (Publish Release) control packets for QoS 2 publish flow.
/// When server receives PUBREL, it responds with PUBCOMP (Publish Complete).
/// </summary>
[Command(Key = ControlPacketType.PUBREL)]
public class PUBREL : IAsyncCommand<MQTTPacket>
{
Expand All @@ -17,10 +21,10 @@ public async ValueTask ExecuteAsync(IAppSession session, MQTTPacket package, Can
{
var pubRelPacket = package as PubRelPacket;

// Create a response with the same packet identifier
// Respond with PUBCOMP (Publish Complete) - fixed header byte: 0x70
var buffer = _memoryPool.Rent(4);

buffer[0] = 0x60; // PUBREL packet type
buffer[0] = 0x70; // PUBCOMP packet type
buffer[1] = 2; // Remaining length
buffer[2] = (byte)(pubRelPacket.PacketIdentifier >> 8);
buffer[3] = (byte)(pubRelPacket.PacketIdentifier & 0xFF);
Expand Down
24 changes: 16 additions & 8 deletions src/SuperSocket.MQTT.Server/Command/SUBSCRIBE.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,29 +34,37 @@ public async ValueTask ExecuteAsync(IAppSession session, MQTTPacket package, Can
_topicManager.SubscribeTopic(mqttSession, topicFilter.Topic);
}

var buffer = _memoryPool.Rent(5);
// SUBACK: 2 bytes for packet identifier + 1 byte per topic filter for return code
var topicCount = subpacket.TopicFilters.Count;
var responseLength = 2 + topicCount;
var buffer = _memoryPool.Rent(2 + responseLength);

WriteBuffer(buffer, subpacket);
WriteBuffer(buffer, subpacket, responseLength);

try
{
await session.SendAsync(buffer.AsMemory()[..5]);
await session.SendAsync(buffer.AsMemory()[..(2 + responseLength)]);
}
finally
{
_memoryPool.Return(buffer);
}
}

private void WriteBuffer(byte[] buffer, SubscribePacket packet)
private void WriteBuffer(byte[] buffer, SubscribePacket packet, int remainingLength)
{
buffer[0] = 144;
buffer[1] = 3;
buffer[0] = 144; // SUBACK packet type (0x90)
buffer[1] = (byte)remainingLength;

BinaryPrimitives.WriteUInt16BigEndian(buffer.AsSpan().Slice(2), packet.PacketIdentifier);

// Use the first topic filter's QoS, or default to 0
buffer[4] = packet.TopicFilters.Count > 0 ? packet.TopicFilters[0].QoS : (byte)0;
// Write return code (granted QoS) for each topic filter
for (int i = 0; i < packet.TopicFilters.Count; i++)
{
// Return the granted QoS (same as requested for now)
// Valid values: 0x00 (QoS 0), 0x01 (QoS 1), 0x02 (QoS 2), 0x80 (Failure)
buffer[4 + i] = packet.TopicFilters[i].QoS;
}
}
}
}
Loading