Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

0.17.0 over naimster update first? #50

Closed
wants to merge 16 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .env-ci
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
CELLULOID_SPECS_LOG_STRATEGY=stderr
CELLULOID_SPECS_LOG_LEVEL=3
CELLULOID_SPECS_LOG_FILE=log/ci.log
CELLULOID_SPECS_LOG_SYNC=false
4 changes: 4 additions & 0 deletions .env-dev
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
CELLULOID_SPECS_LOG_STRATEGY=single
CELLULOID_SPECS_LOG_FILE=log/test.log
CELLULOID_SPECS_LOG_LEVEL=0
CELLULOID_SPECS_LOG_SYNC=true
3 changes: 3 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[submodule "culture"]
path = culture
url = http://github.com/celluloid/culture.git
2 changes: 2 additions & 0 deletions .rspec
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
--color
--tty
--format documentation
--backtrace
--order random
--require spec_helper
--warnings
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ rvm:
- rbx-2

matrix:
fast_finish: true
allow_failures:
- rvm: jruby
- rvm: rbx-2
Expand Down
22 changes: 16 additions & 6 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,31 +1,41 @@
0.70.0
-----
* Adapted to be compliant with version 0.17.0 of Celluloid.

0.16.1 (2015-04-26)
-----
* Support for XPUB sockets
* Support for reading multipart messages
* Spec cleanup

0.16.0 (2014-09-04)
-------------------
-----
* Support for setting socket options
* More specs

0.15.0 (2013-09-04)
-------------------
-----
* Tracking release for Celluloid 0.15

0.14.0 (2013-05-07)
-------------------
-----
* Add pubsub example
* Add identity support to Sockets
* Depend on EventedMailbox from core instead of celluloid-io
* Remove overhead for IO waiting by calling directly to the reactor

0.13.0
------
-----
* Feature: Support for DealerSocket and RouterSocket
* Support for the #more_parts? method on sockets
* Celluloid 0.13 compatibility fixes

0.12.0
------
-----
* Tracking release for Celluloid 0.12.0

0.10.0
------
-----
* Factor celluloid-zmq into its own gem
* #linger= support

Expand Down
7 changes: 2 additions & 5 deletions Gemfile
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
source 'http://rubygems.org'
require File.expand_path("../culture/sync", __FILE__)
Celluloid::Sync::Gemfile[self]

gem 'coveralls', require: false
gem 'celluloid', github: 'celluloid/celluloid', branch: 'master'

# Specify your gem's dependencies in celluloid-zmq.gemspec
gemspec
8 changes: 3 additions & 5 deletions celluloid-zmq.gemspec
Original file line number Diff line number Diff line change
@@ -1,23 +1,21 @@
# -*- encoding: utf-8 -*-
require File.expand_path('../lib/celluloid/zmq/version', __FILE__)
require File.expand_path("../culture/sync", __FILE__)

Gem::Specification.new do |gem|
gem.authors = ["Tony Arcieri"]
gem.email = ["tony.arcieri@gmail.com"]
gem.description = "Celluloid bindings to the ffi-rzmq library"
gem.summary = "Celluloid::ZMQ provides concurrent Celluloid actors that can listen for 0MQ events"
gem.homepage = "http://github.com/celluloid/celluloid-zmq"
gem.license = "MIT"

gem.name = "celluloid-zmq"
gem.version = Celluloid::ZMQ::VERSION

gem.add_dependency "celluloid", ">= 0.16.0"
Celluloid::Sync::Gemspec[gem]
gem.add_dependency "ffi"
gem.add_dependency "ffi-rzmq"

gem.add_development_dependency "rake"
gem.add_development_dependency "rspec", "~> 2.14.0"

