Skip to content

Commit

Permalink
Merge pull request #1 from loladiro/master
Browse files Browse the repository at this point in the history
ZMQ event loop integration and build script
  • Loading branch information
stevengj committed Jul 23, 2013
2 parents f4708b0 + 035e73a commit ef127ca
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 34 deletions.
4 changes: 4 additions & 0 deletions deps/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
builds/
downloads/
src/
usr/
10 changes: 10 additions & 0 deletions deps/build.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
using BinDeps

@BinDeps.setup

zmq = library_dependency("zmq", aliases = ["libzmq"])

provides(Sources,URI("http://download.zeromq.org/zeromq-3.2.3.tar.gz"),zmq)
provides(BuildProcess,Autotools(libtarget = "src/.libs/libzmq."*BinDeps.shlib_ext),zmq)

@BinDeps.install
91 changes: 57 additions & 34 deletions src/ZMQ.jl
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@

module ZMQ

using Base
import Base: convert, ref, get, bytestring, length, size, stride, similar, getindex, setindex!
using BinDeps
@BinDeps.load_dependencies

import Base: convert, ref, get, bytestring, length, size, stride, similar, getindex, setindex!, fd, wait

export
#Types
Expand All @@ -24,8 +26,8 @@ show(io, thiserr::StateError) = print(io, "ZMQ: ", thiserr.msg)

# Basic functions
function jl_zmq_error_str()
errno = ccall((:zmq_errno, :libzmq), Cint, ())
c_strerror = ccall ((:zmq_strerror, :libzmq), Ptr{Uint8}, (Cint,), errno)
errno = ccall((:zmq_errno, zmq), Cint, ())
c_strerror = ccall ((:zmq_strerror, zmq), Ptr{Uint8}, (Cint,), errno)
if c_strerror != C_NULL
strerror = bytestring(c_strerror)
return strerror
Expand All @@ -35,7 +37,7 @@ function jl_zmq_error_str()
end

const version = let major = zeros(Cint, 1), minor = zeros(Cint, 1), patch = zeros(Cint, 1)
ccall((:zmq_version, :libzmq), Void, (Ptr{Cint}, Ptr{Cint}, Ptr{Cint}), major, minor, patch)
ccall((:zmq_version, zmq), Void, (Ptr{Cint}, Ptr{Cint}, Ptr{Cint}), major, minor, patch)
VersionNumber(major[1], minor[1], patch[1])
end

Expand All @@ -54,7 +56,7 @@ type Socket

# ctx should be ::Context, but forward type references are not allowed
function Socket(ctx, typ::Integer)
p = ccall((:zmq_socket, :libzmq), Ptr{Void}, (Ptr{Void}, Cint), ctx.data, typ)
p = ccall((:zmq_socket, zmq), Ptr{Void}, (Ptr{Void}, Cint), ctx.data, typ)
if p == C_NULL
throw(StateError(jl_zmq_error_str()))
end
Expand All @@ -67,7 +69,7 @@ end

function close(socket::Socket)
if socket.data != C_NULL
rc = ccall((:zmq_close, :libzmq), Cint, (Ptr{Void},), socket.data)
rc = ccall((:zmq_close, zmq), Cint, (Ptr{Void},), socket.data)
if rc != 0
throw(StateError(jl_zmq_error_str()))
end
Expand All @@ -87,8 +89,8 @@ type Context
sockets::Vector{Socket}

function Context(n::Integer)
@v2only p = ccall((:zmq_init, :libzmq), Ptr{Void}, (Cint,), n)
@v3only p = ccall((:zmq_ctx_new, :libzmq), Ptr{Void}, ())
@v2only p = ccall((:zmq_init, zmq), Ptr{Void}, (Cint,), n)
@v3only p = ccall((:zmq_ctx_new, zmq), Ptr{Void}, ())
if p == C_NULL
throw(StateError(jl_zmq_error_str()))
end
Expand All @@ -104,8 +106,8 @@ function close(ctx::Context)
for s in ctx.sockets
close(s)
end
@v2only rc = ccall((:zmq_term, :libzmq), Cint, (Ptr{Void},), ctx.data)
@v3only rc = ccall((:zmq_ctx_destroy, :libzmq), Cint, (Ptr{Void},), ctx.data)
@v2only rc = ccall((:zmq_term, zmq), Cint, (Ptr{Void},), ctx.data)
@v3only rc = ccall((:zmq_ctx_destroy, zmq), Cint, (Ptr{Void},), ctx.data)
if rc != 0
throw(StateError(jl_zmq_error_str()))
end
Expand All @@ -116,15 +118,15 @@ term(ctx::Context) = close(ctx)

