Skip to content

Commit

Permalink
Merge pull request #36 from digitalextremist/master
Browse files Browse the repository at this point in the history
Interim PR for @anmod4n to review
  • Loading branch information
digitalextremist committed May 24, 2014
2 parents 110861d + faa75cd commit a5d0efd
Show file tree
Hide file tree
Showing 5 changed files with 172 additions and 0 deletions.
77 changes: 77 additions & 0 deletions examples/curve_pubsub.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
$:.unshift(File.expand_path("../lib"))

require 'celluloid/zmq'
Celluloid::ZMQ.init

puts "Curving... PID# #{Process.pid}"

class EncryptedPublishSubscribe
include Celluloid::ZMQ

def run
link = "tcp://127.0.0.1:5555"

s1 = PubSocket.new_curve

s2 = SubSocket.new_client s1
s3 = SubSocket.new_client s1
s4 = SubSocket.new_client s1
s5 = SubSocket.new_client s1


s1.linger = 100
s2.subscribe('') # receive all
s3.subscribe('animals') # receive any starting with this string
s4.subscribe('animals.dog')
s5.subscribe('animals.cat')

s1.bind(link)
s2.connect(link)
s3.connect(link)
s4.connect(link)
s5.connect(link)

sleep 1

topic = "animals.dog"
payload = "Animal crackers!"

s1.identity = "publisher-A"
puts "sending"
# use the new multi-part messaging support to
# automatically separate the topic from the body
s1.write(topic, payload, s1.identity)

topic = ''
s2.read(topic)

body = ''
s2.read(body) if s2.more_parts?

identity = ''
s2.read(identity) if s2.more_parts?
puts "s2 received topic [#{topic}], body [#{body}], identity [#{identity}]"


topic = ''
s3.read(topic)

body = ''
s3.read(body) if s3.more_parts?
puts "s3 received topic [#{topic}], body [#{body}]"

topic = ''
s4.read(topic)

body = ''
s4.read(body) if s4.more_parts?
puts "s4 received topic [#{topic}], body [#{body}]"

s5_string = ''
s5.read(s5_string)

# we will never get here
end
end

EncryptedPublishSubscribe.new.run
5 changes: 5 additions & 0 deletions examples/publish_subscribe.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
$:.unshift(File.expand_path("../lib"))

puts "PublishSubscribe... PID# #{Process.pid}"

require 'celluloid/zmq'
Celluloid::ZMQ.init

class PublishSubscribe
include Celluloid::ZMQ
Expand Down
3 changes: 3 additions & 0 deletions lib/celluloid/zmq.rb
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
require 'ffi-rzmq'

require 'celluloid'

require 'celluloid/zmq/mailbox'
require 'celluloid/zmq/reactor'
require 'celluloid/zmq/sockets'
require 'celluloid/zmq/version'
require 'celluloid/zmq/waker'

require 'celluloid/zmq/curve'

module Celluloid
# Actors which run alongside 0MQ sockets
module ZMQ
Expand Down
86 changes: 86 additions & 0 deletions lib/celluloid/zmq/curve.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
module Celluloid
module ZMQ

class Socket

class << self

def new_curve(options={})
socket = new
socket.extend Curve
socket.init options
socket
end

def new_client(socket)
raise UninitializedError, "No server socket to create clients for." unless socket.server?
new_curve socket.new_client
end

def new_server(options={})
new_curve options.merge(:type=>server)
end
end

end

module Curve

attr_accessor :curve

def init(options)
raise UninitializedError unless options.is_a? Hash
@curve = options.inject({}) { |s,(k,v)| s[k.to_sym] = v; s }
unless @curve[:type]
if @curve[:server_public_key] and !@curve[:server_private_key]
@curve[:type] = :client
else
@curve[:type] = :server
end
end

Celluloid::Logger.info "CurveZMQ wrapper: #{@curve[:type]}"

case @curve[:type]
when :server
unless @curve[:server_private_key]
@curve[:server_public_key], @curve[:server_private_key] = generate_keypair
Celluloid::Logger.info "generated keypair for CurveZMQ server wrapper"
Celluloid::Logger.info "public key: #{@curve[:server_public_key]}"
Celluloid::Logger.info "private key: #{@curve[:server_private_key]}"
end
set(::ZMQ::CURVE_SERVER, 1)
set(::ZMQ::CURVE_SECRETKEY, @curve[:server_private_key])
when :client
raise UninitializedError, "No server public key provided to client." unless @curve[:server_public_key]
unless @curve[:client_public_key] and @curve[:client_private_key]
@curve[:client_public_key], @curve[:client_private_key] = generate_keypair
Celluloid::Logger.info "generated keypair for CurveZMQ client wrapper"
Celluloid::Logger.info "public key: #{@curve[:client_public_key]}"
Celluloid::Logger.info "private key: #{@curve[:client_private_key]}"
end
set(::ZMQ::CURVE_SERVERKEY, @curve[:server_public_key])
set(::ZMQ::CURVE_PUBLICKEY, @curve[:client_public_key])
set(::ZMQ::CURVE_SECRETKEY, @curve[:client_private_key])
else
raise UninitializedError, "No curve socket type specified."
end

end

def server?
!@curve[:server_private_key].nil?
end

def new_client
raise UninitializedError, "No server public key." unless @curve[:server_public_key]
{ :server_public_key => @curve[:server_public_key] }
end

def generate_keypair
::ZMQ::Util.curve_keypair
end

end
end
end
1 change: 1 addition & 0 deletions lib/celluloid/zmq/sockets.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
module Celluloid
module ZMQ
class Socket

# Create a new socket
def initialize(type)
@socket = Celluloid::ZMQ.context.socket ::ZMQ.const_get(type.to_s.upcase)
Expand Down

0 comments on commit a5d0efd

Please sign in to comment.