22
33use Exception ;
44use Mookofe \Tail \BaseOptions ;
5- use PhpAmqpLib \Connection \AMQPConnection ;
5+ use PhpAmqpLib \Connection \AMQPSSLConnection ;
66
77/**
88 * Connection class, used to manage connection to the RabbitMQ Server
@@ -46,6 +46,41 @@ class Connection extends BaseOptions{
4646 */
4747 public $ consumer_tag ;
4848
49+ /**
50+ * RabbitMQ connection SSL context options
51+ *
52+ * @var array
53+ */
54+ public $ ssl_context_options ;
55+
56+ /**
57+ * RabbitMQ connection timeout in seconds
58+ *
59+ * @var float
60+ */
61+ public $ connection_timeout ;
62+
63+ /**
64+ * RabbitMQ connection read/write timeout in seconds
65+ *
66+ * @var float
67+ */
68+ public $ read_write_timeout ;
69+
70+ /**
71+ * RabbitMQ connection keepalive flag
72+ *
73+ * @var bool
74+ */
75+ public $ keepalive ;
76+
77+ /**
78+ * RabbitMQ connection heartbeat in seconds
79+ *
80+ * @var int
81+ */
82+ public $ heartbeat ;
83+
4984 /**
5085 * RabbitMQ AMQP Connection
5186 *
@@ -69,7 +104,19 @@ class Connection extends BaseOptions{
69104 */
70105 public function __construct (array $ options = null )
71106 {
72- $ this ->allowedOptions = array_merge ($ this ->allowedOptions , array ('host ' , 'port ' , 'username ' , 'password ' , 'consumer_tag ' ));
107+ $ this ->allowedOptions = array_merge ($ this ->allowedOptions , array (
108+ 'host ' ,
109+ 'port ' ,
110+ 'username ' ,
111+ 'password ' ,
112+ 'consumer_tag ' ,
113+ 'ssl_context_options ' ,
114+ 'connection_timeout ' ,
115+ 'read_write_timeout ' ,
116+ 'keepalive ' ,
117+ 'heartbeat '
118+ )
119+ );
73120
74121 if (!$ options )
75122 $ options = $ this ->buildConnectionOptions ();
@@ -86,7 +133,21 @@ public function open()
86133 {
87134 try
88135 {
89- $ this ->AMQPConnection = new AMQPConnection ($ this ->host , $ this ->port , $ this ->username , $ this ->password , $ this ->vhost );
136+ $ additionalConnectionOptions = array ();
137+ foreach (array ('connection_timeout ' , 'read_write_timeout ' , 'keepalive ' , 'heartbeat ' ) as $ option ) {
138+ if (isset ($ this ->$ option )) {
139+ $ additionalConnectionOptions [$ option ] = $ this ->$ option ;
140+ }
141+ }
142+ $ this ->AMQPConnection = new AMQPSSLConnection (
143+ $ this ->host ,
144+ $ this ->port ,
145+ $ this ->username ,
146+ $ this ->password ,
147+ $ this ->vhost ,
148+ $ this ->ssl_context_options ,
149+ $ additionalConnectionOptions
150+ );
90151 $ this ->channel = $ this ->AMQPConnection ->channel ();
91152 $ this ->channel ->queue_declare ($ this ->queue_name , false , false , false , false );
92153 $ this ->channel ->exchange_declare ($ this ->exchange , $ this ->exchange_type , false , true , false );
0 commit comments