@v3only begin
function get(ctx::Context, option::Integer)
val = ccall((:zmq_ctx_get, :libzmq), Cint, (Ptr{Void}, Cint), ctx.data, option)
val = ccall((:zmq_ctx_get, zmq), Cint, (Ptr{Void}, Cint), ctx.data, option)
if val < 0
throw(StateError(jl_zmq_error_str()))
end
return val
end

function set(ctx::Context, option::Integer, value::Integer)
rc = ccall((:zmq_ctx_set, :libzmq), Cint, (Ptr{Void}, Cint, Cint), ctx.data, option, value)
rc = ccall((:zmq_ctx_set, zmq), Cint, (Ptr{Void}, Cint, Cint), ctx.data, option, value)
if rc != 0
throw(StateError(jl_zmq_error_str()))
end
Expand All @@ -134,17 +136,20 @@ end # end v3only

# Getting and setting socket options
# Socket options of integer type
let u64p = zeros(Uint64, 1), i64p = zeros(Int64, 1), ip = zeros(Cint, 1), u32p = zeros(Uint32, 1), sz = zeros(Uint, 1)
let u64p = zeros(Uint64, 1), i64p = zeros(Int64, 1), ip = zeros(Cint, 1), u32p = zeros(Uint32, 1), sz = zeros(Uint, 1),
pp = zeros(Ptr{Void},1)
opslist = {
(:set_affinity, :get_affinity, 4, u64p)
(nothing, :get_fd, 14, ip)
(:set_type, :get_type, 16, ip)
(:set_linger, :get_linger, 17, ip)
(:set_reconnect_ivl, :get_reconnect_ivl, 18, ip)
(:set_backlog, :get_backlog, 19, ip)
(:set_reconnect_ivl_max, :get_reconnect_ivl_max, 21, ip)
}

@unix_only opslist = vcat(opslist, (nothing, :get_fd, 14, ip))
@windows_only opslist = vcat(opslist, (nothing, :get_fd, 14, pp))

