Skip to content

💩💩💩 Revert "Use Net::HTTP::Persistent in Diego client" #3171

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ gem 'membrane', '~> 1.0'
gem 'mime-types', '~> 3.4'
gem 'multi_json'
gem 'multipart-parser'
gem 'net-http-persistent'
gem 'net-ssh'
gem 'netaddr', '>= 2.0.4'
gem 'newrelic_rpm'
Expand Down
4 changes: 0 additions & 4 deletions Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,6 @@ GEM
simplecov (<= 0.13)
coderay (1.1.3)
concurrent-ruby (1.2.0)
connection_pool (2.3.0)
cookiejar (0.3.3)
crack (0.4.5)
rexml
Expand Down Expand Up @@ -327,8 +326,6 @@ GEM
mustermann (3.0.0)
ruby2_keywords (~> 0.0.1)
mysql2 (0.5.5)
net-http-persistent (4.0.1)
connection_pool (~> 2.2)
net-ssh (7.0.1)
netaddr (2.0.6)
netrc (0.11.0)
Expand Down Expand Up @@ -587,7 +584,6 @@ DEPENDENCIES
multi_json
multipart-parser
mysql2 (~> 0.5.5)
net-http-persistent
net-ssh
netaddr (>= 2.0.4)
newrelic_rpm
Expand Down
2 changes: 1 addition & 1 deletion lib/cloud_controller/deployment_updater/dispatcher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ def dispatch
deployments_to_cancel = DeploymentModel.where(state: DeploymentModel::CANCELING_STATE).all

begin
workpool = WorkPool.new
workpool = WorkPool.new(50)

logger.info("scaling #{deployments_to_scale.size} deployments")
deployments_to_scale.each do |deployment|
Expand Down
2 changes: 1 addition & 1 deletion lib/cloud_controller/diego/processes_sync.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ class BBSFetchError < Error

def initialize(config:, statsd_updater: VCAP::CloudController::Metrics::StatsdUpdater.new)
@config = config
@workpool = WorkPool.new(store_exceptions: true)
@workpool = WorkPool.new(50, store_exceptions: true)
@statsd_updater = statsd_updater
end

Expand Down
2 changes: 1 addition & 1 deletion lib/cloud_controller/diego/reporters/instances_reporter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ def initialize(bbs_instances_client)
end

def self.singleton_workpool
@singleton_workpool ||= WorkPool.new
@singleton_workpool ||= WorkPool.new(50)
end

def all_instances_for_app(process)
Expand Down
2 changes: 1 addition & 1 deletion lib/cloud_controller/diego/tasks_sync.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ class BBSFetchError < Error

def initialize(config:)
@config = config
@workpool = WorkPool.new(store_exceptions: true)
@workpool = WorkPool.new(50, store_exceptions: true)
end

def sync
Expand Down
163 changes: 95 additions & 68 deletions lib/diego/client.rb
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
require 'diego/bbs/bbs'
require 'diego/errors'
require 'diego/routes'
require 'net/http/persistent'
require 'uri'

module Diego
class Client
PROTOBUF_HEADER = { 'Content-Type'.freeze => 'application/x-protobuf'.freeze }.freeze

