Skip to content

Ensure events send out after being forked #50

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
May 14, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .ruby-version
Original file line number Diff line number Diff line change
@@ -1 +1 @@
2.0.0-p247
2.1.0
1 change: 1 addition & 0 deletions analytics-ruby.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,6 @@ Gem::Specification.new do |spec|
spec.add_dependency 'json', ['~> 1.7'] if RUBY_VERSION < "1.9"

spec.add_development_dependency('rake')
spec.add_development_dependency('wrong')
spec.add_development_dependency('rspec')
end
2 changes: 1 addition & 1 deletion lib/segment/analytics.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
require 'segment/analytics/utils'
require 'segment/analytics/version'
require 'segment/analytics/client'
require 'segment/analytics/consumer'
require 'segment/analytics/worker'
require 'segment/analytics/request'
require 'segment/analytics/response'
require 'segment/analytics/logging'
Expand Down
43 changes: 24 additions & 19 deletions lib/segment/analytics/client.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
require 'thread'
require 'time'
require 'segment/analytics/utils'
require 'segment/analytics/consumer'
require 'segment/analytics/worker'
require 'segment/analytics/defaults'

module Segment
Expand All @@ -21,27 +21,21 @@ def initialize options = {}
@queue = Queue.new
@write_key = options[:write_key]
@max_queue_size = options[:max_queue_size] || Defaults::Queue::MAX_SIZE
@consumer = Consumer.new @queue, @write_key, options
@thread = ConsumerThread.new { @consumer.run }
@options = options
@worker_mutex = Mutex.new

check_write_key!

at_exit do
# Let the consumer thread know it should exit.
@thread[:should_exit] = true

# Push a flag value to the consumer queue in case it's blocked waiting for a value. This will allow it
# to continue its normal chain of processing, giving it a chance to exit.
@queue << nil
end
at_exit { @worker_thread && @worker_thread[:should_exit] = true }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we still need this so the thread gets properly GC'd: #27

end

# public: Synchronously waits until the consumer has flushed the queue.
# public: Synchronously waits until the worker has flushed the queue.
# Use only for scripts which are not long-running, and will
# specifically exit
#
def flush
while !@queue.empty? || @consumer.is_requesting?
while !@queue.empty? || @worker.is_requesting?
ensure_worker_running
sleep(0.1)
end
end
Expand Down Expand Up @@ -263,10 +257,10 @@ def queued_messages
def enqueue(action)
# add our request id for tracing purposes
action[:messageId] = uid

queue_full = @queue.length >= @max_queue_size
@queue << action unless queue_full

unless queue_full = @queue.length >= @max_queue_size
ensure_worker_running
@queue << action
end
!queue_full
end

Expand Down Expand Up @@ -313,8 +307,19 @@ def check_user_id! options
fail ArgumentError, 'Must supply either user_id or anonymous_id' unless options[:user_id] || options[:anonymous_id]
end

# Sub-class thread so we have a named thread (useful for debugging in Thread.list).
class ConsumerThread < Thread
def ensure_worker_running
return if worker_running?
@worker_mutex.synchronize do
return if worker_running?
@worker_thread = Thread.new do
@worker = Worker.new @queue, @write_key, @options
@worker.run
end
end
end

def worker_running?
@worker_thread && @worker_thread.alive?
end
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,18 @@

module Segment
class Analytics
class Consumer
class Worker
include Segment::Analytics::Utils
include Segment::Analytics::Defaults

# public: Creates a new consumer
# public: Creates a new worker
#
# The consumer continuously takes messages off the queue
# The worker continuously takes messages off the queue
# and makes requests to the segment.io api
#
# queue - Queue synchronized between client and consumer
# queue - Queue synchronized between client and worker
# write_key - String of the project's Write key
# options - Hash of consumer options
# options - Hash of worker options
# batch_size - Fixnum of how many items to send in a batch
# on_error - Proc of what to do on an error
#
Expand All @@ -26,49 +26,33 @@ def initialize(queue, write_key, options = {})
@write_key = write_key
@batch_size = options[:batch_size] || Queue::BATCH_SIZE
@on_error = options[:on_error] || Proc.new { |status, error| }
@current_batch = []
@mutex = Mutex.new
@batch = []
@lock = Mutex.new
end

# public: Continuously runs the loop to check for new events
#
def run
until Thread.current[:should_exit]
flush
end
end

# public: Flush some events from our queue
#
def flush
# Block until we have something to send
item = @queue.pop
return if item.nil?
return if @queue.empty?

