Skip to content

Commit 047e945

Browse files
committed
Merge pull request yatish27#47 from IntractableQuery/master
Add throttling support to reduce API calls
2 parents b5f7ceb + 8064a83 commit 047e945

File tree

4 files changed

+88
-18
lines changed

4 files changed

+88
-18
lines changed

README.md

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ OR
5050
salesforce = SalesforceBulkApi::Api.new(client)
5151

5252

53-
Sample operations:
53+
### Sample operations:
5454

5555
# Insert/Create
5656
# Add as many fields per record as needed.
@@ -82,21 +82,26 @@ Sample operations:
8282
# Query
8383
res = salesforce.query("Account", "select id, name, createddate from Account limit 3") # We just need to pass the sobject name and the query string
8484

85-
Helpful methods:
85+
### Helpful methods:
8686

8787
# Check status of a job via #job_from_id
8888
job = salesforce.job_from_id('a00A0001009zA2m') # Returns a SalesforceBulkApi::Job instance
8989
puts "status is: #{job.check_job_status.inspect}"
9090

91-
92-
Listening to events:
91+
### Listening to events:
9392

9493
# A job is created
9594
# Useful when you need to store the job_id before any work begins, then if you fail during a complex load scenario, you can wait for your
9695
# previous job(s) to finish.
97-
salesforce.on_job_created do |job|
98-
puts "Job #{job.job_id} created!"
99-
end
96+
salesforce.on_job_created do |job|
97+
puts "Job #{job.job_id} created!"
98+
end
99+
100+
### Throttling API calls:
101+
102+
# By default, this gem (and maybe your app driving it) will query job/batch statuses at an unbounded rate. We
103+
# can fix that, e.g.:
104+
salesforce.connection.set_status_throttle(30) # only check status of individual jobs/batches every 30 seconds
100105

101106
## Installation
102107

lib/salesforce_bulk_api.rb

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,18 @@
11
require 'rubygems'
22
require 'bundler'
33
Bundler.require()
4-
require "salesforce_bulk_api/version"
4+
require 'salesforce_bulk_api/version'
55
require 'net/https'
66
require 'xmlsimple'
77
require 'csv'
8+
require 'salesforce_bulk_api/concerns/throttling'
89
require 'salesforce_bulk_api/job'
910
require 'salesforce_bulk_api/connection'
1011

1112
module SalesforceBulkApi
1213

1314
class Api
15+
attr_reader :connection
1416

1517
@@SALESFORCE_API_VERSION = '32.0'
1618

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
module SalesforceBulkApi::Concerns
2+
module Throttling
3+
4+
def throttles
5+
@throttles.dup
6+
end
7+
8+
def add_throttle(&throttling_callback)
9+
@throttles ||= []
10+
@throttles << throttling_callback
11+
end
12+
13+
def set_status_throttle(limit_seconds)
14+
set_throttle_limit_in_seconds(limit_seconds,
15+
throttle_by_keys: %i(http_method path),
16+
only_if: ->(details) { details[:path] == :get })
17+
end
18+
19+
def set_throttle_limit_in_seconds(limit_seconds, throttle_by_keys: %i(http_method path), only_if: Proc.new{true})
20+
add_throttle do |details|
21+
limit_log = get_limit_log(Time.now - limit_seconds)
22+
key = extract_constraint_key_from(details, throttle_by_keys)
23+
last_request = limit_log[key]
24+
25+
if !last_request.nil? && only_if.call(details)
26+
seconds_since_last_request = Time.now.to_f - last_request.to_f
27+
need_to_wait_seconds = limit_seconds - seconds_since_last_request
28+
sleep(need_to_wait_seconds) if need_to_wait_seconds > 0
29+
end
30+
31+
limit_log[key] = Time.now
32+
end
33+
end
34+
35+
private
36+
37+
def extract_constraint_key_from(details, throttle_by_keys)
38+
hash = {}
39+
throttle_by_keys.each { |k| hash[k] = details[k] }
40+
hash
41+
end
42+
43+
def get_limit_log(prune_older_than)
44+
@limits ||= Hash.new(0)
45+
46+
@limits.delete_if do |k, v|
47+
v < prune_older_than
48+
end
49+
50+
@limits
51+
end
52+
53+
def throttle(details={})
54+
(@throttles || []).each do |callback|
55+
args = [details]
56+
args = args[0..callback.arity]
57+
callback.call(*args)
58+
end
59+
end
60+
61+
end
62+
end

lib/salesforce_bulk_api/connection.rb

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ module SalesforceBulkApi
22
require 'timeout'
33

44
class Connection
5+
include Concerns::Throttling
56

67
@@XML_HEADER = '<?xml version="1.0" encoding="utf-8" ?>'
78
@@API_VERSION = nil
@@ -35,14 +36,15 @@ def login()
3536
end
3637

3738
def post_xml(host, path, xml, headers)
38-
count :post
3939
host = host || @@INSTANCE_HOST
4040
if host != @@LOGIN_HOST # Not login, need to add session id to header
41-
headers['X-SFDC-Session'] = @session_id;
41+
headers['X-SFDC-Session'] = @session_id
4242
path = "#{@@PATH_PREFIX}#{path}"
4343
end
4444
i = 0
4545
begin
46+
count :post
47+
throttle(http_method: :post, path: path)
4648
https(host).post(path, xml, headers).body
4749
rescue
4850
i += 1
@@ -57,12 +59,14 @@ def post_xml(host, path, xml, headers)
5759
end
5860

5961
def get_request(host, path, headers)
60-
count :get
6162
host = host || @@INSTANCE_HOST
6263
path = "#{@@PATH_PREFIX}#{path}"
6364
if host != @@LOGIN_HOST # Not login, need to add session id to header
6465
headers['X-SFDC-Session'] = @session_id;
6566
end
67+
68+
count :get
69+
throttle(http_method: :get, path: path)
6670
https(host).get(path, headers).body
6771
end
6872

@@ -74,23 +78,20 @@ def https(host)
7478
end
7579

7680
def parse_instance()
77-
@instance=@server_url.match(/https:\/\/[a-z]{2}[0-9]{1,2}/).to_s.gsub("https://","")
81+
@instance = @server_url.match(/https:\/\/[a-z]{2}[0-9]{1,2}/).to_s.gsub("https://","")
7882
@instance = @server_url.split(".salesforce.com")[0].split("://")[1] if @instance.nil? || @instance.empty?
7983
return @instance
8084
end
8185

8286
def counters
83-
{
84-
get: @counters[:get],
85-
post: @counters[:post]
86-
}
87+
@counters.dup
8788
end
8889

8990
private
9091

91-
def count(name)
92+
def count(http_method)
9293
@counters ||= Hash.new(0)
93-
@counters[name] += 1
94+
@counters[http_method] += 1
9495
end
9596

9697
end

0 commit comments

Comments
 (0)