def initialize(url:, ca_cert_file:, client_cert_file:, client_key_file:,
connect_timeout:, send_timeout:, receive_timeout:)
ENV['PB_IGNORE_DEPRECATIONS'] ||= 'true'
@bbs_url = URI(url)
@http_client = new_http_client(
@client = build_client(
url,
ca_cert_file,
client_cert_file,
client_key_file,
Expand All @@ -20,121 +20,157 @@ def initialize(url:, ca_cert_file:, client_cert_file:, client_key_file:,
end

def ping
req = post_request(path: Routes::PING)
response = request_with_error_handling(req)
response = with_request_error_handling do
client.post(Routes::PING)
end

validate_status!(response)
validate_status!(response: response, statuses: [200])
protobuf_decode!(response.body, Bbs::Models::PingResponse)
end

def upsert_domain(domain:, ttl:)
req = post_request(body: protobuf_encode!({ domain: domain, ttl: ttl.to_i }, Bbs::Models::UpsertDomainRequest), path: Routes::UPSERT_DOMAIN)
response = request_with_error_handling(req)
request = protobuf_encode!({ domain: domain, ttl: ttl.to_i }, Bbs::Models::UpsertDomainRequest)

response = with_request_error_handling do
client.post(Routes::UPSERT_DOMAIN, request, PROTOBUF_HEADER)
end

validate_status!(response)
validate_status!(response: response, statuses: [200])
protobuf_decode!(response.body, Bbs::Models::UpsertDomainResponse)
end

def desire_task(task_definition:, domain:, task_guid:)
req = post_request(body: protobuf_encode!({ task_definition: task_definition, domain: domain, task_guid: task_guid }, Bbs::Models::DesireTaskRequest),
path: Routes::DESIRE_TASK)
response = request_with_error_handling(req)
request = protobuf_encode!({ task_definition: task_definition, domain: domain, task_guid: task_guid }, Bbs::Models::DesireTaskRequest)

validate_status!(response)
response = with_request_error_handling do
client.post(Routes::DESIRE_TASK, request, PROTOBUF_HEADER)
end

validate_status!(response: response, statuses: [200])
protobuf_decode!(response.body, Bbs::Models::TaskLifecycleResponse)
end

def task_by_guid(task_guid)
req = post_request(body: protobuf_encode!({ task_guid: task_guid }, Bbs::Models::TaskByGuidRequest), path: Routes::TASK_BY_GUID)
response = request_with_error_handling(req)
request = protobuf_encode!({ task_guid: task_guid }, Bbs::Models::TaskByGuidRequest)

response = with_request_error_handling do
client.post(Routes::TASK_BY_GUID, request, PROTOBUF_HEADER)
end

validate_status!(response)
validate_status!(response: response, statuses: [200])
protobuf_decode!(response.body, Bbs::Models::TaskResponse)
end

def tasks(domain: '', cell_id: '')
req = post_request(body: protobuf_encode!({ domain: domain, cell_id: cell_id }, Bbs::Models::TasksRequest), path: Routes::LIST_TASKS)
response = request_with_error_handling(req)
request = protobuf_encode!({ domain: domain, cell_id: cell_id }, Bbs::Models::TasksRequest)

validate_status!(response)
response = with_request_error_handling do
client.post(Routes::LIST_TASKS, request, PROTOBUF_HEADER)
end

validate_status!(response: response, statuses: [200])
protobuf_decode!(response.body, Bbs::Models::TasksResponse)
end

def cancel_task(task_guid)
req = post_request(body: protobuf_encode!({ task_guid: task_guid }, Bbs::Models::TaskGuidRequest), path: Routes::CANCEL_TASK)
response = request_with_error_handling(req)
request = protobuf_encode!({ task_guid: task_guid }, Bbs::Models::TaskGuidRequest)

response = with_request_error_handling do
client.post(Routes::CANCEL_TASK, request, PROTOBUF_HEADER)
end

validate_status!(response)
validate_status!(response: response, statuses: [200])
protobuf_decode!(response.body, Bbs::Models::TaskLifecycleResponse)
end

def desire_lrp(lrp)
req = post_request(body: protobuf_encode!({ desired_lrp: lrp }, Bbs::Models::DesireLRPRequest), path: Routes::DESIRE_LRP)
response = request_with_error_handling(req)
request = protobuf_encode!({ desired_lrp: lrp }, Bbs::Models::DesireLRPRequest)

validate_status!(response)
response = with_request_error_handling do
client.post(Routes::DESIRE_LRP, request, PROTOBUF_HEADER)
end

validate_status!(response: response, statuses: [200])
protobuf_decode!(response.body, Bbs::Models::DesiredLRPLifecycleResponse)
end

def desired_lrp_by_process_guid(process_guid)
req = post_request(body: protobuf_encode!({ process_guid: process_guid }, Bbs::Models::DesiredLRPByProcessGuidRequest), path: Routes::DESIRED_LRP_BY_PROCESS_GUID)
response = request_with_error_handling(req)
request = protobuf_encode!({ process_guid: process_guid }, Bbs::Models::DesiredLRPByProcessGuidRequest)

response = with_request_error_handling do
client.post(Routes::DESIRED_LRP_BY_PROCESS_GUID, request, PROTOBUF_HEADER)
end

validate_status!(response)
validate_status!(response: response, statuses: [200])
protobuf_decode!(response.body, Bbs::Models::DesiredLRPResponse)
end

def update_desired_lrp(process_guid, lrp_update)
req = post_request(body: protobuf_encode!({ process_guid: process_guid, update: lrp_update }, Bbs::Models::UpdateDesiredLRPRequest), path: Routes::UPDATE_DESIRED_LRP)
response = request_with_error_handling(req)
request = protobuf_encode!({ process_guid: process_guid, update: lrp_update }, Bbs::Models::UpdateDesiredLRPRequest)

validate_status!(response)
response = with_request_error_handling do
client.post(Routes::UPDATE_DESIRED_LRP, request, PROTOBUF_HEADER)
end

validate_status!(response: response, statuses: [200])
protobuf_decode!(response.body, Bbs::Models::DesiredLRPLifecycleResponse)
end

def remove_desired_lrp(process_guid)
req = post_request(body: protobuf_encode!({ process_guid: process_guid }, Bbs::Models::RemoveDesiredLRPRequest), path: Routes::REMOVE_DESIRED_LRP)
response = request_with_error_handling(req)
request = protobuf_encode!({ process_guid: process_guid }, Bbs::Models::RemoveDesiredLRPRequest)

response = with_request_error_handling do
client.post(Routes::REMOVE_DESIRED_LRP, request, PROTOBUF_HEADER)
end

validate_status!(response)
validate_status!(response: response, statuses: [200])
protobuf_decode!(response.body, Bbs::Models::DesiredLRPLifecycleResponse)
end

def retire_actual_lrp(actual_lrp_key)
req = post_request(body: protobuf_encode!({ actual_lrp_key: actual_lrp_key }, Bbs::Models::RetireActualLRPRequest), path: Routes::RETIRE_ACTUAL_LRP)
response = request_with_error_handling(req)
request = protobuf_encode!({ actual_lrp_key: actual_lrp_key }, Bbs::Models::RetireActualLRPRequest)

validate_status!(response)
response = with_request_error_handling do
client.post(Routes::RETIRE_ACTUAL_LRP, request, PROTOBUF_HEADER)
end

validate_status!(response: response, statuses: [200])
protobuf_decode!(response.body, Bbs::Models::ActualLRPLifecycleResponse)
end

def desired_lrp_scheduling_infos(domain)
req = post_request(body: protobuf_encode!({ domain: domain }, Bbs::Models::DesiredLRPsRequest), path: Routes::DESIRED_LRP_SCHEDULING_INFOS)
response = request_with_error_handling(req)
request = protobuf_encode!({ domain: domain }, Bbs::Models::DesiredLRPsRequest)

response = with_request_error_handling do
client.post(Routes::DESIRED_LRP_SCHEDULING_INFOS, request, PROTOBUF_HEADER)
end

validate_status!(response)
validate_status!(response: response, statuses: [200])
protobuf_decode!(response.body, Bbs::Models::DesiredLRPSchedulingInfosResponse)
end

def actual_lrps_by_process_guid(process_guid)
req = post_request(body: protobuf_encode!({ process_guid: process_guid }, Bbs::Models::ActualLRPsRequest), path: Routes::ACTUAL_LRPS)
response = request_with_error_handling(req)
request = protobuf_encode!({ process_guid: process_guid }, Bbs::Models::ActualLRPsRequest)

validate_status!(response)
response = with_request_error_handling do
client.post(Routes::ACTUAL_LRPS, request, PROTOBUF_HEADER)
end

validate_status!(response: response, statuses: [200])
protobuf_decode!(response.body, Bbs::Models::ActualLRPsResponse)
end

def request_with_error_handling(req)
attempt ||= 1
http_client.request(bbs_url + req.path, req)
def with_request_error_handling(&blk)
tries ||= 3
yield
rescue => e
retry unless (attempt += 1) > 3
retry unless (tries -= 1).zero?
raise RequestError.new(e.message)
end

private

attr_reader :http_client, :bbs_url
attr_reader :client

def protobuf_encode!(hash, protobuf_message_class)
# See below link to understand proto3 message encoding
Expand All @@ -144,15 +180,8 @@ def protobuf_encode!(hash, protobuf_message_class)
raise EncodeError.new(e.message)
end

def post_request(body: nil, path:)
req = Net::HTTP::Post.new(path)
req.body = body if body
req['Content-Type'.freeze] = 'application/x-protobuf'.freeze
req
end

def validate_status!(response)
raise ResponseError.new("failed with status: #{response.code}, body: #{response.body}") unless response.code == '200'
def validate_status!(response:, statuses:)
raise ResponseError.new("failed with status: #{response.status}, body: #{response.body}") unless statuses.include?(response.status)
end

def protobuf_decode!(message, protobuf_decoder)
Expand All @@ -161,16 +190,14 @@ def protobuf_decode!(message, protobuf_decoder)
raise DecodeError.new(e.message)
end

def new_http_client(ca_cert_file, client_cert_file, client_key_file,
connect_timeout, send_timeout, receive_timeout)
client = Net::HTTP::Persistent.new(pool_size: WorkPool::SIZE)
client.verify_mode = OpenSSL::SSL::VERIFY_PEER
client.private_key = OpenSSL::PKey::RSA.new(File.read(client_key_file))
client.certificate = OpenSSL::X509::Certificate.new(File.read(client_cert_file))
client.ca_file = ca_cert_file
client.open_timeout = connect_timeout
client.read_timeout = receive_timeout
client.write_timeout = send_timeout
def build_client(url, ca_cert_file, client_cert_file, client_key_file,
connect_timeout, send_timeout, receive_timeout)
client = HTTPClient.new(base_url: url)
client.connect_timeout = connect_timeout
client.send_timeout = send_timeout
client.receive_timeout = receive_timeout
client.ssl_config.set_client_cert_file(client_cert_file, client_key_file)
client.ssl_config.set_trust_ca(ca_cert_file)
client
end
end
Expand Down
4 changes: 1 addition & 3 deletions lib/utils/workpool.rb
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
class WorkPool
SIZE = 50

attr_reader :exceptions, :threads

def initialize(size: SIZE, store_exceptions: false)
def initialize(size, store_exceptions: false)
@size = size
@store_exceptions = store_exceptions

Expand Down
Loading