Skip to content

Commit

Permalink
fix bug zmq4.1.x PUB msg to ZMTP 1.0 SUB svr
Browse files Browse the repository at this point in the history
  • Loading branch information
laplaceyang committed Dec 15, 2016
1 parent d23c22c commit e4e6af1
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 23 deletions.
34 changes: 13 additions & 21 deletions src/stream_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -726,10 +726,19 @@ int zmq::stream_engine_t::process_identity_msg (msg_t *msg_)
errno_assert (rc == 0);
}

if (subscription_required)
process_msg = &stream_engine_t::write_subscription_msg;
else
process_msg = &stream_engine_t::push_msg_to_session;
if (subscription_required) {
msg_t subscription;

// Inject the subscription message, so that also
// ZMQ 2.x peers receive published messages.
int rc = subscription.init_size (1);
errno_assert (rc == 0);
*(unsigned char*) subscription.data () = 1;
rc = session->push_msg (&subscription);
errno_assert (rc == 0);
}

process_msg = &stream_engine_t::push_msg_to_session;

return 0;
}
Expand Down Expand Up @@ -905,23 +914,6 @@ int zmq::stream_engine_t::push_one_then_decode_and_push (msg_t *msg_)
return rc;
}

int zmq::stream_engine_t::write_subscription_msg (msg_t *msg_)
{
msg_t subscription;

// Inject the subscription message, so that also
// ZMQ 2.x peers receive published messages.
int rc = subscription.init_size (1);
errno_assert (rc == 0);
*(unsigned char*) subscription.data () = 1;
rc = session->push_msg (&subscription);
if (rc == -1)
return -1;

process_msg = &stream_engine_t::push_msg_to_session;
return push_msg_to_session (msg_);
}

void zmq::stream_engine_t::error (error_reason_t reason)
{
if (options.raw_sock) {
Expand Down
2 changes: 0 additions & 2 deletions src/stream_engine.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,6 @@ namespace zmq

void mechanism_ready ();

int write_subscription_msg (msg_t *msg_);

size_t add_property (unsigned char *ptr,
const char *name, const void *value, size_t value_len);

Expand Down

0 comments on commit e4e6af1

Please sign in to comment.