diff --git a/examples/curve_pubsub.rb b/examples/curve_pubsub.rb new file mode 100644 index 0000000..40e552e --- /dev/null +++ b/examples/curve_pubsub.rb @@ -0,0 +1,77 @@ +$:.unshift(File.expand_path("../lib")) + +require 'celluloid/zmq' +Celluloid::ZMQ.init + +puts "Curving... PID# #{Process.pid}" + +class EncryptedPublishSubscribe + include Celluloid::ZMQ + + def run + link = "tcp://127.0.0.1:5555" + + s1 = PubSocket.new_curve + + s2 = SubSocket.new_client s1 + s3 = SubSocket.new_client s1 + s4 = SubSocket.new_client s1 + s5 = SubSocket.new_client s1 + + + s1.linger = 100 + s2.subscribe('') # receive all + s3.subscribe('animals') # receive any starting with this string + s4.subscribe('animals.dog') + s5.subscribe('animals.cat') + + s1.bind(link) + s2.connect(link) + s3.connect(link) + s4.connect(link) + s5.connect(link) + + sleep 1 + + topic = "animals.dog" + payload = "Animal crackers!" + + s1.identity = "publisher-A" + puts "sending" + # use the new multi-part messaging support to + # automatically separate the topic from the body + s1.write(topic, payload, s1.identity) + + topic = '' + s2.read(topic) + + body = '' + s2.read(body) if s2.more_parts? + + identity = '' + s2.read(identity) if s2.more_parts? + puts "s2 received topic [#{topic}], body [#{body}], identity [#{identity}]" + + + topic = '' + s3.read(topic) + + body = '' + s3.read(body) if s3.more_parts? + puts "s3 received topic [#{topic}], body [#{body}]" + + topic = '' + s4.read(topic) + + body = '' + s4.read(body) if s4.more_parts? + puts "s4 received topic [#{topic}], body [#{body}]" + + s5_string = '' + s5.read(s5_string) + + # we will never get here + end +end + +EncryptedPublishSubscribe.new.run \ No newline at end of file diff --git a/examples/publish_subscribe.rb b/examples/publish_subscribe.rb index e91aa37..22345ac 100644 --- a/examples/publish_subscribe.rb +++ b/examples/publish_subscribe.rb @@ -1,4 +1,9 @@ +$:.unshift(File.expand_path("../lib")) + +puts "PublishSubscribe... PID# #{Process.pid}" + require 'celluloid/zmq' +Celluloid::ZMQ.init class PublishSubscribe include Celluloid::ZMQ diff --git a/lib/celluloid/zmq.rb b/lib/celluloid/zmq.rb index 6884340..cb61e96 100644 --- a/lib/celluloid/zmq.rb +++ b/lib/celluloid/zmq.rb @@ -1,12 +1,15 @@ require 'ffi-rzmq' require 'celluloid' + require 'celluloid/zmq/mailbox' require 'celluloid/zmq/reactor' require 'celluloid/zmq/sockets' require 'celluloid/zmq/version' require 'celluloid/zmq/waker' +require 'celluloid/zmq/curve' + module Celluloid # Actors which run alongside 0MQ sockets module ZMQ diff --git a/lib/celluloid/zmq/curve.rb b/lib/celluloid/zmq/curve.rb new file mode 100644 index 0000000..e97bcae --- /dev/null +++ b/lib/celluloid/zmq/curve.rb @@ -0,0 +1,86 @@ +module Celluloid + module ZMQ + + class Socket + + class << self + + def new_curve(options={}) + socket = new + socket.extend Curve + socket.init options + socket + end + + def new_client(socket) + raise UninitializedError, "No server socket to create clients for." unless socket.server? + new_curve socket.new_client + end + + def new_server(options={}) + new_curve options.merge(:type=>server) + end + end + + end + + module Curve + + attr_accessor :curve + + def init(options) + raise UninitializedError unless options.is_a? Hash + @curve = options.inject({}) { |s,(k,v)| s[k.to_sym] = v; s } + unless @curve[:type] + if @curve[:server_public_key] and !@curve[:server_private_key] + @curve[:type] = :client + else + @curve[:type] = :server + end + end + + Celluloid::Logger.info "CurveZMQ wrapper: #{@curve[:type]}" + + case @curve[:type] + when :server + unless @curve[:server_private_key] + @curve[:server_public_key], @curve[:server_private_key] = generate_keypair + Celluloid::Logger.info "generated keypair for CurveZMQ server wrapper" + Celluloid::Logger.info "public key: #{@curve[:server_public_key]}" + Celluloid::Logger.info "private key: #{@curve[:server_private_key]}" + end + set(::ZMQ::CURVE_SERVER, 1) + set(::ZMQ::CURVE_SECRETKEY, @curve[:server_private_key]) + when :client + raise UninitializedError, "No server public key provided to client." unless @curve[:server_public_key] + unless @curve[:client_public_key] and @curve[:client_private_key] + @curve[:client_public_key], @curve[:client_private_key] = generate_keypair + Celluloid::Logger.info "generated keypair for CurveZMQ client wrapper" + Celluloid::Logger.info "public key: #{@curve[:client_public_key]}" + Celluloid::Logger.info "private key: #{@curve[:client_private_key]}" + end + set(::ZMQ::CURVE_SERVERKEY, @curve[:server_public_key]) + set(::ZMQ::CURVE_PUBLICKEY, @curve[:client_public_key]) + set(::ZMQ::CURVE_SECRETKEY, @curve[:client_private_key]) + else + raise UninitializedError, "No curve socket type specified." + end + + end + + def server? + !@curve[:server_private_key].nil? + end + + def new_client + raise UninitializedError, "No server public key." unless @curve[:server_public_key] + { :server_public_key => @curve[:server_public_key] } + end + + def generate_keypair + ::ZMQ::Util.curve_keypair + end + + end + end +end diff --git a/lib/celluloid/zmq/sockets.rb b/lib/celluloid/zmq/sockets.rb index 34cf969..0b3bd25 100644 --- a/lib/celluloid/zmq/sockets.rb +++ b/lib/celluloid/zmq/sockets.rb @@ -1,6 +1,7 @@ module Celluloid module ZMQ class Socket + # Create a new socket def initialize(type) @socket = Celluloid::ZMQ.context.socket ::ZMQ.const_get(type.to_s.upcase)