-
Notifications
You must be signed in to change notification settings - Fork 58
/
socket.jl
120 lines (97 loc) · 2.91 KB
/
socket.jl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
"""
A ZMQ socket.
Socket(typ::Integer)
Create a socket of a certain type.
---
Socket(ctx::Context, typ::Integer)
Create a socket in a given context.
---
Socket(f::Function, args...)
Do-block constructor.
"""
mutable struct Socket
data::Ptr{Cvoid}
pollfd::_FDWatcher
"""
Socket(ctx::Context, typ::Integer)
Create a socket in a given context.
"""
function Socket(ctx::Context, typ::Integer)
p = ccall((:zmq_socket, libzmq), Ptr{Cvoid}, (Ptr{Cvoid}, Cint), ctx, typ)
if p == C_NULL
throw(StateError(jl_zmq_error_str()))
end
socket = new(p)
setfield!(socket, :pollfd, _FDWatcher(fd(socket), #=readable=#true, #=writable=#false))
finalizer(close, socket)
push!(getfield(ctx, :sockets), WeakRef(socket))
return socket
end
"""
Socket(typ::Integer)
Create a socket of a certain type.
"""
Socket(typ::Integer) = Socket(context(), typ)
end
"""
Socket(f::Function, args...)
Do-block constructor.
"""
function Socket(f::Function, args...)
socket = Socket(args...)
try
f(socket)
finally
close(socket)
end
end
Base.unsafe_convert(::Type{Ptr{Cvoid}}, s::Socket) = getfield(s, :data)
"""
Base.isopen(socket::Socket)
"""
Base.isopen(socket::Socket) = getfield(socket, :data) != C_NULL
"""
Base.close(socket::Socket)
"""
function Base.close(socket::Socket)
if isopen(socket)
close(getfield(socket, :pollfd), #=readable=#true, #=writable=#false)
rc = ccall((:zmq_close, libzmq), Cint, (Ptr{Cvoid},), socket)
setfield!(socket, :data, C_NULL)
if rc != 0
throw(StateError(jl_zmq_error_str()))
end
end
end
# Raw FD access
if Sys.isunix()
Base.fd(socket::Socket) = RawFD(socket.fd)
end
if Sys.iswindows()
using Base.Libc: WindowsRawSocket
Base.fd(socket::Socket) = WindowsRawSocket(convert(Ptr{Cvoid}, socket.fd))
end
Base.wait(socket::Socket) = wait(getfield(socket, :pollfd), readable=true, writable=false)
Base.notify(socket::Socket) = @preserve socket uv_pollcb(getfield(socket, :pollfd).handle, Int32(0), Int32(UV_READABLE))
"""
Sockets.bind(socket::Socket, endpoint::AbstractString)
Bind the socket to an endpoint. Note that the endpoint must be formatted as
described
[here](http://api.zeromq.org/4-3:zmq-bind). e.g. `tcp://127.0.0.1:42000`.
"""
function Sockets.bind(socket::Socket, endpoint::AbstractString)
rc = ccall((:zmq_bind, libzmq), Cint, (Ptr{Cvoid}, Ptr{UInt8}), socket, endpoint)
if rc != 0
throw(StateError(jl_zmq_error_str()))
end
end
"""
Sockets.connect(socket::Socket, endpoint::AbstractString)
Connect the socket to an endpoint.
"""
function Sockets.connect(socket::Socket, endpoint::AbstractString)
rc=ccall((:zmq_connect, libzmq), Cint, (Ptr{Cvoid}, Ptr{UInt8}), socket, endpoint)
if rc != 0
throw(StateError(jl_zmq_error_str()))
end
end