Skip to content

Commit 4422533

Browse files
authored
Merge pull request #3170 from sap-contributions/bbs-failover
Use Net::HTTP::Persistent in Diego client
2 parents 06e382d + 6859ebe commit 4422533

File tree

14 files changed

+206
-209
lines changed

14 files changed

+206
-209
lines changed

Gemfile

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -21,6 +21,7 @@ gem 'membrane', '~> 1.0'
2121
gem 'mime-types', '~> 3.4'
2222
gem 'multi_json'
2323
gem 'multipart-parser'
24+
gem 'net-http-persistent'
2425
gem 'net-ssh'
2526
gem 'netaddr', '>= 2.0.4'
2627
gem 'newrelic_rpm'

Gemfile.lock

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -120,6 +120,7 @@ GEM
120120
simplecov (<= 0.13)
121121
coderay (1.1.3)
122122
concurrent-ruby (1.2.0)
123+
connection_pool (2.3.0)
123124
cookiejar (0.3.3)
124125
crack (0.4.5)
125126
rexml
@@ -326,6 +327,8 @@ GEM
326327
mustermann (3.0.0)
327328
ruby2_keywords (~> 0.0.1)
328329
mysql2 (0.5.5)
330+
net-http-persistent (4.0.1)
331+
connection_pool (~> 2.2)
329332
net-ssh (7.0.1)
330333
netaddr (2.0.6)
331334
netrc (0.11.0)
@@ -584,6 +587,7 @@ DEPENDENCIES
584587
multi_json
585588
multipart-parser
586589
mysql2 (~> 0.5.5)
590+
net-http-persistent
587591
net-ssh
588592
netaddr (>= 2.0.4)
589593
newrelic_rpm

lib/cloud_controller/deployment_updater/dispatcher.rb

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -15,7 +15,7 @@ def dispatch
1515
deployments_to_cancel = DeploymentModel.where(state: DeploymentModel::CANCELING_STATE).all
1616

1717
begin
18-
workpool = WorkPool.new(50)
18+
workpool = WorkPool.new
1919

2020
logger.info("scaling #{deployments_to_scale.size} deployments")
2121
deployments_to_scale.each do |deployment|

lib/cloud_controller/diego/processes_sync.rb

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -12,7 +12,7 @@ class BBSFetchError < Error
1212

1313
def initialize(config:, statsd_updater: VCAP::CloudController::Metrics::StatsdUpdater.new)
1414
@config = config
15-
@workpool = WorkPool.new(50, store_exceptions: true)
15+
@workpool = WorkPool.new(store_exceptions: true)
1616
@statsd_updater = statsd_updater
1717
end
1818

lib/cloud_controller/diego/reporters/instances_reporter.rb

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -14,7 +14,7 @@ def initialize(bbs_instances_client)
1414
end
1515

1616
def self.singleton_workpool
17-
@singleton_workpool ||= WorkPool.new(50)
17+
@singleton_workpool ||= WorkPool.new
1818
end
1919

2020
def all_instances_for_app(process)

lib/cloud_controller/diego/tasks_sync.rb

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -10,7 +10,7 @@ class BBSFetchError < Error
1010

1111
def initialize(config:)
1212
@config = config
13-
@workpool = WorkPool.new(50, store_exceptions: true)
13+
@workpool = WorkPool.new(store_exceptions: true)
1414
end
1515

1616
def sync

lib/diego/client.rb

Lines changed: 68 additions & 95 deletions
Original file line number | Diff line number | Diff line change
@@ -1,16 +1,16 @@
11
require 'diego/bbs/bbs'
22
require 'diego/errors'
33
require 'diego/routes'
4+
require 'net/http/persistent'
5+
require 'uri'
46

