diff --git a/docs/Project.toml b/docs/Project.toml index 6a99be3..382b46e 100644 --- a/docs/Project.toml +++ b/docs/Project.toml @@ -2,5 +2,4 @@ Documenter = "e30172f5-a6a5-5a46-863b-614d45cd2de4" ZMQ = "c2297ded-f4af-51ae-bb23-16f91089e4e1" -[compat] Documenter = "0.26" diff --git a/docs/make.jl b/docs/make.jl index c58fa21..93ea695 100644 --- a/docs/make.jl +++ b/docs/make.jl @@ -12,7 +12,7 @@ makedocs( ], "Reference" => "reference.md", ], - #= Documenter.HTML(), =# + format = Documenter.HTML(prettyurls = get(ENV, "CI", nothing) == "true") ) deploydocs( diff --git a/docs/src/reference.md b/docs/src/reference.md index 2dec9b1..628f327 100644 --- a/docs/src/reference.md +++ b/docs/src/reference.md @@ -1,4 +1,51 @@ # Reference -```@index +## Sockets + +The ZMQ Socket type: + +```@docs +Socket +isopen +close +``` + +[`Socket`](@ref) implements the +[`Sockets`](https://docs.julialang.org/en/v1/stdlib/Sockets/) interface: +```@docs +bind +connect +recv +send +``` + +ZMQ socket types (note: some of these are aliases; e.g. `XREQ = DEALER`): +```@docs +PAIR +PUB +SUB +REQ +REP +DEALER +ROUTER +PULL +PUSH +XPUB +XSUB +XREQ +XREP +UPSTREAM +DOWNSTREAM +``` + +## Messages + +```@docs +Message +``` + +## Context + +```@docs +ZMQ.context ``` diff --git a/src/comm.jl b/src/comm.jl index 8c8fd25..8d00128 100644 --- a/src/comm.jl +++ b/src/comm.jl @@ -48,7 +48,12 @@ function Sockets.send(socket::Socket, data; more::Bool=false) end end -# zero-copy version using user-allocated Message +""" + send(socket::Socket, zmsg::Message; more::Bool=false) + +Zero-copy version of [`Sockets.send(socket, data)`](@ref) using a user-allocated +[`Message`](@ref). +""" Sockets.send(socket::Socket, zmsg::Message; more::Bool=false) = _send(socket, zmsg, more) import Sockets: send @@ -86,7 +91,7 @@ function _recv!(socket::Socket, zmsg) end """ - recv(socket::Socket) :: Message + recv(socket::Socket) Return a `Message` object representing a message received from a ZMQ `Socket` (without making a copy of the message data). @@ -94,11 +99,14 @@ Return a `Message` object representing a message received from a ZMQ `Socket` Sockets.recv(socket::Socket) = _recv!(socket, Message()) """ - recv(socket::Socket, ::Type{T}) + recv(socket::Socket, ::Type{T}) + +Receive a message of type `T` (typically a `String`, `Vector{UInt8}`, or +[`isbits`](https://docs.julialang.org/en/v1/base/base/#Base.isbits) type) +from a ZMQ [`Socket`](@ref). (Makes a copy of the message data; you can alternatively +use [`recv(socket)`](@ref) to work with zero-copy bytearray-like representation for +large messages.) -Receive a message of type `T` (typically a `String`, `Vector{UInt8}`, or [`isbits`](@ref) type) -from a ZMQ `Socket`. (Makes a copy of the message data; you can alternatively use -`recv(socket)` to work with zero-copy bytearray-like representation for large messages.) """ function Sockets.recv(socket::Socket, ::Type{T}) where {T} zmsg = msg_init() diff --git a/src/constants.jl b/src/constants.jl index 1f3b7c4..7e0f96e 100644 --- a/src/constants.jl +++ b/src/constants.jl @@ -6,21 +6,35 @@ const IO_THREADS = 1 const MAX_SOCKETS = 2 const IPV6 = 42 -#Socket Types +"[PAIR](https://zeromq.org/socket-api/#pair-socket) socket." const PAIR = 0 +"[PUB](https://zeromq.org/socket-api/#pub-socket) socket." const PUB = 1 +"[SUB](https://zeromq.org/socket-api/#sub-socket) socket." const SUB = 2 +"[REQ](https://zeromq.org/socket-api/#req-socket) socket." const REQ = 3 +"[REP](https://zeromq.org/socket-api/#rep-socket) socket." const REP = 4 +"[DEALER](https://zeromq.org/socket-api/#dealer-socket) socket." const DEALER = 5 +"[ROUTER](https://zeromq.org/socket-api/#router-socket) socket." const ROUTER = 6 +"[PULL](https://zeromq.org/socket-api/#pull-socket) socket." const PULL = 7 +"[PUSH](https://zeromq.org/socket-api/#push-socket) socket." const PUSH = 8 +"[XPUB](https://zeromq.org/socket-api/#xpub-socket) socket." const XPUB = 9 +"[XSUB](https://zeromq.org/socket-api/#xsub-socket) socket." const XSUB = 10 +"[XREQ](https://zeromq.org/socket-api/#dealer-socket) socket." const XREQ = DEALER +"[XREP](https://zeromq.org/socket-api/#router-socket) socket." const XREP = ROUTER +"[UPSTREAM](https://zeromq.org/socket-api/#pull-socket) socket." const UPSTREAM = PULL +"[DOWNSTREAM](https://zeromq.org/socket-api/#push-socket) socket." const DOWNSTREAM = PUSH #Message options @@ -40,4 +54,4 @@ const QUEUE = 3 #Send/Recv Options const ZMQ_DONTWAIT = 1 -const ZMQ_SNDMORE = 2 \ No newline at end of file +const ZMQ_SNDMORE = 2 diff --git a/src/message.jl b/src/message.jl index 71e231a..646dda3 100644 --- a/src/message.jl +++ b/src/message.jl @@ -1,5 +1,3 @@ -## High-level Message object for sending/receiving ZMQ messages in shared buffers. - include("_message.jl") # in order to support zero-copy messages that share data with Julia @@ -24,12 +22,69 @@ function gc_free_fn(data::Ptr{Cvoid}, hint::Ptr{Cvoid}) ccall(:uv_async_send,Cint,(Ptr{Cvoid},),hint) end +""" +High-level Message object for sending/receiving ZMQ messages in shared buffers. + + Message() + +Create an empty message (for receive). + +--- + + Message(len::Integer) + +Create a message with a given buffer size (for send). + +--- + + Message(origin::Any, m::Ptr{T}, len::Integer) where {T} + +Low-level function to create a message (for send) with an existing +data buffer, without making a copy. The origin parameter should +be the Julia object that is the origin of the data, so that +we can hold a reference to it until ZMQ is done with the buffer. + +--- + + Message(m::String) + +Create a message with a string as a buffer (for send). Note: the Message now +"owns" the string, it must not be resized, or even written to after the message +is sent. + +--- + + Message(p::SubString{String}) + +Create a message with a sub-string as a buffer (for send). Note: the same +ownership semantics as for [`Message(m::String)`](@ref) apply. + +--- + + Message(a::Array) + +Create a message with an array as a buffer (for send). Note: the same +ownership semantics as for [`Message(m::String)`](@ref) apply. + +--- + + Message(io::IOBuffer) + +Create a message with an +[`IOBuffer`](https://docs.julialang.org/en/v1/base/io-network/#Base.IOBuffer) as +a buffer (for send). Note: the same ownership semantics as for +[`Message(m::String)`](@ref) apply. +""" mutable struct Message <: AbstractArray{UInt8,1} # Matching the declaration in the header: char _[64]; w_padding::_Message handle::Ptr{Cvoid} # index into gc_protect, if any - # Create an empty message (for receive) + """ + Message() + + Create an empty message (for receive). + """ function Message() zmsg = new() setfield!(zmsg, :handle, C_NULL) @@ -40,7 +95,12 @@ mutable struct Message <: AbstractArray{UInt8,1} finalizer(close, zmsg) return zmsg end - # Create a message with a given buffer size (for send) + + """ + Message(len::Integer) + + Create a message with a given buffer size (for send). + """ function Message(len::Integer) zmsg = new() setfield!(zmsg, :handle, C_NULL) @@ -52,10 +112,14 @@ mutable struct Message <: AbstractArray{UInt8,1} return zmsg end - # low-level function to create a message (for send) with an existing - # data buffer, without making a copy. The origin parameter should - # be the Julia object that is the origin of the data, so that - # we can hold a reference to it until zeromq is done with the buffer. + """ + Message(origin::Any, m::Ptr{T}, len::Integer) where {T} + + Low-level function to create a message (for send) with an existing + data buffer, without making a copy. The origin parameter should + be the Julia object that is the origin of the data, so that + we can hold a reference to it until ZMQ is done with the buffer. + """ function Message(origin::Any, m::Ptr{T}, len::Integer) where {T} zmsg = new() setfield!(zmsg, :handle, gc_protect_handle(origin)) @@ -70,13 +134,40 @@ mutable struct Message <: AbstractArray{UInt8,1} return zmsg end - # Create a message with a given AbstractString or Array as a buffer (for send) - # (note: now "owns" the buffer ... the Array must not be resized, - # or even written to after the message is sent!) + """ + Message(m::String) + + Create a message with a string as a buffer (for send). Note: the Message now + "owns" the string, it must not be resized, or even written to after the message + is sent. + """ Message(m::String) = Message(m, pointer(m), sizeof(m)) + + """ + Message(p::SubString{String}) + + Create a message with a sub-string as a buffer (for send). Note: the same + ownership semantics as for [`Message(m::String)`](@ref) apply. + """ Message(p::SubString{String}) = Message(p, pointer(p.string)+p.offset, sizeof(p)) + + """ + Message(a::Array) + + Create a message with an array as a buffer (for send). Note: the same + ownership semantics as for [`Message(m::String)`](@ref) apply. + """ Message(a::Array) = Message(a, pointer(a), sizeof(a)) + + """ + Message(io::IOBuffer) + + Create a message with an + [`IOBuffer`](https://docs.julialang.org/en/v1/base/io-network/#Base.IOBuffer) as + a buffer (for send). Note: the same ownership semantics as for + [`Message(m::String)`](@ref) apply. + """ function Message(io::IOBuffer) if !io.readable || !io.seekable error("byte read failed") diff --git a/src/socket.jl b/src/socket.jl index 2757d75..e1fe96c 100644 --- a/src/socket.jl +++ b/src/socket.jl @@ -1,8 +1,31 @@ -## Sockets ## +""" +A ZMQ socket. + + Socket(typ::Integer) + +Create a socket of a certain type. + +--- + + Socket(ctx::Context, typ::Integer) + +Create a socket in a given context. + +--- + + Socket(f::Function, args...) + +Do-block constructor. +""" mutable struct Socket data::Ptr{Cvoid} pollfd::_FDWatcher + """ + Socket(ctx::Context, typ::Integer) + + Create a socket in a given context. + """ function Socket(ctx::Context, typ::Integer) p = ccall((:zmq_socket, libzmq), Ptr{Cvoid}, (Ptr{Cvoid}, Cint), ctx, typ) if p == C_NULL @@ -14,9 +37,20 @@ mutable struct Socket push!(getfield(ctx, :sockets), WeakRef(socket)) return socket end + + """ + Socket(typ::Integer) + + Create a socket of a certain type. + """ Socket(typ::Integer) = Socket(context(), typ) end +""" + Socket(f::Function, args...) + +Do-block constructor. +""" function Socket(f::Function, args...) socket = Socket(args...) try @@ -28,7 +62,14 @@ end Base.unsafe_convert(::Type{Ptr{Cvoid}}, s::Socket) = getfield(s, :data) +""" + Base.isopen(socket::Socket) +""" Base.isopen(socket::Socket) = getfield(socket, :data) != C_NULL + +""" + Base.close(socket::Socket) +""" function Base.close(socket::Socket) if isopen(socket) close(getfield(socket, :pollfd), #=readable=#true, #=writable=#false) @@ -52,6 +93,13 @@ end Base.wait(socket::Socket) = wait(getfield(socket, :pollfd), readable=true, writable=false) Base.notify(socket::Socket) = @preserve socket uv_pollcb(getfield(socket, :pollfd).handle, Int32(0), Int32(UV_READABLE)) +""" + Sockets.bind(socket::Socket, endpoint::AbstractString) + +Bind the socket to an endpoint. Note that the endpoint must be formatted as +described +[here](http://api.zeromq.org/4-3:zmq-bind). e.g. `tcp://127.0.0.1:42000`. +""" function Sockets.bind(socket::Socket, endpoint::AbstractString) rc = ccall((:zmq_bind, libzmq), Cint, (Ptr{Cvoid}, Ptr{UInt8}), socket, endpoint) if rc != 0 @@ -59,6 +107,11 @@ function Sockets.bind(socket::Socket, endpoint::AbstractString) end end +""" + Sockets.connect(socket::Socket, endpoint::AbstractString) + +Connect the socket to an endpoint. +""" function Sockets.connect(socket::Socket, endpoint::AbstractString) rc=ccall((:zmq_connect, libzmq), Cint, (Ptr{Cvoid}, Ptr{UInt8}), socket, endpoint) if rc != 0