@@ -64,6 +64,10 @@ takeover(Parent, Ref, Socket, Transport, Opts, Buffer, {Handler, HandlerState})
6464 {Handler , HandlerState # state {socket = Sock }}).
6565
6666% % cowboy_websocket
67+ -spec init (Req , any ()) ->
68+ {ok | module (), Req , any ()} |
69+ {module (), Req , any (), any ()}
70+ when Req :: cowboy_req :req ().
6771init (Req , Opts ) ->
6872 {PeerAddr , _PeerPort } = maps :get (peer , Req ),
6973 SockInfo = maps :get (proxy_header , Req , undefined ),
@@ -87,6 +91,9 @@ init(Req, Opts) ->
8791 received_connect_frame = false
8892 }, WsOpts }.
8993
94+ -spec websocket_init (State ) ->
95+ {cowboy_websocket :commands (), State } |
96+ {cowboy_websocket :commands (), State , hibernate }.
9097websocket_init (State0 = # state {socket = Sock , peername = PeerAddr }) ->
9198 ok = file_handle_cache :obtain (),
9299 case rabbit_net :connection_string (Sock , inbound ) of
@@ -102,13 +109,13 @@ websocket_init(State0 = #state{socket = Sock, peername = PeerAddr}) ->
102109 fun send_reply /2 ,
103110 PeerAddr ),
104111 process_flag (trap_exit , true ),
105- {ok ,
112+ {[] ,
106113 rabbit_event :init_stats_timer (
107114 State # state {proc_state = ProcessorState },
108115 # state .stats_timer ),
109116 hibernate };
110- _ ->
111- {stop , State0 }
117+ { error , Reason } ->
118+ {[{ shutdown_reason , Reason }] , State0 }
112119 end .
113120
114121-spec close_connection (pid (), string ()) -> 'ok' .
@@ -118,24 +125,30 @@ close_connection(Pid, Reason) ->
118125 sys :terminate (Pid , Reason ),
119126 ok .
120127
128+ -spec websocket_handle (ping | pong | {text | binary | ping | pong , binary ()}, State ) ->
129+ {cowboy_websocket :commands (), State } |
130+ {cowboy_websocket :commands (), State , hibernate }.
121131websocket_handle ({binary , Data }, State ) ->
122132 handle_data (Data , State );
123133% % Silently ignore ping and pong frames as Cowboy will automatically reply to ping frames.
124134websocket_handle ({Ping , _ }, State )
125135 when Ping =:= ping orelse Ping =:= pong ->
126- {ok , State , hibernate };
136+ {[] , State , hibernate };
127137websocket_handle (Ping , State )
128138 when Ping =:= ping orelse Ping =:= pong ->
129- {ok , State , hibernate };
139+ {[] , State , hibernate };
130140% % Log any other unexpected frames.
131141websocket_handle (Frame , State ) ->
132142 rabbit_log_connection :info (" Web MQTT: unexpected WebSocket frame ~p " ,
133143 [Frame ]),
134144 % %TODO close connection instead?
135145 % %"MQTT Control Packets MUST be sent in WebSocket binary data frames.
136146 % % If any other type of data frame is received the recipient MUST close the Network Connection"
137- {ok , State , hibernate }.
147+ {[] , State , hibernate }.
138148
149+ -spec websocket_info (any (), State ) ->
150+ {cowboy_websocket :commands (), State } |
151+ {cowboy_websocket :commands (), State , hibernate }.
139152websocket_info ({conserve_resources , Conserve }, State ) ->
140153 NewState = State # state {conserve_resources = Conserve },
141154 handle_credits (control_throttle (NewState ));
@@ -144,14 +157,14 @@ websocket_info({bump_credit, Msg}, State) ->
144157 handle_credits (control_throttle (State ));
145158 % %TODO return hibernate?
146159websocket_info ({reply , Data }, State ) ->
147- {reply , {binary , Data }, State , hibernate };
160+ {[ {binary , Data }] , State , hibernate };
148161websocket_info ({'EXIT' , _ , _ }, State ) ->
149162 stop (State );
150163websocket_info ({'$gen_cast' , QueueEvent = {queue_event , _ , _ }},
151164 State = # state {proc_state = PState0 }) ->
152165 case rabbit_mqtt_processor :handle_queue_event (QueueEvent , PState0 ) of
153166 {ok , PState } ->
154- {ok , State # state {proc_state = PState }, hibernate };
167+ {[] , State # state {proc_state = PState }, hibernate };
155168 {error , Reason , PState } ->
156169 rabbit_log_connection :error (" Web MQTT connection ~p failed to handle queue event: ~p " ,
157170 [State # state .conn_name , Reason ]),
@@ -171,7 +184,7 @@ websocket_info({keepalive, Req}, State = #state{keepalive = KState0,
171184 conn_name = ConnName }) ->
172185 case rabbit_mqtt_keepalive :handle (Req , KState0 ) of
173186 {ok , KState } ->
174- {ok , State # state {keepalive = KState }, hibernate };
187+ {[] , State # state {keepalive = KState }, hibernate };
175188 {error , timeout } ->
176189 rabbit_log_connection :error (" keepalive timeout in Web MQTT connection ~p " ,
177190 [ConnName ]),
@@ -182,15 +195,16 @@ websocket_info({keepalive, Req}, State = #state{keepalive = KState0,
182195 stop (State )
183196 end ;
184197websocket_info (emit_stats , State ) ->
185- {ok , emit_stats (State ), hibernate };
198+ {[] , emit_stats (State ), hibernate };
186199websocket_info ({ra_event , _From , Evt },
187200 # state {proc_state = PState0 } = State ) ->
188201 PState = rabbit_mqtt_processor :handle_ra_event (Evt , PState0 ),
189- {ok , State # state {proc_state = PState }, hibernate };
202+ {[] , State # state {proc_state = PState }, hibernate };
190203websocket_info (Msg , State ) ->
191204 rabbit_log_connection :warning (" Web MQTT: unexpected message ~p " , [Msg ]),
192- {ok , State , hibernate }.
205+ {[] , State , hibernate }.
193206
207+ -spec terminate (any (), cowboy_req :req (), any ()) -> ok .
194208terminate (_Reason , _Request ,
195209 # state {conn_name = ConnName ,
196210 proc_state = PState ,
@@ -274,7 +288,7 @@ handle_credits(State0) ->
274288 State = # state {state = running } ->
275289 {[{active , true }], State };
276290 State ->
277- {ok , State }
291+ {[] , State }
278292 end .
279293
280294control_throttle (State = # state {state = CS ,
@@ -297,6 +311,7 @@ send_reply(Frame, PState) ->
297311ensure_stats_timer (State ) ->
298312 rabbit_event :ensure_stats_timer (State , # state .stats_timer , emit_stats ).
299313
314+ % % TODO if #state.stats_timer is undefined, rabbit_event:if_enabled crashes
300315maybe_emit_stats (State ) ->
301316 rabbit_event :if_enabled (State , # state .stats_timer ,
302317 fun () -> emit_stats (State ) end ).
0 commit comments