57
module Diego
68
class Client
7-
PROTOBUF_HEADER = { 'Content-Type'.freeze => 'application/x-protobuf'.freeze }.freeze
8-
99
def initialize(url:, ca_cert_file:, client_cert_file:, client_key_file:,
1010
connect_timeout:, send_timeout:, receive_timeout:)
1111
ENV['PB_IGNORE_DEPRECATIONS'] ||= 'true'
12-
@client = build_client(
13-
url,
12+
@bbs_url = URI(url)
13+
@http_client = new_http_client(
1414
ca_cert_file,
1515
client_cert_file,
1616
client_key_file,
@@ -20,157 +20,121 @@ def initialize(url:, ca_cert_file:, client_cert_file:, client_key_file:,
2020
end
2121

2222
def ping
23-
response = with_request_error_handling do
24-
client.post(Routes::PING)
25-
end
23+
req = post_request(path: Routes::PING)
24+
response = request_with_error_handling(req)
2625

27-
validate_status!(response: response, statuses: [200])
26+
validate_status!(response)
2827
protobuf_decode!(response.body, Bbs::Models::PingResponse)
2928
end
3029

3130
def upsert_domain(domain:, ttl:)
32-
request = protobuf_encode!({ domain: domain, ttl: ttl.to_i }, Bbs::Models::UpsertDomainRequest)
33-
34-
response = with_request_error_handling do
35-
client.post(Routes::UPSERT_DOMAIN, request, PROTOBUF_HEADER)
36-
end
31+
req = post_request(body: protobuf_encode!({ domain: domain, ttl: ttl.to_i }, Bbs::Models::UpsertDomainRequest), path: Routes::UPSERT_DOMAIN)
32+
response = request_with_error_handling(req)
3733

38-
validate_status!(response: response, statuses: [200])
34+
validate_status!(response)
3935
protobuf_decode!(response.body, Bbs::Models::UpsertDomainResponse)
4036
end
4137

4238
def desire_task(task_definition:, domain:, task_guid:)
43-
request = protobuf_encode!({ task_definition: task_definition, domain: domain, task_guid: task_guid }, Bbs::Models::DesireTaskRequest)
39+
req = post_request(body: protobuf_encode!({ task_definition: task_definition, domain: domain, task_guid: task_guid }, Bbs::Models::DesireTaskRequest),
40+
path: Routes::DESIRE_TASK)
41+
response = request_with_error_handling(req)
4442

45-
response = with_request_error_handling do
46-
client.post(Routes::DESIRE_TASK, request, PROTOBUF_HEADER)
47-
end
48-
49-
validate_status!(response: response, statuses: [200])
43+
validate_status!(response)
5044
protobuf_decode!(response.body, Bbs::Models::TaskLifecycleResponse)
5145
end
5246

5347
def task_by_guid(task_guid)
54-
request = protobuf_encode!({ task_guid: task_guid }, Bbs::Models::TaskByGuidRequest)
55-
56-
response = with_request_error_handling do
57-
client.post(Routes::TASK_BY_GUID, request, PROTOBUF_HEADER)
58-
end
48+
req = post_request(body: protobuf_encode!({ task_guid: task_guid }, Bbs::Models::TaskByGuidRequest), path: Routes::TASK_BY_GUID)
49+
response = request_with_error_handling(req)
5950

60-
validate_status!(response: response, statuses: [200])
51+
validate_status!(response)
6152
protobuf_decode!(response.body, Bbs::Models::TaskResponse)
6253
end
6354

6455
def tasks(domain: '', cell_id: '')
65-
request = protobuf_encode!({ domain: domain, cell_id: cell_id }, Bbs::Models::TasksRequest)
56+
req = post_request(body: protobuf_encode!({ domain: domain, cell_id: cell_id }, Bbs::Models::TasksRequest), path: Routes::LIST_TASKS)
57+
response = request_with_error_handling(req)
6658

67-
response = with_request_error_handling do
68-
client.post(Routes::LIST_TASKS, request, PROTOBUF_HEADER)
69-
end
70-
71-
validate_status!(response: response, statuses: [200])
59+
validate_status!(response)
7260
protobuf_decode!(response.body, Bbs::Models::TasksResponse)
7361
end
7462

7563
def cancel_task(task_guid)
76-
request = protobuf_encode!({ task_guid: task_guid }, Bbs::Models::TaskGuidRequest)
77-
78-
response = with_request_error_handling do
79-
client.post(Routes::CANCEL_TASK, request, PROTOBUF_HEADER)
80-
end
64+
req = post_request(body: protobuf_encode!({ task_guid: task_guid }, Bbs::Models::TaskGuidRequest), path: Routes::CANCEL_TASK)
65+
response = request_with_error_handling(req)
8166

82-
validate_status!(response: response, statuses: [200])
67+
validate_status!(response)
8368
protobuf_decode!(response.body, Bbs::Models::TaskLifecycleResponse)
8469
end
8570

8671
def desire_lrp(lrp)
87-
request = protobuf_encode!({ desired_lrp: lrp }, Bbs::Models::DesireLRPRequest)
72+
req = post_request(body: protobuf_encode!({ desired_lrp: lrp }, Bbs::Models::DesireLRPRequest), path: Routes::DESIRE_LRP)
73+
response = request_with_error_handling(req)
8874

89-
response = with_request_error_handling do
90-
client.post(Routes::DESIRE_LRP, request, PROTOBUF_HEADER)
91-
end
92-
93-
validate_status!(response: response, statuses: [200])
75+
validate_status!(response)
9476
protobuf_decode!(response.body, Bbs::Models::DesiredLRPLifecycleResponse)
9577
end
9678

9779
def desired_lrp_by_process_guid(process_guid)
98-
request = protobuf_encode!({ process_guid: process_guid }, Bbs::Models::DesiredLRPByProcessGuidRequest)
99-
100-
response = with_request_error_handling do
101-
client.post(Routes::DESIRED_LRP_BY_PROCESS_GUID, request, PROTOBUF_HEADER)
102-
end
80+
req = post_request(body: protobuf_encode!({ process_guid: process_guid }, Bbs::Models::DesiredLRPByProcessGuidRequest), path: Routes::DESIRED_LRP_BY_PROCESS_GUID)
81+
response = request_with_error_handling(req)
10382

104-
validate_status!(response: response, statuses: [200])
83+
validate_status!(response)
10584
protobuf_decode!(response.body, Bbs::Models::DesiredLRPResponse)
10685
end
10786

10887
def update_desired_lrp(process_guid, lrp_update)
109-
request = protobuf_encode!({ process_guid: process_guid, update: lrp_update }, Bbs::Models::UpdateDesiredLRPRequest)
88+
req = post_request(body: protobuf_encode!({ process_guid: process_guid, update: lrp_update }, Bbs::Models::UpdateDesiredLRPRequest), path: Routes::UPDATE_DESIRED_LRP)
89+
response = request_with_error_handling(req)
11090

111-
response = with_request_error_handling do
112-
client.post(Routes::UPDATE_DESIRED_LRP, request, PROTOBUF_HEADER)
113-
end
114-
115-
validate_status!(response: response, statuses: [200])
91+
validate_status!(response)
11692
protobuf_decode!(response.body, Bbs::Models::DesiredLRPLifecycleResponse)
11793
end
11894

11995
def remove_desired_lrp(process_guid)
120-
request = protobuf_encode!({ process_guid: process_guid }, Bbs::Models::RemoveDesiredLRPRequest)
121-
122-
response = with_request_error_handling do
123-
client.post(Routes::REMOVE_DESIRED_LRP, request, PROTOBUF_HEADER)
124-
end
96+
req = post_request(body: protobuf_encode!({ process_guid: process_guid }, Bbs::Models::RemoveDesiredLRPRequest), path: Routes::REMOVE_DESIRED_LRP)
97+
response = request_with_error_handling(req)
12598

126-
validate_status!(response: response, statuses: [200])
99+
validate_status!(response)
127100
protobuf_decode!(response.body, Bbs::Models::DesiredLRPLifecycleResponse)
128101
end
129102

130103
def retire_actual_lrp(actual_lrp_key)
131-
request = protobuf_encode!({ actual_lrp_key: actual_lrp_key }, Bbs::Models::RetireActualLRPRequest)
104+
req = post_request(body: protobuf_encode!({ actual_lrp_key: actual_lrp_key }, Bbs::Models::RetireActualLRPRequest), path: Routes::RETIRE_ACTUAL_LRP)
105+
response = request_with_error_handling(req)
132106

133-
response = with_request_error_handling do
134-
client.post(Routes::RETIRE_ACTUAL_LRP, request, PROTOBUF_HEADER)
135-
end
136-
137-
validate_status!(response: response, statuses: [200])
107+
validate_status!(response)
138108
protobuf_decode!(response.body, Bbs::Models::ActualLRPLifecycleResponse)
139109
end
140110

141111
def desired_lrp_scheduling_infos(domain)
142-
request = protobuf_encode!({ domain: domain }, Bbs::Models::DesiredLRPsRequest)
143-
144-
response = with_request_error_handling do
145-
client.post(Routes::DESIRED_LRP_SCHEDULING_INFOS, request, PROTOBUF_HEADER)
146-
end
112+
req = post_request(body: protobuf_encode!({ domain: domain }, Bbs::Models::DesiredLRPsRequest), path: Routes::DESIRED_LRP_SCHEDULING_INFOS)
113+
response = request_with_error_handling(req)
147114

148-
validate_status!(response: response, statuses: [200])
115+
validate_status!(response)
149116
protobuf_decode!(response.body, Bbs::Models::DesiredLRPSchedulingInfosResponse)
150117
end
151118

152119
def actual_lrps_by_process_guid(process_guid)
153-
request = protobuf_encode!({ process_guid: process_guid }, Bbs::Models::ActualLRPsRequest)
120+
req = post_request(body: protobuf_encode!({ process_guid: process_guid }, Bbs::Models::ActualLRPsRequest), path: Routes::ACTUAL_LRPS)
121+
response = request_with_error_handling(req)
154122

155-
response = with_request_error_handling do
156-
client.post(Routes::ACTUAL_LRPS, request, PROTOBUF_HEADER)
157-
end
158-
159-
validate_status!(response: response, statuses: [200])
123+
validate_status!(response)
160124
protobuf_decode!(response.body, Bbs::Models::ActualLRPsResponse)
161125
end
162126

163-
def with_request_error_handling(&blk)
164-
tries ||= 3
165-
yield
127+
def request_with_error_handling(req)
128+
attempt ||= 1
129+
http_client.request(bbs_url + req.path, req)
166130
rescue => e
167-
retry unless (tries -= 1).zero?
131+
retry unless (attempt += 1) > 3
168132
raise RequestError.new(e.message)
169133
end
170134

171135
private
172136

173-
attr_reader :client
137+
attr_reader :http_client, :bbs_url
174138

175139
def protobuf_encode!(hash, protobuf_message_class)
176140
# See below link to understand proto3 message encoding
@@ -180,8 +144,15 @@ def protobuf_encode!(hash, protobuf_message_class)
180144
raise EncodeError.new(e.message)
181145
end
182146

183-
def validate_status!(response:, statuses:)
184-
raise ResponseError.new("failed with status: #{response.status}, body: #{response.body}") unless statuses.include?(response.status)
147+
def post_request(body: nil, path:)
148+
req = Net::HTTP::Post.new(path)
149+
req.body = body if body
150+
req['Content-Type'.freeze] = 'application/x-protobuf'.freeze
151+
req
152+
end
153+
154+
def validate_status!(response)
155+
raise ResponseError.new("failed with status: #{response.code}, body: #{response.body}") unless response.code == '200'
185156
end
186157

187158
def protobuf_decode!(message, protobuf_decoder)
@@ -190,14 +161,16 @@ def protobuf_decode!(message, protobuf_decoder)
190161
raise DecodeError.new(e.message)
191162
end
192163

193-
def build_client(url, ca_cert_file, client_cert_file, client_key_file,
194-
connect_timeout, send_timeout, receive_timeout)
195-
client = HTTPClient.new(base_url: url)
196-
client.connect_timeout = connect_timeout
197-
client.send_timeout = send_timeout
198-
client.receive_timeout = receive_timeout
199-
client.ssl_config.set_client_cert_file(client_cert_file, client_key_file)
200-
client.ssl_config.set_trust_ca(ca_cert_file)
164+
def new_http_client(ca_cert_file, client_cert_file, client_key_file,
165+
connect_timeout, send_timeout, receive_timeout)
166+
client = Net::HTTP::Persistent.new(pool_size: WorkPool::SIZE)
167+
client.verify_mode = OpenSSL::SSL::VERIFY_PEER
168+
client.private_key = OpenSSL::PKey::RSA.new(File.read(client_key_file))
169+
client.certificate = OpenSSL::X509::Certificate.new(File.read(client_cert_file))
170+
client.ca_file = ca_cert_file
171+
client.open_timeout = connect_timeout
172+
client.read_timeout = receive_timeout
173+
client.write_timeout = send_timeout
201174
client
202175
end
203176
end

lib/utils/workpool.rb

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1,7 +1,9 @@
11
class WorkPool
2+
SIZE = 50
3+
24
attr_reader :exceptions, :threads
35

4-
def initialize(size, store_exceptions: false)
6+
def initialize(size: SIZE, store_exceptions: false)
57
@size = size
68
@store_exceptions = store_exceptions
79

0 commit comments

Comments
 (0)