# Files
ignores = File.read(".gitignore").split(/\r?\n/).reject{ |f| f =~ /^(#.+|\s*)$/ }.map {|f| Dir[f] }.flatten
gem.files = (Dir['**/*','.gitignore'] - ignores).reject {|f| !File.file?(f) }
Expand Down
1 change: 1 addition & 0 deletions culture
Submodule culture added at e1414b
14 changes: 8 additions & 6 deletions examples/publish_subscribe.rb
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
require 'celluloid/zmq'
require 'celluloid/zmq/current'

Celluloid::ZMQ.init

class PublishSubscribe
include Celluloid::ZMQ

def run
link = "tcp://127.0.0.1:5555"

s1 = PubSocket.new
s2 = SubSocket.new
s3 = SubSocket.new
s4 = SubSocket.new
s5 = SubSocket.new
s1 = Socket::Pub.new
s2 = Socket::Sub.new
s3 = Socket::Sub.new
s4 = Socket::Sub.new
s5 = Socket::Sub.new

s1.linger = 100
s2.subscribe('') # receive all
Expand Down
16 changes: 12 additions & 4 deletions lib/celluloid/zmq.rb
Original file line number Diff line number Diff line change
@@ -1,12 +1,19 @@
require 'ffi-rzmq'

require 'celluloid'
$CELLULOID_ZMQ_BACKPORTED = (ENV["CELLULOID_ZMQ_BACKPORTED"] != "false") unless defined?($CELLULOID_ZMQ_BACKPORTED)

require ($CELLULOID_ZMQ_BACKPORTED) ? 'celluloid' : 'celluloid/current'

require 'celluloid/zmq/mailbox'
require 'celluloid/zmq/reactor'
require 'celluloid/zmq/sockets'
require 'celluloid/zmq/socket'
require 'celluloid/zmq/version'
require 'celluloid/zmq/waker'

require 'celluloid/zmq/socket/readable'
require 'celluloid/zmq/socket/writable'
require 'celluloid/zmq/socket/types'

module Celluloid
# Actors which run alongside 0MQ sockets
module ZMQ
Expand All @@ -23,8 +30,7 @@ def included(klass)

# Obtain a 0MQ context
def init(worker_threads = 1)
return @context if @context
@context = ::ZMQ::Context.new(worker_threads)
@context ||= ::ZMQ::Context.new(worker_threads)
end

def context
Expand Down Expand Up @@ -68,3 +74,5 @@ def wait_writable(socket)

end
end

require 'celluloid/zmq/deprecate' unless $CELLULOID_BACKPORTED == false || $CELLULOID_ZMQ_BACKPORTED == false
2 changes: 2 additions & 0 deletions lib/celluloid/zmq/current.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
$CELLULOID_ZMQ_BACKPORTED = false
require "celluloid/zmq"
15 changes: 15 additions & 0 deletions lib/celluloid/zmq/deprecate.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
module Celluloid
module ZMQ
ReadableSocket = Socket::Readable
WritableSocket = Socket::Writable
RepSocket = Socket::Rep
ReqSocket = Socket::Req
DealerSocket = Socket::Dealer
RouterSocket = Socket::Router
PushSocket = Socket::Push
PullSocket = Socket::Pull
PubSocket = Socket::Pub
XPubSocket = Socket::XPub
SubSocket = Socket::Sub
end
end
2 changes: 1 addition & 1 deletion lib/celluloid/zmq/mailbox.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
module Celluloid
module ZMQ
# Replacement mailbox for Celluloid::ZMQ actors
class Mailbox < Celluloid::EventedMailbox
class Mailbox < Celluloid::Mailbox::Evented
def initialize
super(Reactor)
end
Expand Down
69 changes: 69 additions & 0 deletions lib/celluloid/zmq/socket.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
module Celluloid
module ZMQ
class Socket
# Create a new socket
def initialize(type)
@socket = Celluloid::ZMQ.context.socket ::ZMQ.const_get(type.to_s.upcase)
@linger = 0
end
attr_reader :linger

# Connect to the given 0MQ address
# Address should be in the form: tcp://1.2.3.4:5678/
def connect(addr)
unless ::ZMQ::Util.resultcode_ok? @socket.connect addr
raise IOError, "error connecting to #{addr}: #{::ZMQ::Util.error_string}"
end
true
end

def linger=(value)
@linger = value || -1

unless ::ZMQ::Util.resultcode_ok? @socket.setsockopt(::ZMQ::LINGER, value)
raise IOError, "couldn't set linger: #{::ZMQ::Util.error_string}"
end
end

def identity=(value)
@socket.identity = value
end

def identity
@socket.identity
end

def set(option, value, length = nil)
unless ::ZMQ::Util.resultcode_ok? @socket.setsockopt(option, value, length)
raise IOError, "couldn't set value for option #{option}: #{::ZMQ::Util.error_string}"
end
end

def get(option)
option_value = []

unless ::ZMQ::Util.resultcode_ok? @socket.getsockopt(option, option_value)
raise IOError, "couldn't get value for option #{option}: #{::ZMQ::Util.error_string}"
end

option_value[0]
end

# Bind to the given 0MQ address
# Address should be in the form: tcp://1.2.3.4:5678/
def bind(addr)
unless ::ZMQ::Util.resultcode_ok? @socket.bind(addr)
raise IOError, "couldn't bind to #{addr}: #{::ZMQ::Util.error_string}"
end
end

# Close the socket
def close
@socket.close
end

# Hide ffi-rzmq internals
alias_method :inspect, :to_s
end
end
end
45 changes: 45 additions & 0 deletions lib/celluloid/zmq/socket/readable.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
module Celluloid
module ZMQ
class Socket
# Readable 0MQ sockets have a read method
module Readable
extend Forwardable

# always set LINGER on readable sockets
def bind(addr)
self.linger = @linger
super(addr)
end

def connect(addr)
self.linger = @linger
super(addr)
end

# Read a message from the socket
def read(buffer = '')
ZMQ.wait_readable(@socket) if ZMQ.evented?

unless ::ZMQ::Util.resultcode_ok? @socket.recv_string buffer
raise IOError, "error receiving ZMQ string: #{::ZMQ::Util.error_string}"
end
buffer
end

# Multiparts message ?
def_delegator :@socket, :more_parts?

# Reads a multipart message, stores it into the given buffer and returns
# the buffer.
def read_multipart(buffer = [])
ZMQ.wait_readable(@socket) if ZMQ.evented?

unless ::ZMQ::Util.resultcode_ok? @socket.recv_strings buffer
raise IOError, "error receiving ZMQ string: #{::ZMQ::Util.error_string}"
end
buffer
end
end
end
end
end
Loading