-
Notifications
You must be signed in to change notification settings - Fork 58
/
comm.jl
119 lines (100 loc) · 3.89 KB
/
comm.jl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
## Send/receive messages.
############################################################################
# Thin wrapper over libzmq's `zmq_msg_send` for `_Message` values (or references
# to them). Returns the raw `Cint` result of the C call unchanged; callers in
# this file treat -1 as failure.
function msg_send(socket::Socket, zmsg::_MessageOrRef, flags::Integer)
    return ccall(
        (:zmq_msg_send, libzmq),
        Cint,
        (Ref{_Message}, Ptr{Cvoid}, Cint),
        zmsg, socket, flags,
    )
end
# Thin wrapper over libzmq's `zmq_msg_send` for `Message` objects.
# Returns the raw `Cint` result of the C call unchanged; callers in this file
# treat -1 as failure.
function msg_send(socket::Socket, zmsg::Message, flags::Integer)
    return ccall(
        (:zmq_msg_send, libzmq),
        Cint,
        (Ref{Message}, Ptr{Cvoid}, Cint),
        zmsg, socket, flags,
    )
end
# Send `zmsg` on `socket` without blocking the calling thread.
#
# The send is attempted with ZMQ_DONTWAIT (plus ZMQ_SNDMORE when `more` is true).
# If it fails with EAGAIN, the task waits on the socket until POLLOUT is set in
# `socket.events` and then retries; any other failure throws a `StateError`.
# Returns `nothing`.
function _send(socket::Socket, zmsg, more::Bool=false)
    flags = (ZMQ_SNDMORE*more) | ZMQ_DONTWAIT
    while msg_send(socket, zmsg, flags) == -1
        # Only EAGAIN ("would block") is retried; everything else is an error.
        zmq_errno() == EAGAIN || throw(StateError(jl_zmq_error_str()))
        while (socket.events & POLLOUT) == 0
            wait(socket)
        end
    end
    # The send succeeded. Check the wait queue first so that `socket.events`
    # is only queried when some task is actually waiting on this socket.
    has_waiters = !isempty(getfield(socket, :pollfd).notify.waitq)
    if has_waiters && socket.events != 0
        notify(socket)
    end
    return nothing
end
# By default, we send using _Message objects, which are optimized for
# small messages and copy the data. If the caller wants zero-copy communication,
# they should explicitly create a Message() object, a more heavyweight object
# that allows zero-copy access.
"""
send(socket::Socket, data; more=false)
Send `data` over `socket`. A `more=true` keyword argument can be passed
to indicate that `data` is a portion of a larger multipart message.
`data` can be any `isbits` type, a `Vector` of `isbits` elements, a
`String`, or a [`Message`](@ref) object to perform zero-copy sends
of large arrays.
"""
function Sockets.send(socket::Socket, data; more::Bool=false)
zmsg = _MessageRef(data)
try
_send(socket, zmsg, more)
finally
close(zmsg)
end
end
"""
send(socket::Socket, zmsg::Message; more::Bool=false)
Zero-copy version of [`Sockets.send(socket, data)`](@ref) using a user-allocated
[`Message`](@ref).
"""
Sockets.send(socket::Socket, zmsg::Message; more::Bool=false) = _send(socket, zmsg, more)
import Sockets: send
# Deprecated positional form: `send(socket, data, more)` forwards to the
# keyword form `send(socket, data; more=more)`.
@deprecate send(socket::Socket, data, more::Bool) send(socket, data; more=more)
# Function-first form (usable with `do`-block syntax): `f` is called with a
# fresh `IOBuffer`, and whatever bytes it wrote are then sent over `socket`
# as a single message (or message part, when `more=true`).
function Sockets.send(f::Function, socket::Socket; more::Bool=false)
    buf = IOBuffer()
    f(buf)
    payload = take!(buf)
    return send(socket, payload; more=more)
end
############################################################################
# Thin wrapper over libzmq's `zmq_msg_recv` for `_Message` values (or references
# to them). Returns the raw `Cint` result of the C call unchanged; callers in
# this file treat -1 as failure.
function msg_recv(socket::Socket, zmsg::_MessageOrRef, flags::Integer)
    return ccall(
        (:zmq_msg_recv, libzmq),
        Cint,
        (Ref{_Message}, Ptr{Cvoid}, Cint),
        zmsg, socket, flags,
    )
end
# Thin wrapper over libzmq's `zmq_msg_recv` for `Message` objects.
# Returns the raw `Cint` result of the C call unchanged; callers in this file
# treat -1 as failure.
function msg_recv(socket::Socket, zmsg::Message, flags::Integer)
    return ccall(
        (:zmq_msg_recv, libzmq),
        Cint,
        (Ref{Message}, Ptr{Cvoid}, Cint),
        zmsg, socket, flags,
    )
end
# Receive the next message from `socket` into `zmsg` without blocking the
# calling thread, and return `zmsg`.
#
# The receive is attempted with ZMQ_DONTWAIT. If it fails with EAGAIN, the task
# waits on the socket until POLLIN is set in `socket.events` and then retries;
# any other failure throws a `StateError`.
function _recv!(socket::Socket, zmsg)
    while msg_recv(socket, zmsg, ZMQ_DONTWAIT) == -1
        # Only EAGAIN ("would block") is retried; everything else is an error.
        zmq_errno() == EAGAIN || throw(StateError(jl_zmq_error_str()))
        while (socket.events & POLLIN) == 0
            wait(socket)
        end
    end
    # The receive succeeded. Check the wait queue first so that `socket.events`
    # is only queried when some task is actually waiting on this socket.
    has_waiters = !isempty(getfield(socket, :pollfd).notify.waitq)
    if has_waiters && socket.events != 0
        notify(socket)
    end
    return zmsg
end
"""
recv(socket::Socket)
Return a `Message` object representing a message received from a ZMQ `Socket`
(without making a copy of the message data).
"""
Sockets.recv(socket::Socket) = _recv!(socket, Message())
"""
recv(socket::Socket, ::Type{T})
Receive a message of type `T` (typically a `String`, `Vector{UInt8}`, or
[`isbits`](https://docs.julialang.org/en/v1/base/base/#Base.isbits) type)
from a ZMQ [`Socket`](@ref). (Makes a copy of the message data; you can alternatively
use [`recv(socket)`](@ref) to work with zero-copy bytearray-like representation for
large messages.)
"""
function Sockets.recv(socket::Socket, ::Type{T}) where {T}
zmsg = msg_init()
try
_recv!(socket, zmsg)
return unsafe_copy(T, zmsg)
finally
close(zmsg)
end
end