Skip to content

Commit

Permalink
Merge branch 'master' into vs/ci
Browse files Browse the repository at this point in the history
  • Loading branch information
ViralBShah authored Oct 20, 2022
2 parents ff7c43a + 9f30c26 commit 96f7a56
Show file tree
Hide file tree
Showing 7 changed files with 235 additions and 23 deletions.
1 change: 0 additions & 1 deletion docs/Project.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,4 @@
Documenter = "e30172f5-a6a5-5a46-863b-614d45cd2de4"
ZMQ = "c2297ded-f4af-51ae-bb23-16f91089e4e1"

[compat]
Documenter = "0.26"
2 changes: 1 addition & 1 deletion docs/make.jl
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ makedocs(
],
"Reference" => "reference.md",
],
#= Documenter.HTML(), =#
format = Documenter.HTML(prettyurls = get(ENV, "CI", nothing) == "true")
)

deploydocs(
Expand Down
49 changes: 48 additions & 1 deletion docs/src/reference.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,51 @@
# Reference

```@index
## Sockets

The ZMQ Socket type:

```@docs
Socket
isopen
close
```

[`Socket`](@ref) implements the
[`Sockets`](https://docs.julialang.org/en/v1/stdlib/Sockets/) interface:
```@docs
bind
connect
recv
send
```

ZMQ socket types (note: some of these are aliases; e.g. `XREQ = DEALER`):
```@docs
PAIR
PUB
SUB
REQ
REP
DEALER
ROUTER
PULL
PUSH
XPUB
XSUB
XREQ
XREP
UPSTREAM
DOWNSTREAM
```

## Messages

```@docs
Message
```

## Context

```@docs
ZMQ.context
```
20 changes: 14 additions & 6 deletions src/comm.jl
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,12 @@ function Sockets.send(socket::Socket, data; more::Bool=false)
end
end

# zero-copy version using user-allocated Message
"""
send(socket::Socket, zmsg::Message; more::Bool=false)
Zero-copy version of [`Sockets.send(socket, data)`](@ref) using a user-allocated
[`Message`](@ref).
"""
Sockets.send(socket::Socket, zmsg::Message; more::Bool=false) = _send(socket, zmsg, more)

import Sockets: send
Expand Down Expand Up @@ -86,19 +91,22 @@ function _recv!(socket::Socket, zmsg)
end

"""
recv(socket::Socket) :: Message
recv(socket::Socket)
Return a `Message` object representing a message received from a ZMQ `Socket`
(without making a copy of the message data).
"""
Sockets.recv(socket::Socket) = _recv!(socket, Message())

"""
recv(socket::Socket, ::Type{T})
recv(socket::Socket, ::Type{T})
Receive a message of type `T` (typically a `String`, `Vector{UInt8}`, or
[`isbits`](https://docs.julialang.org/en/v1/base/base/#Base.isbits) type)
from a ZMQ [`Socket`](@ref). (Makes a copy of the message data; you can alternatively
use [`recv(socket)`](@ref) to work with zero-copy bytearray-like representation for
large messages.)
Receive a message of type `T` (typically a `String`, `Vector{UInt8}`, or [`isbits`](@ref) type)
from a ZMQ `Socket`. (Makes a copy of the message data; you can alternatively use
`recv(socket)` to work with zero-copy bytearray-like representation for large messages.)
"""
function Sockets.recv(socket::Socket, ::Type{T}) where {T}
zmsg = msg_init()
Expand Down
18 changes: 16 additions & 2 deletions src/constants.jl
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,35 @@ const IO_THREADS = 1
const MAX_SOCKETS = 2
const IPV6 = 42

#Socket Types
"[PAIR](https://zeromq.org/socket-api/#pair-socket) socket."
const PAIR = 0
"[PUB](https://zeromq.org/socket-api/#pub-socket) socket."
const PUB = 1
"[SUB](https://zeromq.org/socket-api/#sub-socket) socket."
const SUB = 2
"[REQ](https://zeromq.org/socket-api/#req-socket) socket."
const REQ = 3
"[REP](https://zeromq.org/socket-api/#rep-socket) socket."
const REP = 4
"[DEALER](https://zeromq.org/socket-api/#dealer-socket) socket."
const DEALER = 5
"[ROUTER](https://zeromq.org/socket-api/#router-socket) socket."
const ROUTER = 6
"[PULL](https://zeromq.org/socket-api/#pull-socket) socket."
const PULL = 7
"[PUSH](https://zeromq.org/socket-api/#push-socket) socket."
const PUSH = 8
"[XPUB](https://zeromq.org/socket-api/#xpub-socket) socket."
const XPUB = 9
"[XSUB](https://zeromq.org/socket-api/#xsub-socket) socket."
const XSUB = 10
"[XREQ](https://zeromq.org/socket-api/#dealer-socket) socket."
const XREQ = DEALER
"[XREP](https://zeromq.org/socket-api/#router-socket) socket."
const XREP = ROUTER
"[UPSTREAM](https://zeromq.org/socket-api/#pull-socket) socket."
const UPSTREAM = PULL
"[DOWNSTREAM](https://zeromq.org/socket-api/#push-socket) socket."
const DOWNSTREAM = PUSH

#Message options
Expand All @@ -40,4 +54,4 @@ const QUEUE = 3

#Send/Recv Options
const ZMQ_DONTWAIT = 1
const ZMQ_SNDMORE = 2
const ZMQ_SNDMORE = 2
113 changes: 102 additions & 11 deletions src/message.jl
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
## High-level Message object for sending/receiving ZMQ messages in shared buffers.

include("_message.jl")

# in order to support zero-copy messages that share data with Julia
Expand All @@ -24,12 +22,69 @@ function gc_free_fn(data::Ptr{Cvoid}, hint::Ptr{Cvoid})
ccall(:uv_async_send,Cint,(Ptr{Cvoid},),hint)
end

"""
High-level Message object for sending/receiving ZMQ messages in shared buffers.
Message()
Create an empty message (for receive).
---
Message(len::Integer)
Create a message with a given buffer size (for send).
---
Message(origin::Any, m::Ptr{T}, len::Integer) where {T}
Low-level function to create a message (for send) with an existing
data buffer, without making a copy. The origin parameter should
be the Julia object that is the origin of the data, so that
we can hold a reference to it until ZMQ is done with the buffer.
---
Message(m::String)
Create a message with a string as a buffer (for send). Note: the Message now
"owns" the string, it must not be resized, or even written to after the message
is sent.
---
Message(p::SubString{String})
Create a message with a sub-string as a buffer (for send). Note: the same
ownership semantics as for [`Message(m::String)`](@ref) apply.
---
Message(a::Array)
Create a message with an array as a buffer (for send). Note: the same
ownership semantics as for [`Message(m::String)`](@ref) apply.
---
Message(io::IOBuffer)
Create a message with an
[`IOBuffer`](https://docs.julialang.org/en/v1/base/io-network/#Base.IOBuffer) as
a buffer (for send). Note: the same ownership semantics as for
[`Message(m::String)`](@ref) apply.
"""
mutable struct Message <: AbstractArray{UInt8,1}
# Matching the declaration in the header: char _[64];
w_padding::_Message
handle::Ptr{Cvoid} # index into gc_protect, if any

# Create an empty message (for receive)
"""
Message()
Create an empty message (for receive).
"""
function Message()
zmsg = new()
setfield!(zmsg, :handle, C_NULL)
Expand All @@ -40,7 +95,12 @@ mutable struct Message <: AbstractArray{UInt8,1}
finalizer(close, zmsg)
return zmsg
end
# Create a message with a given buffer size (for send)

"""
Message(len::Integer)
Create a message with a given buffer size (for send).
"""
function Message(len::Integer)
zmsg = new()
setfield!(zmsg, :handle, C_NULL)
Expand All @@ -52,10 +112,14 @@ mutable struct Message <: AbstractArray{UInt8,1}
return zmsg
end

# low-level function to create a message (for send) with an existing
# data buffer, without making a copy. The origin parameter should
# be the Julia object that is the origin of the data, so that
# we can hold a reference to it until zeromq is done with the buffer.
"""
Message(origin::Any, m::Ptr{T}, len::Integer) where {T}
Low-level function to create a message (for send) with an existing
data buffer, without making a copy. The origin parameter should
be the Julia object that is the origin of the data, so that
we can hold a reference to it until ZMQ is done with the buffer.
"""
function Message(origin::Any, m::Ptr{T}, len::Integer) where {T}
zmsg = new()
setfield!(zmsg, :handle, gc_protect_handle(origin))
Expand All @@ -70,13 +134,40 @@ mutable struct Message <: AbstractArray{UInt8,1}
return zmsg
end

# Create a message with a given AbstractString or Array as a buffer (for send)
# (note: now "owns" the buffer ... the Array must not be resized,
# or even written to after the message is sent!)
"""
Message(m::String)
Create a message with a string as a buffer (for send). Note: the Message now
"owns" the string, it must not be resized, or even written to after the message
is sent.
"""
Message(m::String) = Message(m, pointer(m), sizeof(m))

"""
Message(p::SubString{String})
Create a message with a sub-string as a buffer (for send). Note: the same
ownership semantics as for [`Message(m::String)`](@ref) apply.
"""
Message(p::SubString{String}) =
Message(p, pointer(p.string)+p.offset, sizeof(p))

"""
Message(a::Array)
Create a message with an array as a buffer (for send). Note: the same
ownership semantics as for [`Message(m::String)`](@ref) apply.
"""
Message(a::Array) = Message(a, pointer(a), sizeof(a))

"""
Message(io::IOBuffer)
Create a message with an
[`IOBuffer`](https://docs.julialang.org/en/v1/base/io-network/#Base.IOBuffer) as
a buffer (for send). Note: the same ownership semantics as for
[`Message(m::String)`](@ref) apply.
"""
function Message(io::IOBuffer)
if !io.readable || !io.seekable
error("byte read failed")
Expand Down
Loading

0 comments on commit 96f7a56

Please sign in to comment.