@@ -48,7 +48,7 @@ local _M = { _VERSION = '0.1' }
48
48
local mt = { __index = _M }
49
49
_M .SOCKET_TYPE = SOCKET_TYPE
50
50
51
- ffi .cdef [[
51
+ ffi .cdef [[
52
52
typedef struct _zctx_t zctx_t ;
53
53
extern volatile int zctx_interrupted;
54
54
zctx_t * zctx_new (void );
@@ -63,6 +63,8 @@ ffi.cdef[[
63
63
int zstr_send (void * socket , const char * string );
64
64
65
65
int zmq_ctx_destroy (void * context );
66
+
67
+ void zmq_version (int * major , int * minor , int * patch );
66
68
]]
67
69
68
70
@@ -78,7 +80,9 @@ local function check_worker_process(premature)
78
80
ngx .log (ngx .ERR , " failed to create timer to check worker process: " , err )
79
81
end
80
82
else
81
- ngx .log (ngx .INFO , " Terminating ZMQ context due to worker termination ..." )
83
+ local ngx_worker_pid_msg = " worker pid=" .. tostring (ngx .worker .pid ())
84
+
85
+ ngx .log (ngx .INFO , " Terminating ZMQ context due to worker termination; " , ngx_worker_pid_msg )
82
86
-- this should be called when the worker is stopped
83
87
zmqlib .zmq_ctx_destroy (ctx )
84
88
end
@@ -94,23 +98,73 @@ function _M.new(self)
94
98
end
95
99
96
100
function _M .connect (self , socket_type , socket_address )
97
- if ( socket_type == nil ) then
98
- error (" Socket type must be provided." )
101
+ if (socket_type == nil ) then
102
+ error (" Socket type must be provided." )
99
103
end
100
- if ( socket_address == nil ) then
104
+
105
+ if (socket_address == nil ) then
101
106
error (" Socket address must be provided." )
102
107
end
108
+
109
+ local intPtr = ffi .typeof (" int[1]" )
110
+ local zmq_version_major_holder = intPtr ()
111
+ local zmq_version_minor_holder = intPtr ()
112
+ local zmq_version_patch_holder = intPtr ()
113
+
114
+ czmq .zmq_version (zmq_version_major_holder , zmq_version_minor_holder , zmq_version_patch_holder )
115
+
116
+ -- Dereference the pointers
117
+ -- noinspection ArrayElementZero
118
+ local major = tostring (zmq_version_major_holder [0 ]);
119
+ -- noinspection ArrayElementZero
120
+ local minor = tostring (zmq_version_minor_holder [0 ]);
121
+ -- noinspection ArrayElementZero
122
+ local patch = tostring (zmq_version_patch_holder [0 ]);
123
+
124
+ ngx .log (ngx .INFO , " Using ZeroMQ " , major , " ." , minor , " ." , patch )
125
+
103
126
self .socketInst = czmq .zsocket_new (ctx , socket_type )
104
- local socket_bound = czmq .zsocket_connect (self .socketInst , socket_address )
127
+
128
+ local ngx_worker_pid_msg = " worker pid=" .. tostring (ngx .worker .pid ())
129
+ local socket_address_msg = " socket_address=" .. socket_address
130
+ local pid_and_address_msg = ngx_worker_pid_msg .. " , " .. socket_address_msg
131
+
132
+ if (self .socketInst == nil ) then
133
+ error (" Socket could not be created; " .. pid_and_address_msg )
134
+ end
135
+
136
+ local zsocket_connect_result = czmq .zsocket_connect (self .socketInst , socket_address )
137
+ local zsocket_connect_result_msg = " zsocket_connect result=" .. tostring (zsocket_connect_result )
138
+ local pid_and_result_msg = ngx_worker_pid_msg .. " , " .. zsocket_connect_result_msg
139
+
140
+ if (zsocket_connect_result == 0 ) then
141
+ ngx .log (ngx .INFO , " Connected socket; " , pid_and_result_msg )
142
+ elseif (zsocket_connect_result == - 1 ) then
143
+ ngx .log (ngx .ERR , " Could not connect socket; " , pid_and_result_msg )
144
+ else
145
+ error (" Unexpected result while attempting to connect to [" .. socket_address_msg .. " ]; " .. pid_and_result_msg )
146
+ end
105
147
end
106
148
107
149
function _M .log (self , msg )
108
- local send_result = " NOT-SENT"
109
- if ( msg ~= nil and # msg > 0 ) then
110
- send_result = czmq .zstr_send (self .socketInst , msg )
150
+ local ngx_worker_pid_msg = " worker pid=" .. tostring (ngx .worker .pid ())
151
+
152
+ if (msg == nil or # msg == 0 ) then
153
+ ngx .log (ngx .WARN , " Nothing to send; " , ngx_worker_pid_msg )
154
+ return
111
155
end
112
156
113
- ngx .log (ngx .DEBUG , " Message [" , tostring (msg ), " ], sent with result=" , tostring (send_result ), " , from pid=" , ngx .worker .pid () )
157
+ local zstr_send_result = czmq .zstr_send (self .socketInst , msg )
158
+ local zstr_send_result_msg = " zstr_send result=" .. tostring (zstr_send_result )
159
+ local pid_and_result_msg = ngx_worker_pid_msg .. " , " .. zstr_send_result_msg
160
+
161
+ if (zstr_send_result == 0 ) then
162
+ ngx .log (ngx .DEBUG , " Message [" , msg , " ] has been sent; " , pid_and_result_msg )
163
+ elseif (zstr_send_result == - 1 ) then
164
+ ngx .log (ngx .ERR , " Message [" , msg , " ] could not be sent; " , pid_and_result_msg )
165
+ else
166
+ error (" Unexpected result while attempting to send message [" .. msg .. " ]; " .. pid_and_result_msg )
167
+ end
114
168
end
115
169
116
170
function _M .disconnect (self )
122
176
-- wait for the connection to complete then send the message, otherwise it is not received
123
177
-- ngx.sleep(0.100)
124
178
125
- return _M
179
+ return _M
0 commit comments