if version.major == 2
opslist = vcat(opslist, {
(:set_hwm, :get_hwm, 1, u64p)
Expand Down Expand Up @@ -189,7 +194,7 @@ for (fset, fget, k, p) in opslist
@eval global ($fset)
@eval function ($fset)(socket::Socket, option_val::Integer)
($p)[1] = option_val
rc = ccall((:zmq_setsockopt, :libzmq), Cint,
rc = ccall((:zmq_setsockopt, zmq), Cint,
(Ptr{Void}, Cint, Ptr{Void}, Uint),
socket.data, $k, $p, sizeof(eltype($p)))
if rc != 0
Expand All @@ -201,7 +206,7 @@ for (fset, fget, k, p) in opslist
@eval global($fget)
@eval function ($fget)(socket::Socket)
($sz)[1] = sizeof(eltype($p))
rc = ccall((:zmq_getsockopt, :libzmq), Cint,
rc = ccall((:zmq_getsockopt, zmq), Cint,
(Ptr{Void}, Cint, Ptr{Void}, Ptr{Uint}),
socket.data, $k, $p, $sz)
if rc != 0
Expand All @@ -226,6 +231,12 @@ get_rcvmore(socket::Socket) = bool(_zmq_getsockopt_rcvmore(socket))
ismore(socket::Socket) = get_rcvmore(socket)


# Raw FD access
@unix_only fd(socket::Socket) = RawFD(get_fd(socket))
@windows_only fd(socket::Socket) = WindowsRawSocket(get_fd(socket))
wait(socket::Socket; readable=false, writeable=false) = wait(fd(socket); readable=readable, writeable=writeable)


# Socket options of string type
let u8ap = zeros(Uint8, 255), sz = zeros(Uint, 1)
opslist = {
Expand All @@ -246,7 +257,7 @@ for (fset, fget, k) in opslist
if length(option_val) > 255
throw(StateError("option value too large"))
end
rc = ccall((:zmq_setsockopt, :libzmq), Cint,
rc = ccall((:zmq_setsockopt, zmq), Cint,
(Ptr{Void}, Cint, Ptr{Uint8}, Uint),
socket.data, $k, option_val, length(option_val))
if rc != 0
Expand All @@ -258,7 +269,7 @@ for (fset, fget, k) in opslist
@eval global ($fget)
@eval function ($fget)(socket::Socket)
($sz)[1] = length($u8ap)
rc = ccall((:zmq_getsockopt, :libzmq), Cint,
rc = ccall((:zmq_getsockopt, zmq), Cint,
(Ptr{Void}, Cint, Ptr{Uint8}, Ptr{Uint}),
socket.data, $k, $u8ap, $sz)
if rc != 0
Expand All @@ -273,14 +284,14 @@ end # let


function bind(socket::Socket, endpoint::String)
rc = ccall((:zmq_bind, :libzmq), Cint, (Ptr{Void}, Ptr{Uint8}), socket.data, endpoint)
rc = ccall((:zmq_bind, zmq), Cint, (Ptr{Void}, Ptr{Uint8}), socket.data, endpoint)
if rc != 0
throw(StateError(jl_zmq_error_str()))
end
end

function connect(socket::Socket, endpoint::String)
rc=ccall((:zmq_connect, :libzmq), Cint, (Ptr{Void}, Ptr{Uint8}), socket.data, endpoint)
rc=ccall((:zmq_connect, zmq), Cint, (Ptr{Void}, Ptr{Uint8}), socket.data, endpoint)
if rc != 0
throw(StateError(jl_zmq_error_str()))
end
Expand All @@ -300,7 +311,7 @@ type Message <: AbstractArray{Uint8,1}
# Create an empty message (for receive)
function Message()
zmsg = new()
rc = ccall((:zmq_msg_init, :libzmq), Cint, (Ptr{Message},), &zmsg)
rc = ccall((:zmq_msg_init, zmq), Cint, (Ptr{Message},), &zmsg)
if rc != 0
throw(StateError(jl_zmq_error_str()))
end
Expand All @@ -310,7 +321,7 @@ type Message <: AbstractArray{Uint8,1}
# Create a message with a given buffer size (for send)
function Message(len::Integer)
zmsg = new()
rc = ccall((:zmq_msg_init_size, :libzmq), Cint, (Ptr{Message}, Csize_t), &zmsg, len)
rc = ccall((:zmq_msg_init_size, zmq), Cint, (Ptr{Message}, Csize_t), &zmsg, len)
if rc != 0
throw(StateError(jl_zmq_error_str()))
end
Expand All @@ -322,7 +333,7 @@ type Message <: AbstractArray{Uint8,1}
function Message{T}(origin, m::Ptr{T}, len::Integer)
zmsg = new()
zmsg.bufferorigin = origin # should be origin of data pointed to by m
rc = ccall((:zmq_msg_init_data, :libzmq), Cint, (Ptr{Message}, Ptr{T}, Csize_t, Ptr{Void}, Ptr{Void}), &zmsg, m, len*sizeof(T), C_NULL, C_NULL)
rc = ccall((:zmq_msg_init_data, zmq), Cint, (Ptr{Message}, Ptr{T}, Csize_t, Ptr{Void}, Ptr{Void}), &zmsg, m, len*sizeof(T), C_NULL, C_NULL)
if rc != 0
throw(StateError(jl_zmq_error_str()))
end
Expand All @@ -335,10 +346,10 @@ end

# AbstractArray behaviors:
similar(a::Message, T, dims::Dims) = Array(T, dims) # ?
length(zmsg::Message) = ccall((:zmq_msg_size, :libzmq), Csize_t, (Ptr{Message},) , &zmsg)
length(zmsg::Message) = ccall((:zmq_msg_size, zmq), Csize_t, (Ptr{Message},) , &zmsg)
size(zmsg::Message) = (length(zmsg),)
stride(zmsg::Message, i::Integer) = i <= 1 ? 1 : length(zmsg)
convert(::Type{Ptr{Uint8}}, zmsg::Message) = ccall((:zmq_msg_data, :libzmq), Ptr{Uint8}, (Ptr{Message},), &zmsg)
convert(::Type{Ptr{Uint8}}, zmsg::Message) = ccall((:zmq_msg_data, zmq), Ptr{Uint8}, (Ptr{Message},), &zmsg)
function getindex(a::Message, i::Integer)
if i < 1 || i > length(a)
throw(BoundsError())
Expand All @@ -365,21 +376,21 @@ end
# Close a message. You should not need to call this manually (let the
# finalizer do it).
function close(zmsg::Message)
rc = ccall((:zmq_msg_close, :libzmq), Cint, (Ptr{Message},), &zmsg)
rc = ccall((:zmq_msg_close, zmq), Cint, (Ptr{Message},), &zmsg)
if rc != 0
throw(StateError(jl_zmq_error_str()))
end
end

@v3only begin
function get(zmsg::Message, property::Integer)
val = ccall((:zmq_msg_get, :libzmq), Cint, (Ptr{Void}, Cint), zmsg.data, property)
val = ccall((:zmq_msg_get, zmq), Cint, (Ptr{Void}, Cint), zmsg.data, property)
if val < 0
throw(StateError(jl_zmq_error_str()))
end
end
function set(zmsg::Message, property::Integer, value::Integer)
rc = ccall((:zmq_msg_set, :libzmq), Cint, (Ptr{Void}, Cint, Cint), zmsg.data, property, value)
rc = ccall((:zmq_msg_set, zmq), Cint, (Ptr{Void}, Cint, Cint), zmsg.data, property, value)
if rc < 0
throw(StateError(jl_zmq_error_str()))
end
Expand All @@ -396,7 +407,7 @@ end # end v3only

@v2only begin
function send(socket::Socket, zmsg::Message, flag=int32(0))
rc = ccall((:zmq_send, :libzmq), Cint, (Ptr{Void}, Ptr{Message}, Cint),
rc = ccall((:zmq_send, zmq), Cint, (Ptr{Void}, Ptr{Message}, Cint),
socket.data, &zmsg, flag)
if rc != 0
throw(StateError(jl_zmq_error_str()))
Expand All @@ -410,14 +421,20 @@ end # end v2only

@v3only begin
function send(socket::Socket, zmsg::Message, flag=int32(0))
rc = ccall((:zmq_msg_send, :libzmq), Cint, (Ptr{Void}, Ptr{Message}, Cint),
if (get_events(socket) & POLLOUT) == 0
wait(socket; writeable = true)
end
rc = ccall((:zmq_msg_send, zmq), Cint, (Ptr{Void}, Ptr{Message}, Cint),
&zmsg, socket.data, flag)
if rc == -1
throw(StateError(jl_zmq_error_str()))
end
end
function send{T}(socket::Socket, msg::Ptr{T}, len, flag=int32(0))
rc = ccall((:zmq_send, :libzmq), Cint,
if (get_events(socket) & POLLOUT) == 0
wait(socket; writeable = true)
end
rc = ccall((:zmq_send, zmq), Cint,
(Ptr{Void}, Ptr{T}, Csize_t, Cint),
socket.data, msg, len * sizeof(T), flag)
if rc == -1
Expand All @@ -431,7 +448,10 @@ end # end v3only
@v2only begin
function recv(socket::Socket, flag=int32(0))
zmsg = Message()
rc = ccall((:zmq_recv, :libzmq), Cint, (Ptr{Void}, Ptr{Message}, Cint),
if (get_events(socket) & POLLIN) == 0
wait(socket; readable = true)
end
rc = ccall((:zmq_recv, zmq), Cint, (Ptr{Void}, Ptr{Message}, Cint),
socket.data, &zmsg, flag)
if rc != 0
throw(StateError(jl_zmq_error_str()))
Expand All @@ -443,7 +463,10 @@ end # end v2only
@v3only begin
function recv(socket::Socket, flag=int32(0))
zmsg = Message()
rc = ccall((:zmq_msg_recv, :libzmq), Cint, (Ptr{Message}, Ptr{Void}, Cint),
if (get_events(socket) & POLLIN) == 0
wait(socket; readable = true)
end
rc = ccall((:zmq_msg_recv, zmq), Cint, (Ptr{Message}, Ptr{Void}, Cint),
&zmsg, socket.data, flag)
if rc == -1
throw(StateError(jl_zmq_error_str()))
Expand Down
19 changes: 19 additions & 0 deletions test/ZMQ.jl
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,25 @@ ZMQ.send(s2, Message("test request"))
ZMQ.send(s1, Message("test response"))
@assert (bytestring(ZMQ.recv(s2)) == "test response")

# Test task-blocking behavior
c = Base.Condition()
msg_sent = false
@async begin
global msg_sent
sleep(0.5)
msg_sent = true
ZMQ.send(s2, Message("test request"))
@assert (bytestring(ZMQ.recv(s2)) == "test response")
notify(c)
end

# This will hang forver if ZMQ blocks the entire process since
# we'll never switch to the other task
@assert (bytestring(ZMQ.recv(s1)) == "test request")
@assert msg_sent == true
ZMQ.send(s1, Message("test response"))
wait(c)

ZMQ.send(s2, Message("another test request"))
msg = ZMQ.recv(s1)
o=convert(IOStream, msg)
Expand Down

0 comments on commit ef127ca

Please sign in to comment.