Skip to content

Commit

Permalink
Update to CZMQ v. 4.0 API
Browse files Browse the repository at this point in the history
Update to CZMQ v. 4.0 API in preparation for Stretch.  Implementation
follows the [zsock manual][1] and [CZMQ 4.0.0 NEWS][2].

Fixes #1152 [3].

[1]: http://czmq.zeromq.org/manual:zsock
[2]: zeromq/czmq@052d2504
[3]: machinekit/machinekit#1152
  • Loading branch information
zultron authored and ArcEye committed Oct 7, 2017
1 parent dbf37ca commit afa2892
Show file tree
Hide file tree
Showing 21 changed files with 121 additions and 135 deletions.
2 changes: 1 addition & 1 deletion debian/control.in
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ Build-Depends: debhelper (>= 6),
libxmu-headers, python (>= 2.6.6-3~), python-dev (>= 2.6.6-3~),
cython (>= 0.19), dh-python,
pkg-config, psmisc, python-tk, libxaw7-dev, libboost-serialization-dev,
libzmq3-dev (>= 4.0.4), libczmq-dev (>= 2.2.0), libjansson-dev (>= 2.5),
libzmq3-dev (>= 4.0.4), libczmq-dev (>= 4.0.0), libjansson-dev (>= 2.5),
libwebsockets-dev (>= 1.2.2),
python-zmq (>= 14.0.1), procps,
liburiparser-dev, libssl-dev, python-setuptools,
Expand Down
2 changes: 1 addition & 1 deletion src/configure.ac
Original file line number Diff line number Diff line change
Expand Up @@ -939,7 +939,7 @@ AC_ARG_ENABLE(dev,
# NML
##############################################################################

PKG_CHECK_MODULES([CZMQ], [libczmq > 2.0],
PKG_CHECK_MODULES([CZMQ], [libczmq > 4.0],
[
AC_DEFINE(HAVE_CZMQ, [], [zeroMQ czmq library available])
USE_CZMQ=yes
Expand Down
25 changes: 11 additions & 14 deletions src/emc/rs274ngc/previewmodule.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,7 @@ using namespace google::protobuf;
#include "czmq.h"
#include "pbutil.hh" // hal/haltalk

static zctx_t *z_context;
static void *z_preview, *z_status; // sockets
static zsock_t *z_preview, *z_status;
static const char *istat_topic = "status";
static int batch_limit = 100;
static const char *p_client = "preview"; //NULL; // single client for now
Expand Down Expand Up @@ -104,9 +103,6 @@ static void preview_end()

static int z_init(void)
{
if (!z_context)
z_context = zctx_new ();

// const char *uri = getenv("PREVIEW_URI");
// if (uri) z_preview_uri = uri;
// uri = getenv("STATUS_URI");
Expand All @@ -120,16 +116,16 @@ static int z_init(void)
GOOGLE_PROTOBUF_VERIFY_VERSION;


z_preview = zsocket_new (z_context, ZMQ_XPUB);
z_preview = zsock_new (ZMQ_XPUB);
#if 0
rc = zsocket_bind(z_preview, z_preview_uri);
rc = zsock_bind(z_preview, z_preview_uri);
assert (rc != 0);
#endif

z_status = zsocket_new (z_context, ZMQ_XPUB);
z_status = zsock_new (ZMQ_XPUB);
assert(z_status);
#if 0
rc = zsocket_bind(z_status, z_status_uri);
rc = zsock_bind(z_status, z_status_uri);
assert (rc != 0);

#endif
Expand All @@ -149,7 +145,8 @@ static void z_shutdown(void)
fprintf(stderr, "preview: %zu containers %zu preview msgs %zu bytes avg=%zu bytes/container\n",
n_containers, n_messages, n_bytes, n_bytes/n_containers);
}
zctx_destroy(&z_context);
zsock_destroy(&z_preview);
zsock_destroy(&z_status);
}

char _parameter_file_name[LINELEN];
Expand Down Expand Up @@ -1306,13 +1303,13 @@ static PyObject *bind_sockets(PyObject *self, PyObject *args) {
if(!PyArg_ParseTuple(args, "ss", &preview_uri, &status_uri))
return NULL;
int rc;
rc = zsocket_bind(z_preview, "%s", preview_uri);
rc = zsock_bind(z_preview, "%s", preview_uri);
if(!rc) {
PyErr_Format(PyExc_RuntimeError,
"binding preview socket to '%s' failed", preview_uri);
return NULL;
}
rc = zsocket_bind(z_status, "%s", status_uri);
rc = zsock_bind(z_status, "%s", status_uri);
if(!rc) {
PyErr_Format(PyExc_RuntimeError,
"binding status socket to '%s' failed", status_uri);
Expand All @@ -1321,8 +1318,8 @@ static PyObject *bind_sockets(PyObject *self, PyObject *args) {
// usleep(300 *1000); // avoid slow joiner syndrome

return Py_BuildValue("(ss)",
zsocket_last_endpoint(z_preview),
zsocket_last_endpoint(z_status));
zsock_last_endpoint(z_preview),
zsock_last_endpoint(z_status));
}

static PyMethodDef gcode_methods[] = {
Expand Down
15 changes: 6 additions & 9 deletions src/hal/utils/halcmd_rtapiapp.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@ using namespace google::protobuf;

static machinetalk::Container command, reply;

static zctx_t *z_context;
static void *z_command;
static zsock_t *z_command;
static int timeout = 5000;
static std::string errormsg;
int proto_debug;
Expand Down Expand Up @@ -279,22 +278,20 @@ int rtapi_connect(int instance, char *uri, const char *svc_uuid)
}
#endif

z_context = zctx_new ();
assert(z_context);
z_command = zsocket_new (z_context, ZMQ_DEALER);
z_command = zsock_new (ZMQ_DEALER);
assert(z_command);

char z_ident[30];
snprintf(z_ident, sizeof(z_ident), "halcmd%d",getpid());

zsocket_set_identity(z_command, z_ident);
zsocket_set_linger(z_command, 0);
zsock_set_identity(z_command, z_ident);
zsock_set_linger(z_command, 0);

if (zsocket_connect(z_command, "%s", uri)) {
if (zsock_connect(z_command, "%s", uri)) {
perror("connect");
return -EINVAL;
}
zsocket_set_rcvtimeo (z_command, timeout * ZMQ_POLL_MSEC);
zsock_set_rcvtimeo (z_command, timeout * ZMQ_POLL_MSEC);

return rtapi_ping(instance);
}
8 changes: 4 additions & 4 deletions src/machinetalk/haltalk/haltalk_bridge.cc
Original file line number Diff line number Diff line change
Expand Up @@ -142,11 +142,11 @@ static int bridge_fsm(htself_t *self, bridgeevent_t event)
case BSTATE_RETRY_PROBE:

case BSTATE_CONNECT:
bridge->z_bridge = zsocket_new (self->z_context, ZMQ_XSUB);
retval = zsocket_connect(bridge->z_bridge, self->cfg->bridgecomp_updateuri);
bridge->z_bridge = zsock_new (self->z_context, ZMQ_XSUB);
retval = zsock_connect(bridge->z_bridge, self->cfg->bridgecomp_updateuri);
assert (retval == 0);
bridge->z_bridge_cmd = zsocket_new (self->z_context, ZMQ_DEALER);
retval = zsocket_connect(bridge->z_bridge_cmd, self->cfg->bridgecomp_cmduri);
bridge->z_bridge_cmd = zsock_new (self->z_context, ZMQ_DEALER);
retval = zsock_connect(bridge->z_bridge_cmd, self->cfg->bridgecomp_cmduri);
assert (retval == 0);
break;

Expand Down
29 changes: 13 additions & 16 deletions src/machinetalk/haltalk/haltalk_main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -153,16 +153,13 @@ zmq_init(htself_t *self)
assert(self->signal_fd > -1);
}

// suppress default handling of signals in zctx_new()
// suppress default handling of signals in zsock_new()
// since we're using signalfd()
// must happen before zctx_new()
// must happen before zsock_new()
zsys_handler_set(NULL);

mk_netopts_t *np = &self->netopts;

np->z_context = zctx_new ();
assert(np->z_context);

np->z_loop = zloop_new();
assert (np->z_loop);

Expand All @@ -175,10 +172,10 @@ zmq_init(htself_t *self)
mk_socket_t *ms = &self->mksock[SVC_HALGROUP];
ms->dnssd_subtype = HALGROUP_DNSSD_SUBTYPE;
ms->tag = "halgroup";
ms->socket = zsocket_new (self->netopts.z_context, ZMQ_XPUB);
ms->socket = zsock_new (ZMQ_XPUB);
assert(ms->socket);
zsocket_set_linger(ms->socket, 0);
zsocket_set_xpub_verbose(ms->socket, 1);
zsock_set_linger(ms->socket, 0);
zsock_set_xpub_verbose(ms->socket, 1);
if (mk_bindsocket(np, ms))
return -1;
assert(ms->port > -1);
Expand All @@ -191,10 +188,10 @@ zmq_init(htself_t *self)
ms = &self->mksock[SVC_HALRCOMP];
ms->dnssd_subtype = HALRCOMP_DNSSD_SUBTYPE;
ms->tag = "halrcomp";
ms->socket = zsocket_new (self->netopts.z_context, ZMQ_XPUB);
ms->socket = zsock_new (ZMQ_XPUB);
assert(ms->socket);
zsocket_set_linger(ms->socket, 0);
zsocket_set_xpub_verbose(ms->socket, 1);
zsock_set_linger(ms->socket, 0);
zsock_set_xpub_verbose(ms->socket, 1);
if (mk_bindsocket(np, ms))
return -1;
assert(ms->port > -1);
Expand All @@ -207,10 +204,10 @@ zmq_init(htself_t *self)
ms = &self->mksock[SVC_HALRCMD];
ms->dnssd_subtype = HALRCMD_DNSSD_SUBTYPE;
ms->tag = "halrcmd";
ms->socket = zsocket_new (self->netopts.z_context, ZMQ_ROUTER);
ms->socket = zsock_new (ZMQ_ROUTER);
assert(ms->socket);
zsocket_set_linger(ms->socket, 0);
zsocket_set_identity (ms->socket, self->cfg->modname);
zsock_set_linger(ms->socket, 0);
zsock_set_identity (ms->socket, self->cfg->modname);
if (mk_bindsocket(np, ms))
return -1;
assert(ms->port > -1);
Expand Down Expand Up @@ -469,8 +466,8 @@ int main (int argc, char *argv[])
ht_zeroconf_withdraw(&self);
// probably should run zloop here until deregister complete

// shutdown zmq context
zctx_destroy(&self.netopts.z_context);
// shutdown zmq socket
zsock_destroy(&self.mksock[SVC_HALGROUP].socket);

hal_cleanup(&self);

Expand Down
3 changes: 1 addition & 2 deletions src/machinetalk/include/mk-service.hh
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ typedef struct {
// these must be set by caller:
const char *rundir; // for IPC sockets
int rtapi_instance; // defaults to 0
zctx_t *z_context;
zloop_t *z_loop;
AvahiCzmqPoll *av_loop; // Avahi CZMQ event loop adapter

Expand Down Expand Up @@ -47,7 +46,7 @@ typedef struct {
typedef struct {
int port; // otpionally may be set by caller

void *socket; // must be set by caller
zsock_t *socket; // must be set by caller
const char *dnssd_subtype; // must be set by caller
const char *tag; // must be set by caller

Expand Down
8 changes: 4 additions & 4 deletions src/machinetalk/lib/mk_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ static int bind_ifs(mk_socket_t *s, const argvec_t &ifs)
uri = "tcp://" + ifs[i] + ":" + boost::lexical_cast<std::string>(s->port);
}
// use this port number for the rest of the ifs
s->port = zsocket_bind(s->socket, "%s", uri.c_str());
s->port = zsock_bind(s->socket, "%s", uri.c_str());
if (s->port < 0) {
syslog_async(LOG_ERR, "bind to '%s' failed: %s",
uri.c_str(), strerror(errno));
Expand Down Expand Up @@ -153,8 +153,8 @@ int mk_bindsocket(mk_netopts_t *n, mk_socket_t *s)
// if there are any V6 interfaces/addresses,
// enable V6 on socket now
if (ifs.size() > 0) {
zsocket_set_ipv6 (s->socket, 1);
assert (zsocket_ipv6 (s->socket) == 1);
zsock_set_ipv6 (s->socket, 1);
assert (zsock_ipv6 (s->socket) == 1);
}
// and bind them all
retval = bind_ifs(s, ifs);
Expand All @@ -166,7 +166,7 @@ int mk_bindsocket(mk_netopts_t *n, mk_socket_t *s)
// use IPC sockets
snprintf(buf, sizeof(buf), ZMQIPC_FORMAT,
n->rundir, n->rtapi_instance, s->tag, n->service_uuid);
s->port = zsocket_bind(s->socket, "%s", buf);
s->port = zsock_bind(s->socket, "%s", buf);
if (s->port < 0)
syslog_async(LOG_ERR, "bind(%s): %s\n", buf, strerror(errno));

Expand Down
36 changes: 18 additions & 18 deletions src/machinetalk/messagebus/messagebus.cc
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ typedef struct {
actormap_t *cmd_subscribers;
actormap_t *response_subscribers;
int comp_id;
zctx_t *context;
zsock_t *context;
zloop_t *loop;
register_context_t *command_publisher;
register_context_t *response_publisher;
Expand Down Expand Up @@ -266,33 +266,33 @@ static int zmq_setup(msgbusd_self_t *self)
{
GOOGLE_PROTOBUF_VERIFY_VERSION;

// suppress default handling of signals in zctx_new()
// suppress default handling of signals in zsock_new()
// since we're using signalfd()
zsys_handler_set(NULL);

self->context = zctx_new ();
self->context = zsock_new ();
assert(self->context);

zctx_set_linger (self->context, 0);
zsock_set_linger (self->context, 0);

self->cmd = zsocket_new (self->context, ZMQ_XPUB);
self->cmd = zsock_new (self->context, ZMQ_XPUB);
assert(self->cmd);
zsocket_set_xpub_verbose (self->cmd, 1);
self->command_port = zsocket_bind(self->cmd, self->cmd_uri);
zsock_set_xpub_verbose (self->cmd, 1);
self->command_port = zsock_bind(self->cmd, self->cmd_uri);
assert(self->command_port > -1);

self->command_dsn = zsocket_last_endpoint (self->cmd);
self->command_dsn = zsock_last_endpoint (self->cmd);

assert(zsocket_bind(self->cmd, proxy_cmd_uri) > -1);
assert(zsock_bind(self->cmd, proxy_cmd_uri) > -1);

self->response = zsocket_new (self->context, ZMQ_XPUB);
self->response = zsock_new (self->context, ZMQ_XPUB);
assert(self->response);
zsocket_set_xpub_verbose (self->response, 1);
self->response_port = zsocket_bind(self->response, self->response_uri);
zsock_set_xpub_verbose (self->response, 1);
self->response_port = zsock_bind(self->response, self->response_uri);
assert(self->response_port > -1);
self->response_dsn = zsocket_last_endpoint (self->response);
self->response_dsn = zsock_last_endpoint (self->response);

assert(zsocket_bind(self->response, proxy_response_uri) > -1);
assert(zsock_bind(self->response, proxy_response_uri) > -1);

usleep(200 *1000); // avoid slow joiner syndrome

Expand Down Expand Up @@ -398,7 +398,7 @@ static int rtproxy_setup(msgbusd_self_t *self)
{
echo.flags = ACTOR_ECHO|TRACE_TO_RT;
echo.name = "echo";
echo.pipe = zthread_fork (self->context, rtproxy_thread, &echo);
echo.pipe = zactor_fork (self->context, rtproxy_thread, &echo);
assert (echo.pipe);

demo.flags = ACTOR_RESPONDER|TRACE_FROM_RT|TRACE_TO_RT|DESERIALIZE_TO_RT|SERIALIZE_FROM_RT;
Expand All @@ -411,7 +411,7 @@ static int rtproxy_setup(msgbusd_self_t *self)
demo.from_rt_name = "mptx.0.out";
demo.min_delay = 2; // msec
demo.max_delay = 200; // msec
demo.pipe = zthread_fork (self->context, rtproxy_thread, &demo);
demo.pipe = zactor_fork (self->context, rtproxy_thread, &demo);
assert (demo.pipe);

// too.flags = ACTOR_RESPONDER|ACTOR_TRACE;
Expand All @@ -421,7 +421,7 @@ static int rtproxy_setup(msgbusd_self_t *self)
// too.name = "mptx";
// too.to_rt_name = "mptx.0.in";
// too.from_rt_name = "mptx.0.out";
// too.pipe = zthread_fork (self->context, rtproxy_thread, &too);
// too.pipe = zactor_fork (self->context, rtproxy_thread, &too);
// assert (too.pipe);

return 0;
Expand Down Expand Up @@ -668,7 +668,7 @@ int main (int argc, char *argv[])
mb_zeroconf_withdraw(&self);

// shutdown zmq context
zctx_destroy (&self.context);
zsock_destroy (&self.context);

if (comp_id)
hal_exit(comp_id);
Expand Down
10 changes: 5 additions & 5 deletions src/machinetalk/messagebus/rtproxy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,17 +49,17 @@ send_subscribe(void *socket, const char *topic)
}

void
rtproxy_thread(void *arg, zctx_t *ctx, void *pipe)
rtproxy_thread(void *arg, zsock_t *ctx, void *pipe)
{
rtproxy_t *self = (rtproxy_t *) arg;
int retval;

self->proxy_cmd = zsocket_new (ctx, ZMQ_XSUB);
retval = zsocket_connect(self->proxy_cmd, proxy_cmd_uri);
self->proxy_cmd = zsock_new (ctx, ZMQ_XSUB);
retval = zsock_connect(self->proxy_cmd, proxy_cmd_uri);
assert(retval == 0);

self->proxy_response = zsocket_new (ctx, ZMQ_XSUB);
assert(zsocket_connect(self->proxy_response, proxy_response_uri) == 0);
self->proxy_response = zsock_new (ctx, ZMQ_XSUB);
assert(zsock_connect(self->proxy_response, proxy_response_uri) == 0);

if (self->flags & (ACTOR_RESPONDER|ACTOR_ECHO|ACTOR_SUBSCRIBER)) {
retval = send_subscribe(self->proxy_cmd, self->name);
Expand Down
2 changes: 1 addition & 1 deletion src/machinetalk/messagebus/rtproxy.hh
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,4 @@ typedef struct {
} rtproxy_t;


void rtproxy_thread(void *arg, zctx_t *ctx, void *pipe);
void rtproxy_thread(void *arg, zsock_t *ctx, void *pipe);
2 changes: 1 addition & 1 deletion src/machinetalk/webtalk/webtalk.hh
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ int wt_zeroconf_announce(wtself_t *self);
int wt_zeroconf_withdraw(wtself_t *self);

// webtalk_echo.cc:
void echo_thread(void *args, zctx_t *ctx, void *pipe);
void echo_thread(void *args, zsock_t *ctx, void *pipe);

// webtalk_proxy.cc:
int callback_http(
Expand Down
Loading

0 comments on commit afa2892

Please sign in to comment.