From c6a4cf65598caa5c4dd5d9cf958b9403bceb8055 Mon Sep 17 00:00:00 2001 From: Jonas Nicklas Date: Wed, 29 Jan 2014 17:31:18 +0100 Subject: [PATCH 1/9] Basic spec --- spec/celluloid/zmq_spec.rb | 42 ++++++++++++++++++++++++++++++++++++++ spec/spec_helper.rb | 6 ++++++ 2 files changed, 48 insertions(+) create mode 100644 spec/celluloid/zmq_spec.rb diff --git a/spec/celluloid/zmq_spec.rb b/spec/celluloid/zmq_spec.rb new file mode 100644 index 0000000..72b5860 --- /dev/null +++ b/spec/celluloid/zmq_spec.rb @@ -0,0 +1,42 @@ +require 'spec_helper' + +# find some available ports for ZMQ +ZMQ_PORTS = 10.times.map do + begin + server = TCPServer.new('127.0.0.1', 0) + server.addr[1] + ensure + server.close if server + end +end + +describe Celluloid::ZMQ do + before { @sockets = [] } + after { @sockets.each(&:close) } + let(:ports) { ZMQ_PORTS } + + def connect(socket, index=0) + socket.connect("tcp://127.0.0.1:#{ports[index]}") + @sockets << socket + socket + end + + def bind(socket, index=0) + socket.bind("tcp://127.0.0.1:#{ports[index]}") + @sockets << socket + socket + end + + describe ".init" do + it "inits a ZMQ context" do + Celluloid::ZMQ.init + server = bind(Celluloid::ZMQ.context.socket(::ZMQ::REQ)) + client = connect(Celluloid::ZMQ.context.socket(::ZMQ::REP)) + + server.send_string("hello world") + message = "" + client.recv_string(message) + message.should eq("hello world") + end + end +end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index e66d9f1..a253ab4 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -15,4 +15,10 @@ ex.run Celluloid.shutdown end + + config.around do |ex| + Celluloid::ZMQ.init(1) unless example.metadata[:no_init] + ex.run + Celluloid::ZMQ.terminate + end end From 56478ba77cd5da5c2148c1e7b59cf2ac235a36a2 Mon Sep 17 00:00:00 2001 From: Jonas Nicklas Date: Wed, 29 Jan 2014 17:38:39 +0100 Subject: [PATCH 2/9] Raise an error when Celluloid::ZMQ hasn't been initialized, instead of lazily initializing init Lazy initialization is not thread safe. If the user forgets to call `init`, we want to fail fast instead of possibly creating thread safety problems. --- lib/celluloid/zmq.rb | 15 ++++++++++----- spec/celluloid/zmq_spec.rb | 13 ++++++++++++- spec/spec_helper.rb | 2 ++ 3 files changed, 24 insertions(+), 6 deletions(-) diff --git a/lib/celluloid/zmq.rb b/lib/celluloid/zmq.rb index 0caeaa9..ccac7fa 100644 --- a/lib/celluloid/zmq.rb +++ b/lib/celluloid/zmq.rb @@ -10,9 +10,9 @@ module Celluloid # Actors which run alongside 0MQ sockets module ZMQ - class << self - attr_writer :context + UninitializedError = Class.new StandardError + class << self # Included hook to pull in Celluloid def included(klass) klass.send :include, ::Celluloid @@ -20,14 +20,19 @@ def included(klass) end # Obtain a 0MQ context (or lazily initialize it) - def context(worker_threads = 1) + def init(worker_threads = 1) return @context if @context @context = ::ZMQ::Context.new(worker_threads) end - alias_method :init, :context + + def context + raise UninitializedError, "you must initialize Celluloid::ZMQ by calling Celluloid::ZMQ.init" unless @context + @context + end def terminate - @context.terminate + @context.terminate if @context + @context = nil end end diff --git a/spec/celluloid/zmq_spec.rb b/spec/celluloid/zmq_spec.rb index 72b5860..77663ae 100644 --- a/spec/celluloid/zmq_spec.rb +++ b/spec/celluloid/zmq_spec.rb @@ -28,7 +28,7 @@ def bind(socket, index=0) end describe ".init" do - it "inits a ZMQ context" do + it "inits a ZMQ context", :no_init do Celluloid::ZMQ.init server = bind(Celluloid::ZMQ.context.socket(::ZMQ::REQ)) client = connect(Celluloid::ZMQ.context.socket(::ZMQ::REP)) @@ -38,5 +38,16 @@ def bind(socket, index=0) client.recv_string(message) message.should eq("hello world") end + + it "raises an error when trying to access context and it isn't initialized", :no_init do + expect { Celluloid::ZMQ.context }.to raise_error(Celluloid::ZMQ::UninitializedError) + end + + it "raises an error when trying to access context after it is terminated" do + Celluloid::ZMQ.terminate + expect { Celluloid::ZMQ.context }.to raise_error(Celluloid::ZMQ::UninitializedError) + Celluloid::ZMQ.init + Celluloid::ZMQ.context.should_not be_nil + end end end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index a253ab4..f14e925 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -10,6 +10,8 @@ Celluloid.shutdown_timeout = 1 RSpec.configure do |config| + config.treat_symbols_as_metadata_keys_with_true_values = true + config.around do |ex| Celluloid.boot ex.run From abb1fd54c20a4403787e6b7ad442e1585692338f Mon Sep 17 00:00:00 2001 From: Jonas Nicklas Date: Wed, 29 Jan 2014 17:42:15 +0100 Subject: [PATCH 3/9] Fix docs --- lib/celluloid/zmq.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/celluloid/zmq.rb b/lib/celluloid/zmq.rb index ccac7fa..e4c0d64 100644 --- a/lib/celluloid/zmq.rb +++ b/lib/celluloid/zmq.rb @@ -19,7 +19,7 @@ def included(klass) klass.mailbox_class Celluloid::ZMQ::Mailbox end - # Obtain a 0MQ context (or lazily initialize it) + # Obtain a 0MQ context def init(worker_threads = 1) return @context if @context @context = ::ZMQ::Context.new(worker_threads) From 955fbaaf1318311b46de5920feb3e92b4afc6bf8 Mon Sep 17 00:00:00 2001 From: Jonas Nicklas Date: Thu, 30 Jan 2014 09:45:15 +0100 Subject: [PATCH 4/9] Add a basic spec for testing a REP socket --- spec/celluloid/zmq_spec.rb | 29 +++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/spec/celluloid/zmq_spec.rb b/spec/celluloid/zmq_spec.rb index 77663ae..6ffc4a1 100644 --- a/spec/celluloid/zmq_spec.rb +++ b/spec/celluloid/zmq_spec.rb @@ -50,4 +50,33 @@ def bind(socket, index=0) Celluloid::ZMQ.context.should_not be_nil end end + + describe Celluloid::ZMQ::RepSocket do + it "receives messages" do + actor = Class.new do + include Celluloid::ZMQ + + finalizer :close_socket + + def initialize(port) + @socket = Celluloid::ZMQ::RepSocket.new + @socket.connect("tcp://127.0.0.1:#{port}") + end + + def fetch + @socket.read + end + + def close_socket + @socket.close + end + end + server = bind(Celluloid::ZMQ.context.socket(::ZMQ::REQ)) + client = actor.new(ports[0]) + + server.send_string("hello world") + result = client.fetch + result.should eq("hello world") + end + end end From e9a6e53e9c4d54791a5e3ba3218f01f05676b124 Mon Sep 17 00:00:00 2001 From: Jonas Nicklas Date: Thu, 30 Jan 2014 10:20:19 +0100 Subject: [PATCH 5/9] Make sure waker is shut down. Fixes mailbox specs Requires celluloid/celluloid#376 --- lib/celluloid/zmq/waker.rb | 1 + spec/spec_helper.rb | 6 +----- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/lib/celluloid/zmq/waker.rb b/lib/celluloid/zmq/waker.rb index bf31559..ed16b39 100644 --- a/lib/celluloid/zmq/waker.rb +++ b/lib/celluloid/zmq/waker.rb @@ -50,6 +50,7 @@ def cleanup @receiver.close rescue nil nil end + alias_method :shutdown, :cleanup end end end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index f14e925..8940cf4 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -13,14 +13,10 @@ config.treat_symbols_as_metadata_keys_with_true_values = true config.around do |ex| + Celluloid::ZMQ.init(1) unless example.metadata[:no_init] Celluloid.boot ex.run Celluloid.shutdown - end - - config.around do |ex| - Celluloid::ZMQ.init(1) unless example.metadata[:no_init] - ex.run Celluloid::ZMQ.terminate end end From fdbfbcad592ff058e3314689d19b90dbaa6b07e6 Mon Sep 17 00:00:00 2001 From: Jonas Nicklas Date: Thu, 30 Jan 2014 10:21:04 +0100 Subject: [PATCH 6/9] Add a spec which checks pipelining of tasks when using ZMQ sockets --- spec/celluloid/zmq_spec.rb | 21 +++++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) diff --git a/spec/celluloid/zmq_spec.rb b/spec/celluloid/zmq_spec.rb index 6ffc4a1..6a1ddad 100644 --- a/spec/celluloid/zmq_spec.rb +++ b/spec/celluloid/zmq_spec.rb @@ -52,8 +52,8 @@ def bind(socket, index=0) end describe Celluloid::ZMQ::RepSocket do - it "receives messages" do - actor = Class.new do + let(:actor) do + Class.new do include Celluloid::ZMQ finalizer :close_socket @@ -63,6 +63,10 @@ def initialize(port) @socket.connect("tcp://127.0.0.1:#{port}") end + def say_hi + "Hi!" + end + def fetch @socket.read end @@ -71,6 +75,9 @@ def close_socket @socket.close end end + end + + it "receives messages" do server = bind(Celluloid::ZMQ.context.socket(::ZMQ::REQ)) client = actor.new(ports[0]) @@ -78,5 +85,15 @@ def close_socket result = client.fetch result.should eq("hello world") end + + it "suspends actor while waiting for message" do + server = bind(Celluloid::ZMQ.context.socket(::ZMQ::REQ)) + client = actor.new(ports[0]) + + result = client.future.fetch + client.say_hi.should eq("Hi!") + server.send_string("hello world") + result.value.should eq("hello world") + end end end From df09da23c4f39cf650fefff3a2bf7f32211187f8 Mon Sep 17 00:00:00 2001 From: Jonas Nicklas Date: Thu, 30 Jan 2014 10:39:59 +0100 Subject: [PATCH 7/9] Re-add option to set context manually --- lib/celluloid/zmq.rb | 2 ++ spec/celluloid/zmq_spec.rb | 10 ++++++++++ 2 files changed, 12 insertions(+) diff --git a/lib/celluloid/zmq.rb b/lib/celluloid/zmq.rb index e4c0d64..6884340 100644 --- a/lib/celluloid/zmq.rb +++ b/lib/celluloid/zmq.rb @@ -13,6 +13,8 @@ module ZMQ UninitializedError = Class.new StandardError class << self + attr_writer :context + # Included hook to pull in Celluloid def included(klass) klass.send :include, ::Celluloid diff --git a/spec/celluloid/zmq_spec.rb b/spec/celluloid/zmq_spec.rb index 6a1ddad..a7344f7 100644 --- a/spec/celluloid/zmq_spec.rb +++ b/spec/celluloid/zmq_spec.rb @@ -39,6 +39,16 @@ def bind(socket, index=0) message.should eq("hello world") end + it "can set ZMQ context manually", :no_init do + context = ::ZMQ::Context.new(1) + begin + Celluloid::ZMQ.context = context + Celluloid::ZMQ.context.should eq(context) + ensure + context.terminate + end + end + it "raises an error when trying to access context and it isn't initialized", :no_init do expect { Celluloid::ZMQ.context }.to raise_error(Celluloid::ZMQ::UninitializedError) end From d6867cdac94cc76a65253d6567171ecae67ba7c4 Mon Sep 17 00:00:00 2001 From: Jonas Nicklas Date: Thu, 30 Jan 2014 17:44:46 +0100 Subject: [PATCH 8/9] Add spec which tests writing to sockets --- spec/celluloid/zmq_spec.rb | 54 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 54 insertions(+) diff --git a/spec/celluloid/zmq_spec.rb b/spec/celluloid/zmq_spec.rb index a7344f7..1ceb812 100644 --- a/spec/celluloid/zmq_spec.rb +++ b/spec/celluloid/zmq_spec.rb @@ -106,4 +106,58 @@ def close_socket result.value.should eq("hello world") end end + + describe Celluloid::ZMQ::ReqSocket do + let(:actor) do + Class.new do + include Celluloid::ZMQ + + finalizer :close_socket + + def initialize(port) + @socket = Celluloid::ZMQ::ReqSocket.new + @socket.connect("tcp://127.0.0.1:#{port}") + end + + def say_hi + "Hi!" + end + + def send(message) + @socket.write(message) + true + end + + def close_socket + @socket.close + end + end + end + + it "sends messages" do + client = bind(Celluloid::ZMQ.context.socket(::ZMQ::REP)) + server = actor.new(ports[0]) + + server.send("hello world") + + message = "" + client.recv_string(message) + message.should eq("hello world") + end + + it "suspends actor while waiting for message to be sent" do + client = bind(Celluloid::ZMQ.context.socket(::ZMQ::REP)) + server = actor.new(ports[0]) + + result = server.future.send("hello world") + + server.say_hi.should eq("Hi!") + + message = "" + client.recv_string(message) + message.should eq("hello world") + + result.value.should be_true + end + end end From 3c416f1d922c487f8a0e86824943fafd55f4fa92 Mon Sep 17 00:00:00 2001 From: Jonas Nicklas Date: Tue, 4 Feb 2014 09:54:38 +0100 Subject: [PATCH 9/9] Use inproc instead of TCP, so we don't have to mess around with open ports --- spec/celluloid/zmq_spec.rb | 31 ++++++++++--------------------- 1 file changed, 10 insertions(+), 21 deletions(-) diff --git a/spec/celluloid/zmq_spec.rb b/spec/celluloid/zmq_spec.rb index 1ceb812..4f8e276 100644 --- a/spec/celluloid/zmq_spec.rb +++ b/spec/celluloid/zmq_spec.rb @@ -1,28 +1,17 @@ require 'spec_helper' -# find some available ports for ZMQ -ZMQ_PORTS = 10.times.map do - begin - server = TCPServer.new('127.0.0.1', 0) - server.addr[1] - ensure - server.close if server - end -end - describe Celluloid::ZMQ do before { @sockets = [] } after { @sockets.each(&:close) } - let(:ports) { ZMQ_PORTS } def connect(socket, index=0) - socket.connect("tcp://127.0.0.1:#{ports[index]}") + socket.connect("inproc://celluloid-spec-#{index}") @sockets << socket socket end def bind(socket, index=0) - socket.bind("tcp://127.0.0.1:#{ports[index]}") + socket.bind("inproc://celluloid-spec-#{index}") @sockets << socket socket end @@ -68,9 +57,9 @@ def bind(socket, index=0) finalizer :close_socket - def initialize(port) + def initialize(index) @socket = Celluloid::ZMQ::RepSocket.new - @socket.connect("tcp://127.0.0.1:#{port}") + @socket.connect("inproc://celluloid-spec-#{index}") end def say_hi @@ -89,7 +78,7 @@ def close_socket it "receives messages" do server = bind(Celluloid::ZMQ.context.socket(::ZMQ::REQ)) - client = actor.new(ports[0]) + client = actor.new(0) server.send_string("hello world") result = client.fetch @@ -98,7 +87,7 @@ def close_socket it "suspends actor while waiting for message" do server = bind(Celluloid::ZMQ.context.socket(::ZMQ::REQ)) - client = actor.new(ports[0]) + client = actor.new(0) result = client.future.fetch client.say_hi.should eq("Hi!") @@ -114,9 +103,9 @@ def close_socket finalizer :close_socket - def initialize(port) + def initialize(index) @socket = Celluloid::ZMQ::ReqSocket.new - @socket.connect("tcp://127.0.0.1:#{port}") + @socket.connect("inproc://celluloid-spec-#{index}") end def say_hi @@ -136,7 +125,7 @@ def close_socket it "sends messages" do client = bind(Celluloid::ZMQ.context.socket(::ZMQ::REP)) - server = actor.new(ports[0]) + server = actor.new(0) server.send("hello world") @@ -147,7 +136,7 @@ def close_socket it "suspends actor while waiting for message to be sent" do client = bind(Celluloid::ZMQ.context.socket(::ZMQ::REP)) - server = actor.new(ports[0]) + server = actor.new(0) result = server.future.send("hello world")