88-module (rabbit_mqtt_processor ).
99
1010
11- -export ([info /2 , initial_state /2 ,
12- process_frame /2 , send_will /1 ,
11+ -export ([info /2 , initial_state /2 , initial_state / 4 ,
12+ process_frame /2 , serialise / 2 , send_will /1 ,
1313 terminate /1 , handle_pre_hibernate /0 ,
1414 handle_ra_event /2 , handle_down /2 , handle_queue_event /2 ]).
1515
3535-define (CONSUMER_TAG , mqtt ).
3636
3737initial_state (Socket , ConnectionName ) ->
38- SSLLoginName = ssl_login_name (Socket ),
3938 {ok , {PeerAddr , _PeerPort }} = rabbit_net :peername (Socket ),
40- {ok , {mqtt2amqp_fun , M2A }, {amqp2mqtt_fun , A2M }} =
41- rabbit_mqtt_util :get_topic_translation_funs (),
39+ initial_state (Socket ,
40+ ConnectionName ,
41+ fun serialise_and_send_to_client /2 ,
42+ PeerAddr ).
43+
44+ initial_state (Socket , ConnectionName , SendFun , PeerAddr ) ->
45+ {ok , {mqtt2amqp_fun , M2A }, {amqp2mqtt_fun , A2M }} = rabbit_mqtt_util :get_topic_translation_funs (),
4246 # proc_state {socket = Socket ,
4347 conn_name = ConnectionName ,
44- ssl_login_name = SSLLoginName ,
48+ ssl_login_name = ssl_login_name ( Socket ) ,
4549 peer_addr = PeerAddr ,
50+ send_fun = SendFun ,
4651 mqtt2amqp_fun = M2A ,
4752 amqp2mqtt_fun = A2M }.
4853
@@ -139,7 +144,8 @@ process_request(?SUBSCRIBE,
139144 message_id = SubscribeMsgId ,
140145 topic_table = Topics },
141146 payload = undefined },
142- # proc_state {retainer_pid = RPid } = PState0 ) ->
147+ # proc_state {send_fun = SendFun ,
148+ retainer_pid = RPid } = PState0 ) ->
143149 rabbit_log_connection :debug (" Received a SUBSCRIBE for topic(s) ~p " , [Topics ]),
144150 {QosResponse , PState1 } =
145151 lists :foldl (fun (_Topic , {[? SUBACK_FAILURE | _ ] = L , S }) ->
@@ -177,7 +183,7 @@ process_request(?SUBSCRIBE,
177183 {[? SUBACK_FAILURE | L ], S0 }
178184 end
179185 end , {[], PState0 }, Topics ),
180- serialise_and_send_to_client (
186+ SendFun (
181187 # mqtt_frame {fixed = # mqtt_frame_fixed {type = ? SUBACK },
182188 variable = # mqtt_frame_suback {
183189 message_id = SubscribeMsgId ,
@@ -198,7 +204,7 @@ process_request(?UNSUBSCRIBE,
198204 # mqtt_frame {variable = # mqtt_frame_subscribe {message_id = MessageId ,
199205 topic_table = Topics },
200206 payload = undefined },
201- PState0 ) ->
207+ PState0 = # proc_state { send_fun = SendFun } ) ->
202208 rabbit_log_connection :debug (" Received an UNSUBSCRIBE for topic(s) ~p " , [Topics ]),
203209 PState = lists :foldl (
204210 fun (# mqtt_topic {name = TopicName },
@@ -218,15 +224,15 @@ process_request(?UNSUBSCRIBE,
218224 S0
219225 end
220226 end , PState0 , Topics ),
221- serialise_and_send_to_client (
227+ SendFun (
222228 # mqtt_frame {fixed = # mqtt_frame_fixed {type = ? UNSUBACK },
223229 variable = # mqtt_frame_suback {message_id = MessageId }},
224230 PState ),
225231 {ok , PState };
226232
227- process_request (? PINGREQ , # mqtt_frame {}, PState ) ->
233+ process_request (? PINGREQ , # mqtt_frame {}, PState = # proc_state { send_fun = SendFun } ) ->
228234 rabbit_log_connection :debug (" Received a PINGREQ" ),
229- serialise_and_send_to_client (
235+ SendFun (
230236 # mqtt_frame {fixed = # mqtt_frame_fixed {type = ? PINGRESP }},
231237 PState ),
232238 rabbit_log_connection :debug (" Sent a PINGRESP" ),
@@ -243,7 +249,7 @@ process_connect(#mqtt_frame{
243249 clean_sess = CleanSess ,
244250 client_id = ClientId ,
245251 keep_alive = Keepalive } = FrameConnect },
246- PState0 ) ->
252+ PState0 = # proc_state { send_fun = SendFun } ) ->
247253 rabbit_log_connection :debug (" Received a CONNECT, client ID: ~s , username: ~s , "
248254 " clean session: ~s , protocol version: ~p , keepalive: ~p " ,
249255 [ClientId , Username , CleanSess , ProtoVersion , Keepalive ]),
@@ -265,7 +271,7 @@ process_connect(#mqtt_frame{
265271 variable = # mqtt_frame_connack {
266272 session_present = SessionPresent ,
267273 return_code = ReturnCode }},
268- serialise_and_send_to_client (ResponseFrame , PState ),
274+ SendFun (ResponseFrame , PState ),
269275 return_connack (ReturnCode , PState ).
270276
271277client_id ([]) ->
@@ -489,7 +495,8 @@ hand_off_to_retainer(RetainerPid, Amqp2MqttFun, Topic0, Msg) ->
489495
490496maybe_send_retained_message (RPid , # mqtt_topic {name = Topic0 , qos = SubscribeQos },
491497 # proc_state {amqp2mqtt_fun = Amqp2MqttFun ,
492- packet_id = PacketId0 } = PState0 ) ->
498+ packet_id = PacketId0 ,
499+ send_fun = SendFun } = PState0 ) ->
493500 Topic1 = Amqp2MqttFun (Topic0 ),
494501 case rabbit_mqtt_retainer :fetch (RPid , Topic1 ) of
495502 undefined ->
@@ -502,7 +509,7 @@ maybe_send_retained_message(RPid, #mqtt_topic{name = Topic0, qos = SubscribeQos}
502509 ? QOS_1 ->
503510 {PacketId0 , PState0 # proc_state {packet_id = increment_packet_id (PacketId0 )}}
504511 end ,
505- serialise_and_send_to_client (
512+ SendFun (
506513 # mqtt_frame {fixed = # mqtt_frame_fixed {
507514 type = ? PUBLISH ,
508515 qos = Qos ,
@@ -1123,9 +1130,9 @@ send_puback(MsgIds, PState)
11231130 lists :foreach (fun (Id ) ->
11241131 send_puback (Id , PState )
11251132 end , MsgIds );
1126- send_puback (MsgId , PState ) ->
1133+ send_puback (MsgId , PState = # proc_state { send_fun = SendFun } ) ->
11271134 rabbit_global_counters :messages_confirmed (mqtt , 1 ),
1128- serialise_and_send_to_client (
1135+ SendFun (
11291136 # mqtt_frame {fixed = # mqtt_frame_fixed {type = ? PUBACK },
11301137 variable = # mqtt_frame_publish {message_id = MsgId }},
11311138 PState ).
@@ -1141,6 +1148,9 @@ serialise_and_send_to_client(Frame, #proc_state{proto_ver = ProtoVer, socket = S
11411148 [Sock , Error , Frame ])
11421149 end .
11431150
1151+ serialise (Frame , # proc_state {proto_ver = ProtoVer }) ->
1152+ rabbit_mqtt_frame :serialise (Frame , ProtoVer ).
1153+
11441154terminate (# proc_state {client_id = undefined }) ->
11451155 ok ;
11461156terminate (# proc_state {client_id = ClientId }) ->
@@ -1273,7 +1283,8 @@ maybe_publish_to_client(
12731283 # basic_message {
12741284 routing_keys = [RoutingKey | _CcRoutes ],
12751285 content = # content {payload_fragments_rev = FragmentsRev }}},
1276- QoS , PState0 = # proc_state {amqp2mqtt_fun = Amqp2MqttFun }) ->
1286+ QoS , PState0 = # proc_state {amqp2mqtt_fun = Amqp2MqttFun ,
1287+ send_fun = SendFun }) ->
12771288 {PacketId , PState } = queue_message_id_to_packet_id (QMsgId , QoS , PState0 ),
12781289 % %TODO support iolists when sending to client
12791290 Payload = list_to_binary (lists :reverse (FragmentsRev )),
@@ -1293,7 +1304,7 @@ maybe_publish_to_client(
12931304 message_id = PacketId ,
12941305 topic_name = Amqp2MqttFun (RoutingKey )},
12951306 payload = Payload },
1296- serialise_and_send_to_client (Frame , PState ),
1307+ SendFun (Frame , PState ),
12971308 PState .
12981309
12991310queue_message_id_to_packet_id (_ , ? QOS_0 , PState ) ->
0 commit comments