# Synchronize on additions to the current batch
@mutex.synchronize {
@current_batch << item
until @current_batch.length >= @batch_size || @queue.empty?
@current_batch << @queue.pop
@lock.synchronize do
until @batch.length >= @batch_size || @queue.empty?
@batch << @queue.pop
end
end
}

req = Request.new
res = req.post @write_key, @current_batch
@on_error.call res.status, res.error unless res.status == 200
@mutex.synchronize {
@current_batch = []
}
res = Request.new.post @write_key, @batch
@on_error.call res.status, res.error unless res.status == 200

@lock.synchronize { @batch.clear }
end
end

# public: Check whether we have outstanding requests.
#
def is_requesting?
requesting = nil
@mutex.synchronize {
requesting = !@current_batch.empty?
}
requesting
@lock.synchronize { !@batch.empty? }
end
end
end
Expand Down
15 changes: 12 additions & 3 deletions spec/segment/analytics/client_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ class Analytics
describe '#track' do
before(:all) do
@client = Client.new :write_key => WRITE_KEY
@client.instance_variable_get(:@thread).kill
@queue = @client.instance_variable_get :@queue
end

Expand Down Expand Up @@ -69,10 +68,8 @@ class Analytics


describe '#identify' do

before(:all) do
@client = Client.new :write_key => WRITE_KEY
@client.instance_variable_get(:@thread).kill
@queue = @client.instance_variable_get :@queue
end

Expand Down Expand Up @@ -203,6 +200,18 @@ class Analytics
@client.flush
@client.queued_messages.should == 0
end

it 'should complete when the process forks' do
@client.identify Queued::IDENTIFY

Process.fork do
@client.track Queued::TRACK
@client.flush
@client.queued_messages.should == 0
end

Process.wait
end unless defined? JRUBY_VERSION
end
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@

module Segment
class Analytics
describe Consumer do
describe Worker do
describe "#init" do
it 'accepts string keys' do
queue = Queue.new
consumer = Segment::Analytics::Consumer.new(queue, 'secret', 'batch_size' => 100)
consumer.instance_variable_get(:@batch_size).should == 100
worker = Segment::Analytics::Worker.new(queue, 'secret', 'batch_size' => 100)
worker.instance_variable_get(:@batch_size).should == 100
end
end

Expand All @@ -25,8 +25,8 @@ class Analytics

queue = Queue.new
queue << {}
consumer = Segment::Analytics::Consumer.new(queue, 'secret')
consumer.flush
worker = Segment::Analytics::Worker.new(queue, 'secret')
worker.run

queue.should be_empty

Expand All @@ -44,8 +44,8 @@ class Analytics

queue = Queue.new
queue << {}
consumer = Segment::Analytics::Consumer.new queue, 'secret', :on_error => on_error
consumer.flush
worker = Segment::Analytics::Worker.new queue, 'secret', :on_error => on_error
worker.run

Segment::Analytics::Request::any_instance.unstub(:post)

Expand All @@ -62,37 +62,33 @@ class Analytics

queue = Queue.new
queue << Requested::TRACK
consumer = Segment::Analytics::Consumer.new queue, 'testsecret', :on_error => on_error
consumer.flush
worker = Segment::Analytics::Worker.new queue, 'testsecret', :on_error => on_error
worker.run

queue.should be_empty
end
end

describe '#is_requesting?' do

it 'should not return true if there isn\'t a current batch' do

queue = Queue.new
consumer = Segment::Analytics::Consumer.new(queue, 'testsecret')
worker = Segment::Analytics::Worker.new(queue, 'testsecret')

consumer.is_requesting?.should == false
worker.is_requesting?.should == false
end

it 'should return true if there is a current batch' do

queue = Queue.new
queue << Requested::TRACK
consumer = Segment::Analytics::Consumer.new(queue, 'testsecret')
worker = Segment::Analytics::Worker.new(queue, 'testsecret')

Thread.new {
consumer.flush
consumer.is_requesting?.should == false
}
Thread.new do
worker.run
worker.is_requesting?.should == false
end

# sleep barely long enough to let thread flush the queue.
sleep(0.001)
consumer.is_requesting?.should == true
eventually { worker.is_requesting?.should be_true }
end
end
end
Expand Down
5 changes: 4 additions & 1 deletion spec/spec_helper.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
require 'segment/analytics'
require 'wrong'

include Wrong

module Segment
class Analytics
Expand Down Expand Up @@ -49,7 +52,7 @@ module Queued
SCREEN = SCREEN.merge :user_id => USER_ID
end

# Hashes which are sent from the consumer, camel_cased
# Hashes which are sent from the worker, camel_cased
module Requested
TRACK = TRACK.merge({
:userId => USER_ID,
Expand Down