Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion source/plugins/ruby/CustomMetricsUtils.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ def check_custom_metrics_availability
return true
end

if enable_custom_metrics.nil? || enable_custom_metrics.to_s.downcase == 'false'
if enable_custom_metrics.nil? || enable_custom_metrics.to_s.empty? || enable_custom_metrics.to_s.downcase == 'false'
return false
end

Expand Down
195 changes: 185 additions & 10 deletions source/plugins/ruby/KubernetesApiClient.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ class KubernetesApiClient
require "time"
require "ipaddress"
require "jwt"
require "zlib"
require "stringio"
require 'yajl'

require_relative "oms_common"
require_relative "constants"
Expand Down Expand Up @@ -864,18 +867,190 @@ def getResourcesAndContinuationTokenV2(uri, api_group: nil)
resourceInventory = nil
responseCode = nil
begin
@Log.info "KubernetesApiClient::getResourcesAndContinuationTokenV2 : Getting resources from Kube API using url: #{uri} @ #{Time.now.utc.iso8601}"
responseCode, resourceInfo = getKubeResourceInfoV2(uri, api_group: api_group)
@Log.info "KubernetesApiClient::getResourcesAndContinuationTokenV2 : Done getting resources from Kube API using url: #{uri} @ #{Time.now.utc.iso8601}"
if !responseCode.nil? && responseCode == "200" && !resourceInfo.nil?
@Log.info "KubernetesApiClient::getResourcesAndContinuationTokenV2:Start:Parsing data for #{uri} using JSON @ #{Time.now.utc.iso8601}"
resourceInventory = JSON.parse(resourceInfo.body)
@Log.info "KubernetesApiClient::getResourcesAndContinuationTokenV2:End:Parsing data for #{uri} using JSON @ #{Time.now.utc.iso8601}"
resourceInfo = nil
resource_path = getResourceUri(uri, api_group)
if resource_path.nil?
@Log.warn "KubernetesApiClient::getResourcesAndContinuationTokenV2: resource path nil for #{uri}"
return continuationToken, resourceInventory, responseCode
end
if (!resourceInventory.nil? && !resourceInventory["metadata"].nil?)
continuationToken = resourceInventory["metadata"]["continue"]
parsed_items = []
metadata_continue = nil
resource_version = nil
parse_mode = "stream"
total_uncompressed_bytes = 0
total_compressed_bytes = 0
started_at = Time.now.utc

begin
parsed_uri = URI.parse(resource_path)
if !File.exist?(@@CaFile)
raise "#{@@CaFile} doesnt exist"
end

Net::HTTP.start(parsed_uri.host, parsed_uri.port, :use_ssl => true, :ca_file => @@CaFile, :verify_mode => OpenSSL::SSL::VERIFY_PEER, :open_timeout => 20, :read_timeout => 40) do |http|
kubeApiRequest = Net::HTTP::Get.new(parsed_uri.request_uri)
kubeApiRequest['Authorization'] = 'Bearer ' + getTokenStr
kubeApiRequest['User-Agent'] = getUserAgent()
kubeApiRequest['Accept-Encoding'] = 'gzip'
kubeApiRequest['Accept'] = 'application/json'

@Log.info "KubernetesApiClient::getResourcesAndContinuationTokenV2(stream): Requesting #{uri} (api_group=#{api_group}) @ #{started_at.iso8601}"

http.request(kubeApiRequest) do |response|
responseCode = response.code
unless responseCode == '200'
parse_mode = 'error'
@Log.warn "KubernetesApiClient::getResourcesAndContinuationTokenV2: Non-success code #{responseCode} for #{uri}"
# Send telemetry for non-success response codes
@@K8sApiResponseTelemetryTimeTracker = ApplicationInsightsUtility.sendAPIResponseTelemetry(responseCode, uri, "K8sAPIStatus", @@K8sApiResponseCodeHash, @@K8sApiResponseTelemetryTimeTracker)
break
end

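# Decide whether to stream or fall back to a full parse based on Content-Length:
# small payloads are cheaper to read and parse in one pass. When Content-Length
# is absent or cannot be parsed as an integer, the streaming path is used.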
content_length = nil
begin
content_length = Integer(response['Content-Length']) if response['Content-Length']
rescue; end
small_threshold = 256 * 1024 # 256KB

if content_length && content_length <= small_threshold
# Read whole (possibly compressed) body then use faster parser for small payloads
body_buf = +"" # mutable string
response.read_body { |c| body_buf << c }
total_compressed_bytes = body_buf.bytesize
if response['Content-Encoding'] == 'gzip'
begin
body_buf = Zlib::GzipReader.new(StringIO.new(body_buf)).read
parse_mode = 'full_gzip'
rescue => gzerr
@Log.warn "KubernetesApiClient::getResourcesAndContinuationTokenV2: gzip decompress(small) failed: #{gzerr}; using compressed body (parse will likely fail)"
end
else
parse_mode = 'full_plain'
end
total_uncompressed_bytes = body_buf.bytesize
resourceInventory = JSON.parse(body_buf)
else
# Streaming path - CRITICAL: Create parser ONCE outside the read_body loop
parse_mode = 'stream'
is_gzip = (response['Content-Encoding'] == 'gzip')
inflater = nil
yajl_parser = nil
accumulated_buffer = +"" # Buffer for incomplete JSON chunks
begin
if is_gzip
# Use Inflate with gzip window bits for streaming
inflater = Zlib::Inflate.new(Zlib::MAX_WBITS + 32)
parse_mode = 'stream_gzip'
end

# Create Yajl parser ONCE and reuse for all chunks
yajl_parser = Yajl::Parser.new

# Set up the parser callback to extract items and continuation token
yajl_parser.on_parse_complete = lambda do |obj|
if obj.is_a?(Hash)
if obj.key?('items') && obj['items'].is_a?(Array)
# Force deep copy via JSON round-trip to avoid Yajl object reference issues
begin
serialized_items = JSON.generate(obj['items'])
deep_copied_items = JSON.parse(serialized_items)
parsed_items.concat(deep_copied_items)
rescue => json_err
@Log.warn "KubernetesApiClient::getResourcesAndContinuationTokenV2: JSON round-trip failed: #{json_err}, using shallow copy"
parsed_items.concat(obj['items'])
end
end
if obj.key?('metadata') && obj['metadata'].is_a?(Hash)
metadata_continue = obj['metadata']['continue']
resource_version = obj['metadata']['resourceVersion'] if obj['metadata'].key?('resourceVersion')
end
end
end

# Stream and parse chunks
chunk_count = 0
response.read_body do |compressed_chunk|
chunk_count += 1
total_compressed_bytes += compressed_chunk.bytesize

decompressed = if is_gzip
begin
inflater.inflate(compressed_chunk)
rescue Zlib::Error => zerr
@Log.warn "KubernetesApiClient::getResourcesAndContinuationTokenV2: gzip inflate failed at chunk #{chunk_count}: #{zerr}"
raise
end
else
compressed_chunk
end

total_uncompressed_bytes += decompressed.bytesize

# Feed decompressed chunk to the parser
# Yajl can handle incomplete JSON and will buffer internally
begin
yajl_parser << decompressed
rescue Yajl::ParseError => perr
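# Log the parse error but keep reading the stream instead of aborting.
# NOTE(review): Yajl buffers incomplete JSON across chunks, so an error here
# probably means malformed data rather than a partial chunk - confirm that the
# items collected after such an error are still safe to use.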
@Log.warn "KubernetesApiClient::getResourcesAndContinuationTokenV2: Yajl parse error at chunk #{chunk_count}: #{perr}"
end

# Yield control periodically to allow other threads to run (every 10 chunks)
Thread.pass if chunk_count % 10 == 0
end # read_body

# Finalize the parsing - this triggers on_parse_complete callback
yajl_parser.parse("") rescue nil

# Finish and clean up inflater
if inflater
inflater.finish rescue nil
inflater.close rescue nil
end

# Build minimal inventory structure
resourceInventory = {
'metadata' => { 'continue' => metadata_continue, 'resourceVersion' => resource_version },
'items' => parsed_items
}

@Log.info "KubernetesApiClient::getResourcesAndContinuationTokenV2: Successfully parsed #{parsed_items.length} items in #{chunk_count} chunks"

rescue => stream_err
@Log.warn "KubernetesApiClient::getResourcesAndContinuationTokenV2: Stream processing error: #{stream_err}"
# Clean up resources
inflater.close if inflater rescue nil
raise
end
end # streaming path
end # http.request
end # Net::HTTP.start
rescue => inner_err
@Log.warn "KubernetesApiClient::getResourcesAndContinuationTokenV2: streaming fetch/parse failed for #{uri}: #{inner_err}; falling back to legacy getKubeResourceInfoV2"
parse_mode = 'fallback'
begin
# Fallback to legacy path
fallbackResponseCode, resourceInfo = getKubeResourceInfoV2(uri, api_group: api_group)
responseCode = fallbackResponseCode if responseCode.nil?
if fallbackResponseCode == '200' && resourceInfo && resourceInfo.body && !resourceInfo.body.empty?
resourceInventory = JSON.parse(resourceInfo.body)
# Set continuationToken from fallback response
if resourceInventory && resourceInventory['metadata']
continuationToken = resourceInventory['metadata']['continue']
end
end
rescue => legacy_err
@Log.warn "KubernetesApiClient::getResourcesAndContinuationTokenV2: legacy fallback also failed: #{legacy_err}"
ApplicationInsightsUtility.sendExceptionTelemetry(legacy_err)
end
end

# Derive continuation token if not already set
if continuationToken.nil? && resourceInventory && resourceInventory['metadata']
continuationToken = resourceInventory['metadata']['continue']
end
duration_ms = ((Time.now.utc - started_at) * 1000).round(1)
@Log.info "KubernetesApiClient::getResourcesAndContinuationTokenV2: mode=#{parse_mode} code=#{responseCode} items=#{resourceInventory && resourceInventory['items'] ? resourceInventory['items'].length : 'n/a'} cont=#{continuationToken.nil? ? 'nil' : continuationToken.empty? ? 'empty' : 'set'} compBytes=#{total_compressed_bytes} uncompBytes=#{total_uncompressed_bytes} ms=#{duration_ms} uri=#{uri}"
rescue => errorStr
@Log.warn "KubernetesApiClient::getResourcesAndContinuationTokenV2:Failed in get resources for #{uri} and continuation token: #{errorStr}"
ApplicationInsightsUtility.sendExceptionTelemetry(errorStr)
Expand Down
4 changes: 3 additions & 1 deletion source/plugins/ruby/in_kube_podinventory.rb
Original file line number Diff line number Diff line change
Expand Up @@ -689,7 +689,9 @@ def getPodInventoryRecords(item, serviceRecords, batchTime = Time.utc.iso8601)
records.push(record)
end #container status block end

@mdmPodRecordItems.push(mdmPodRecord.dup)
if CustomMetricsUtils.check_custom_metrics_availability
@mdmPodRecordItems.push(mdmPodRecord.dup)
end

records.each do |record|
if !record.nil?
Expand Down
Loading