|
29 | 29 | amqp10_client_connection:connection_config()) -> |
30 | 30 | {ok, socket()} | {error, any()}. |
31 | 31 | connect(Host, Port, #{ws_path := Path} = Opts) -> |
| 32 | + GunOpts0 = maps:get(ws_opts, Opts, #{}), |
| 33 | + HTTP2Opts = maps:get(http2_opts, GunOpts0, #{}), |
| 34 | + GunOpts1 = GunOpts0#{http2_opts => HTTP2Opts#{notify_settings_changed => true}}, |
32 | 35 | GunOpts = maps:merge(#{tcp_opts => [{nodelay, true}]}, |
33 | | - maps:get(ws_opts, Opts, #{})), |
| 36 | + GunOpts1), |
34 | 37 | maybe |
35 | 38 | {ok, _Started} ?= application:ensure_all_started(gun), |
36 | 39 | {ok, Pid} ?= gun:open(Host, Port, GunOpts), |
37 | 40 | MRef = monitor(process, Pid), |
38 | | - {ok, _HttpVsn} ?= gun:await_up(Pid, MRef), |
| 41 | + {ok, HttpVsn} ?= gun:await_up(Pid, MRef), |
| 42 | + ok ?= case HttpVsn of |
| 43 | + http -> |
| 44 | + ok; |
| 45 | + http2 -> |
| 46 | + receive |
| 47 | + {gun_notify, Pid, settings_changed, #{enable_connect_protocol := true}} -> |
| 48 | + ok |
| 49 | + after 5000 -> |
| 50 | + {error, {ws_enable_connect_protocol, timeout}} |
| 51 | + end |
| 52 | + end, |
39 | 53 | {ok, StreamRef} ?= ws_upgrade(Pid, Path), |
40 | 54 | {ok, {ws, Pid, StreamRef}} |
41 | 55 | end; |
@@ -97,5 +111,6 @@ close({tcp, Socket}) -> |
97 | 111 | gen_tcp:close(Socket); |
98 | 112 | close({ssl, Socket}) -> |
99 | 113 | ssl:close(Socket); |
100 | | -close({ws, Pid, _Ref}) -> |
| 114 | +close({ws, Pid, Ref}) -> |
| 115 | + gun:ws_send(Pid, Ref, close), |
101 | 116 | gun:shutdown(Pid). |
0 commit comments