Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 27 additions & 4 deletions lib/splitclient-rb/engine/impressions/unique_keys_tracker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ def initialize(config,
@filter_adapter = filter_adapter
@sender_adapter = sender_adapter
@cache = cache
@cache_max_size = config.unique_keys_cache_max_size
@max_bulk_size = config.unique_keys_bulk_size
@semaphore = Mutex.new
@keys_size = 0
end

def call
Expand All @@ -30,8 +30,9 @@ def track(feature_name, key)
@filter_adapter.add(feature_name, key)

add_or_update(feature_name, key)
@keys_size += 1

send_bulk_data if @cache.size >= @cache_max_size
send_bulk_data if @keys_size >= @max_bulk_size

true
rescue StandardError => e
Expand Down Expand Up @@ -75,14 +76,19 @@ def send_bulk_data
return if @cache.empty?

uniques = @cache.clone
keys_size = @keys_size
@cache.clear
@keys_size = 0

if uniques.size <= @max_bulk_size
if keys_size <= @max_bulk_size
@sender_adapter.record_uniques_key(uniques)
return
end

bulks = SplitIoClient::Utilities.split_bulk_to_send(uniques, uniques.size / @max_bulk_size)
bulks = []
uniques.each do |unique|
bulks += check_keys_and_split_to_bulks(unique)
end

bulks.each do |b|
@sender_adapter.record_uniques_key(b)
Expand All @@ -91,6 +97,23 @@ def send_bulk_data
rescue StandardError => e
@config.log_found_exception(__method__.to_s, e)
end

def check_keys_and_split_to_bulks(unique)
unique_updated = []
unique.each do |_, value|
if value.size > @max_bulk_size
sub_bulks = SplitIoClient::Utilities.split_bulk_to_send(value, value.size / @max_bulk_size)
sub_bulks.each do |sub_bulk|
unique_updated.add({ key: sub_bulk })
end
break

end
unique_updated.add({ key: value })
end

unique_updated
end
end
end
end
Expand Down
10 changes: 5 additions & 5 deletions lib/splitclient-rb/split_config.rb
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we just remove the commented out lines in this file?

Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ def initialize(opts = {})
@telemetry_service_url = opts[:telemetry_service_url] || SplitConfig.default_telemetry_service_url

@unique_keys_refresh_rate = SplitConfig.default_unique_keys_refresh_rate(@cache_adapter)
@unique_keys_cache_max_size = SplitConfig.default_unique_keys_cache_max_size
# @unique_keys_cache_max_size = SplitConfig.default_unique_keys_cache_max_size
@unique_keys_bulk_size = SplitConfig.default_unique_keys_bulk_size(@cache_adapter)

@counter_refresh_rate = SplitConfig.default_counter_refresh_rate(@cache_adapter)
Expand Down Expand Up @@ -292,7 +292,7 @@ def initialize(opts = {})
attr_accessor :on_demand_fetch_max_retries

attr_accessor :unique_keys_refresh_rate
attr_accessor :unique_keys_cache_max_size
#attr_accessor :unique_keys_cache_max_size
attr_accessor :unique_keys_bulk_size

attr_accessor :counter_refresh_rate
Expand Down Expand Up @@ -498,9 +498,9 @@ def self.default_unique_keys_refresh_rate(adapter)
900
end

def self.default_unique_keys_cache_max_size
30000
end
# def self.default_unique_keys_cache_max_size
# 30000
# end

def self.default_unique_keys_bulk_size(adapter)
return 2000 if adapter == :redis
Expand Down
54 changes: 42 additions & 12 deletions spec/engine/impressions/memory_unique_keys_tracker_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,17 @@
it 'track - full cache and send bulk' do
post_url = 'https://telemetry.split.io/api/v1/keys/ss'
body_expect = {
keys: [{ f: 'feature-test-0', ks: ['key_test-1', 'key_test-2'] }, { f: 'feature-test-1', ks: ['key_test-1'] }]
keys: [{ f: 'feature-test-0', ks: ['key_test-1', 'key_test-2'] }]
}.to_json

body_expect2 = {
keys: [{ f: 'feature-test-1', ks: ['key_test-1', 'key_test-2'] }]
}.to_json

stub_request(:post, post_url).with(body: body_expect).to_return(status: 200, body: '')
stub_request(:post, post_url).with(body: body_expect2).to_return(status: 200, body: '')

cache = Concurrent::Hash.new
config.unique_keys_cache_max_size = 2
config.unique_keys_bulk_size = 2
tracker = subject.new(config, filter_adapter, sender_adapter, cache)

Expand All @@ -36,36 +40,61 @@
expect(tracker.track("feature-test-#{i}", 'key_test-2')).to eq(true)
end

expect(a_request(:post, post_url).with(body: body_expect2)).to have_been_made
expect(a_request(:post, post_url).with(body: body_expect)).to have_been_made

cache.clear
end

it 'track - full cache and send 2 bulks' do
it 'track - full cache and send 4 bulks' do
post_url = 'https://telemetry.split.io/api/v1/keys/ss'
body_expect1 = {
keys: [{ f: 'feature-test-0', ks: ['key-1', 'key-2'] }, { f: 'feature-test-2', ks: ['key-1', 'key-2'] }]
keys: [{ f: 'feature-test-0', ks: ['key-1', 'key-2'] }]
}.to_json

body_expect2 = {
keys: [{ f: 'feature-test-1', ks: ['key-1', 'key-2'] }, { f: 'feature-test-3', ks: ['key-1'] }]
keys: [{ f: 'feature-test-0', ks: ['key-3'] }, { f: 'feature-test-1', ks: ['key-1'] }]
}.to_json

body_expect3 = {
keys: [{ f: 'feature-test-1', ks: ['key-2', 'key-3'] }]
}.to_json

body_expect4 = {
keys: [{ f: 'feature-test-2', ks: ['key-1', 'key-2'] }]
}.to_json

body_expect5 = {
keys: [{ f: 'feature-test-2', ks: ['key-3'] }, { f: 'feature-test-3', ks: ['key-1'] }]
}.to_json

body_expect6 = {
keys: [{ f: 'feature-test-3', ks: ['key-2', 'key-3'] }]
}.to_json

stub_request(:post, post_url).with(body: body_expect1).to_return(status: 200, body: '')
stub_request(:post, post_url).with(body: body_expect2).to_return(status: 200, body: '')
stub_request(:post, post_url).with(body: body_expect3).to_return(status: 200, body: '')
stub_request(:post, post_url).with(body: body_expect4).to_return(status: 200, body: '')
stub_request(:post, post_url).with(body: body_expect5).to_return(status: 200, body: '')
stub_request(:post, post_url).with(body: body_expect6).to_return(status: 200, body: '')

cache = Concurrent::Hash.new
config.unique_keys_cache_max_size = 4
config.unique_keys_bulk_size = 2
tracker = subject.new(config, filter_adapter, sender_adapter, cache)

4.times do |i|
expect(tracker.track("feature-test-#{i}", 'key-1')).to eq(true)
expect(tracker.track("feature-test-#{i}", 'key-2')).to eq(true)
expect(tracker.track("feature-test-#{i}", 'key-3')).to eq(true)
end

expect(a_request(:post, post_url).with(body: body_expect1)).to have_been_made
expect(a_request(:post, post_url).with(body: body_expect2)).to have_been_made
expect(a_request(:post, post_url).with(body: body_expect3)).to have_been_made
expect(a_request(:post, post_url).with(body: body_expect4)).to have_been_made
expect(a_request(:post, post_url).with(body: body_expect5)).to have_been_made
expect(a_request(:post, post_url).with(body: body_expect6)).to have_been_made

cache.clear
end
Expand All @@ -74,9 +103,8 @@
context 'with sender_adapter_test' do
let(:sender_adapter_test) { MemoryUniqueKeysSenderTest.new }

it 'track - should add elemets to cache' do
it 'track - should trigger send when bulk size reached and add elemets to cache' do
cache = Concurrent::Hash.new
config.unique_keys_cache_max_size = 5
config.unique_keys_bulk_size = 5
tracker = subject.new(config, filter_adapter, sender_adapter_test, cache)

Expand All @@ -85,24 +113,26 @@
expect(tracker.track('feature_name_test', 'key_test-1')).to eq(true)
expect(tracker.track('feature_name_test', 'key_test-2')).to eq(true)
expect(tracker.track('other_test', 'key_test-2')).to eq(true)
expect(tracker.track('other_test', 'key_test-35')).to eq(true)

expect(cache.size).to eq(2)
expect(tracker.instance_variable_get(:@keys_size)).to eq(4)

expect(cache['feature_name_test'].include?('key_test')).to eq(true)
expect(cache['feature_name_test'].include?('key_test-1')).to eq(true)
expect(cache['feature_name_test'].include?('key_test-2')).to eq(true)
expect(cache['feature_name_test'].include?('key_test-35')).to eq(false)

expect(cache['other_test'].include?('key_test-2')).to eq(true)
expect(cache['other_test'].include?('key_test-35')).to eq(true)
expect(cache['other_test'].include?('key_test-1')).to eq(false)

expect(tracker.track('other_test', 'key_test-35')).to eq(true)
expect(cache.size).to eq(0)
expect(tracker.instance_variable_get(:@keys_size)).to eq(0)

cache.clear
end

it 'track - full cache and send bulk' do
cache = Concurrent::Hash.new
config.unique_keys_cache_max_size = 10
config.unique_keys_bulk_size = 5
tracker = subject.new(config, filter_adapter, sender_adapter_test, cache)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
key = "#{config.redis_namespace}.uniquekeys"

cache = Concurrent::Hash.new
config.unique_keys_cache_max_size = 20
config.unique_keys_bulk_size = 2
tracker = subject.new(config, filter_adapter, sender_adapter, cache)

Expand Down