Skip to content

Commit

Permalink
[receiver/haproxy] support empty values (open-telemetry#30269)
Browse files Browse the repository at this point in the history
**Description:** 
support haproxy stats displaying empty values.

**Link to tracking Issue:**
Fixes open-telemetry#30252 

**Testing:**
Add an integration test with a record with empty values.
  • Loading branch information
atoulme authored Jan 4, 2024
1 parent c61445d commit 00263ad
Show file tree
Hide file tree
Showing 5 changed files with 982 additions and 71 deletions.
27 changes: 27 additions & 0 deletions .chloggen/support_empty_values.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: haproxyreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Support empty values in haproxy stats.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [30252]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
134 changes: 63 additions & 71 deletions receiver/haproxyreceiver/scraper.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,93 +85,83 @@ func (s *scraper) scrape(ctx context.Context) (pmetric.Metrics, error) {

now := pcommon.NewTimestampFromTime(time.Now())
for _, record := range records {
err := s.mb.RecordHaproxySessionsCountDataPoint(now, record["scur"])
if err != nil {
scrapeErrors = append(scrapeErrors, err)
if record["scur"] != "" {
if err := s.mb.RecordHaproxySessionsCountDataPoint(now, record["scur"]); err != nil {
scrapeErrors = append(scrapeErrors, err)
}
}
if record["conn_rate"] != "" {
err = s.mb.RecordHaproxyConnectionsRateDataPoint(now, record["conn_rate"])
if err != nil {
if err := s.mb.RecordHaproxyConnectionsRateDataPoint(now, record["conn_rate"]); err != nil {
scrapeErrors = append(scrapeErrors, err)
}
}
if record["conn_tot"] != "" {
err = s.mb.RecordHaproxyConnectionsTotalDataPoint(now, record["conn_tot"])
if err != nil {
if err := s.mb.RecordHaproxyConnectionsTotalDataPoint(now, record["conn_tot"]); err != nil {
scrapeErrors = append(scrapeErrors, err)
}
}
if record["lbtot"] != "" {
err = s.mb.RecordHaproxyServerSelectedTotalDataPoint(now, record["lbtot"])
if err != nil {
if err := s.mb.RecordHaproxyServerSelectedTotalDataPoint(now, record["lbtot"]); err != nil {
scrapeErrors = append(scrapeErrors, err)
}
}
err = s.mb.RecordHaproxyBytesInputDataPoint(now, record["bin"])
if err != nil {
scrapeErrors = append(scrapeErrors, err)
if record["bin"] != "" {
if err := s.mb.RecordHaproxyBytesInputDataPoint(now, record["bin"]); err != nil {
scrapeErrors = append(scrapeErrors, err)
}
}
err = s.mb.RecordHaproxyBytesOutputDataPoint(now, record["bout"])
if err != nil {
scrapeErrors = append(scrapeErrors, err)
if record["bout"] != "" {
if err := s.mb.RecordHaproxyBytesOutputDataPoint(now, record["bout"]); err != nil {
scrapeErrors = append(scrapeErrors, err)
}
}
if record["cli_abrt"] != "" {
err = s.mb.RecordHaproxyClientsCanceledDataPoint(now, record["cli_abrt"])
if err != nil {
if err := s.mb.RecordHaproxyClientsCanceledDataPoint(now, record["cli_abrt"]); err != nil {
scrapeErrors = append(scrapeErrors, err)
}
}
if record["comp_byp"] != "" {
err = s.mb.RecordHaproxyCompressionBypassDataPoint(now, record["comp_byp"])
if err != nil {
if err := s.mb.RecordHaproxyCompressionBypassDataPoint(now, record["comp_byp"]); err != nil {
scrapeErrors = append(scrapeErrors, err)
}
}
if record["comp_in"] != "" {
err = s.mb.RecordHaproxyCompressionInputDataPoint(now, record["comp_in"])
if err != nil {
if err := s.mb.RecordHaproxyCompressionInputDataPoint(now, record["comp_in"]); err != nil {
scrapeErrors = append(scrapeErrors, err)
}
}
if record["comp_out"] != "" {
err = s.mb.RecordHaproxyCompressionOutputDataPoint(now, record["comp_out"])
if err != nil {
if err := s.mb.RecordHaproxyCompressionOutputDataPoint(now, record["comp_out"]); err != nil {
scrapeErrors = append(scrapeErrors, err)
}
}
if record["comp_rsp"] != "" {
err = s.mb.RecordHaproxyCompressionCountDataPoint(now, record["comp_rsp"])
if err != nil {
if err := s.mb.RecordHaproxyCompressionCountDataPoint(now, record["comp_rsp"]); err != nil {
scrapeErrors = append(scrapeErrors, err)
}
}
if record["dreq"] != "" {
err = s.mb.RecordHaproxyRequestsDeniedDataPoint(now, record["dreq"])
if err != nil {
if err := s.mb.RecordHaproxyRequestsDeniedDataPoint(now, record["dreq"]); err != nil {
scrapeErrors = append(scrapeErrors, err)
}
}
if record["dresp"] != "" {
err = s.mb.RecordHaproxyResponsesDeniedDataPoint(now, record["dresp"])
if err != nil {
if err := s.mb.RecordHaproxyResponsesDeniedDataPoint(now, record["dresp"]); err != nil {
scrapeErrors = append(scrapeErrors, err)
}
}
if record["downtime"] != "" {
err = s.mb.RecordHaproxyDowntimeDataPoint(now, record["downtime"])
if err != nil {
if err := s.mb.RecordHaproxyDowntimeDataPoint(now, record["downtime"]); err != nil {
scrapeErrors = append(scrapeErrors, err)
}
}
if record["econ"] != "" {
err = s.mb.RecordHaproxyConnectionsErrorsDataPoint(now, record["econ"])
if err != nil {
if err := s.mb.RecordHaproxyConnectionsErrorsDataPoint(now, record["econ"]); err != nil {
scrapeErrors = append(scrapeErrors, err)
}
}
if record["ereq"] != "" {
err = s.mb.RecordHaproxyRequestsErrorsDataPoint(now, record["ereq"])
if err != nil {
if err := s.mb.RecordHaproxyRequestsErrorsDataPoint(now, record["ereq"]); err != nil {
scrapeErrors = append(scrapeErrors, err)
}
}
Expand All @@ -189,72 +179,74 @@ func (s *scraper) scrape(ctx context.Context) (pmetric.Metrics, error) {
s.mb.RecordHaproxyResponsesErrorsDataPoint(now, abortsVal+erespVal)
}
if record["chkfail"] != "" {
err = s.mb.RecordHaproxyFailedChecksDataPoint(now, record["chkfail"])
if err != nil {
if err := s.mb.RecordHaproxyFailedChecksDataPoint(now, record["chkfail"]); err != nil {
scrapeErrors = append(scrapeErrors, err)
}
}
if record["wredis"] != "" {
err = s.mb.RecordHaproxyRequestsRedispatchedDataPoint(now, record["wredis"])
if err != nil {
if err := s.mb.RecordHaproxyRequestsRedispatchedDataPoint(now, record["wredis"]); err != nil {
scrapeErrors = append(scrapeErrors, err)
}
}
err = s.mb.RecordHaproxyRequestsTotalDataPoint(now, record["hrsp_1xx"], metadata.AttributeStatusCode1xx)
if err != nil {
scrapeErrors = append(scrapeErrors, err)
if record["hrsp_1xx"] != "" {
if err := s.mb.RecordHaproxyRequestsTotalDataPoint(now, record["hrsp_1xx"], metadata.AttributeStatusCode1xx); err != nil {
scrapeErrors = append(scrapeErrors, err)
}
}
err = s.mb.RecordHaproxyRequestsTotalDataPoint(now, record["hrsp_2xx"], metadata.AttributeStatusCode2xx)
if err != nil {
scrapeErrors = append(scrapeErrors, err)
if record["hrsp_2xx"] != "" {
if err := s.mb.RecordHaproxyRequestsTotalDataPoint(now, record["hrsp_2xx"], metadata.AttributeStatusCode2xx); err != nil {
scrapeErrors = append(scrapeErrors, err)
}
}
err = s.mb.RecordHaproxyRequestsTotalDataPoint(now, record["hrsp_3xx"], metadata.AttributeStatusCode3xx)
if err != nil {
scrapeErrors = append(scrapeErrors, err)
if record["hrsp_3xx"] != "" {
if err := s.mb.RecordHaproxyRequestsTotalDataPoint(now, record["hrsp_3xx"], metadata.AttributeStatusCode3xx); err != nil {
scrapeErrors = append(scrapeErrors, err)
}
}
err = s.mb.RecordHaproxyRequestsTotalDataPoint(now, record["hrsp_4xx"], metadata.AttributeStatusCode4xx)
if err != nil {
scrapeErrors = append(scrapeErrors, err)
if record["hrsp_4xx"] != "" {
if err := s.mb.RecordHaproxyRequestsTotalDataPoint(now, record["hrsp_4xx"], metadata.AttributeStatusCode4xx); err != nil {
scrapeErrors = append(scrapeErrors, err)
}
}
err = s.mb.RecordHaproxyRequestsTotalDataPoint(now, record["hrsp_5xx"], metadata.AttributeStatusCode5xx)
if err != nil {
scrapeErrors = append(scrapeErrors, err)
if record["hrsp_5xx"] != "" {
if err := s.mb.RecordHaproxyRequestsTotalDataPoint(now, record["hrsp_5xx"], metadata.AttributeStatusCode5xx); err != nil {
scrapeErrors = append(scrapeErrors, err)
}
}
err = s.mb.RecordHaproxyRequestsTotalDataPoint(now, record["hrsp_other"], metadata.AttributeStatusCodeOther)
if err != nil {
scrapeErrors = append(scrapeErrors, err)
if record["hrsp_other"] != "" {
if err := s.mb.RecordHaproxyRequestsTotalDataPoint(now, record["hrsp_other"], metadata.AttributeStatusCodeOther); err != nil {
scrapeErrors = append(scrapeErrors, err)
}
}
if record["wretr"] != "" {
err = s.mb.RecordHaproxyConnectionsRetriesDataPoint(now, record["wretr"])
if err != nil {
if err := s.mb.RecordHaproxyConnectionsRetriesDataPoint(now, record["wretr"]); err != nil {
scrapeErrors = append(scrapeErrors, err)
}
}
err = s.mb.RecordHaproxySessionsTotalDataPoint(now, record["stot"])
if err != nil {
scrapeErrors = append(scrapeErrors, err)
if record["stot"] != "" {
if err := s.mb.RecordHaproxySessionsTotalDataPoint(now, record["stot"]); err != nil {
scrapeErrors = append(scrapeErrors, err)
}
}
if record["qcur"] != "" {
err = s.mb.RecordHaproxyRequestsQueuedDataPoint(now, record["qcur"])
if err != nil {
if err := s.mb.RecordHaproxyRequestsQueuedDataPoint(now, record["qcur"]); err != nil {
scrapeErrors = append(scrapeErrors, err)
}
}
if record["req_rate"] != "" {
err = s.mb.RecordHaproxyRequestsRateDataPoint(now, record["req_rate"])
if err != nil {
if err := s.mb.RecordHaproxyRequestsRateDataPoint(now, record["req_rate"]); err != nil {
scrapeErrors = append(scrapeErrors, err)
}
}
if record["ttime"] != "" {
err = s.mb.RecordHaproxySessionsAverageDataPoint(now, record["ttime"])
if err != nil {
if err := s.mb.RecordHaproxySessionsAverageDataPoint(now, record["ttime"]); err != nil {
scrapeErrors = append(scrapeErrors, err)
}
}
err = s.mb.RecordHaproxySessionsRateDataPoint(now, record["rate"])
if err != nil {
scrapeErrors = append(scrapeErrors, err)
if record["rate"] != "" {
if err := s.mb.RecordHaproxySessionsRateDataPoint(now, record["rate"]); err != nil {
scrapeErrors = append(scrapeErrors, err)
}
}
rb := s.mb.NewResourceBuilder()
rb.SetHaproxyProxyName(record["pxname"])
Expand Down
43 changes: 43 additions & 0 deletions receiver/haproxyreceiver/scraper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,46 @@ func Test_scraper_readStats(t *testing.T) {
pmetrictest.IgnoreTimestamp(), pmetrictest.IgnoreResourceAttributeValue("haproxy.addr"),
pmetrictest.IgnoreResourceMetricsOrder()))
}

func Test_scraper_readStatsWithIncompleteValues(t *testing.T) {
f, err := os.MkdirTemp("", "haproxytest")
require.NoError(t, err)
socketAddr := filepath.Join(f, "testhaproxy.sock")
l, err := net.Listen("unix", socketAddr)
require.NoError(t, err)
defer l.Close()

go func() {
c, err2 := l.Accept()
require.NoError(t, err2)

buf := make([]byte, 512)
nr, err2 := c.Read(buf)
require.NoError(t, err2)

data := string(buf[0:nr])
switch data {
case "show stats\n":
stats, err2 := os.ReadFile(filepath.Join("testdata", "30252_stats.txt"))
require.NoError(t, err2)
_, err2 = c.Write(stats)
require.NoError(t, err2)
default:
require.Fail(t, fmt.Sprintf("invalid message: %v", data))
}
}()

haProxyCfg := newDefaultConfig().(*Config)
haProxyCfg.Endpoint = socketAddr
s := newScraper(haProxyCfg, receivertest.NewNopCreateSettings())
m, err := s.scrape(context.Background())
require.NoError(t, err)
require.NotNil(t, m)

expectedFile := filepath.Join("testdata", "scraper", "30252_expected.yaml")
expectedMetrics, err := golden.ReadMetrics(expectedFile)
require.NoError(t, err)
require.NoError(t, pmetrictest.CompareMetrics(expectedMetrics, m, pmetrictest.IgnoreStartTimestamp(),
pmetrictest.IgnoreTimestamp(), pmetrictest.IgnoreResourceAttributeValue("haproxy.addr"),
pmetrictest.IgnoreResourceMetricsOrder()))
}
7 changes: 7 additions & 0 deletions receiver/haproxyreceiver/testdata/30252_stats.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# pxname,svname,qcur,qmax,scur,smax,slim,stot,bin,bout,dreq,dresp,ereq,econ,eresp,wretr,wredis,status,weight,act,bck,chkfail,chkdown,lastchg,downtime,qlimit,pid,iid,sid,throttle,lbtot,tracked,type,rate,rate_lim,rate_max,check_status,check_code,check_duration,hrsp_1xx,hrsp_2xx,hrsp_3xx,hrsp_4xx,hrsp_5xx,hrsp_other,hanafail,req_rate,req_rate_max,req_tot,cli_abrt,srv_abrt,comp_in,comp_out,comp_byp,comp_rsp,lastsess,last_chk,last_agt,qtime,ctime,rtime,ttime,agent_status,agent_code,agent_duration,check_desc,agent_desc,check_rise,check_fall,check_health,agent_rise,agent_fall,agent_health,addr,cookie,mode,algo,conn_rate,conn_rate_max,conn_tot,intercepted,dcon,dses,wrew,connect,reuse,cache_lookups,cache_hits,srv_icur,src_ilim,qtime_max,ctime_max,rtime_max,ttime_max,eint,idle_conn_cur,safe_conn_cur,used_conn_cur,need_conn_est,uweight,agg_server_status,agg_server_check_status,agg_check_status,-,ssl_sess,ssl_reused_sess,ssl_failed_handshake,h2_headers_rcvd,h2_data_rcvd,h2_settings_rcvd,h2_rst_stream_rcvd,h2_goaway_rcvd,h2_detected_conn_protocol_errors,h2_detected_strm_protocol_errors,h2_rst_stream_resp,h2_goaway_resp,h2_open_connections,h2_backend_open_streams,h2_total_connections,h2_backend_total_streams,h1_open_connections,h1_open_streams,h1_total_connections,h1_total_streams,h1_bytes_in,h1_bytes_out,h1_spliced_bytes_in,h1_spliced_bytes_out,
stats,FRONTEND,,,0,1,524268,2,1444,47008,0,0,0,,,,,OPEN,,,,,,,,,1,2,0,,,,0,0,0,1,,,,0,2,0,0,0,0,,0,1,2,,,0,0,0,0,,,,,,,,,,,,,,,,,,,,,http,,0,1,2,2,0,0,0,,,0,0,,,,,,,0,,,,,,,,,-,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,2,2,1594,47052,0,0,
myfrontend,FRONTEND,,,1,1,524268,1,85470,107711,0,0,0,,,,,OPEN,,,,,,,,,1,3,0,,,,0,0,0,1,,,,0,134,0,0,0,0,,0,11,134,,,0,0,0,0,,,,,,,,,,,,,,,,,,,,,http,,0,1,1,0,0,0,0,,,0,0,,,,,,,0,,,,,,,,,-,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,1,134,94712,107309,0,0,
webservers,s1,0,0,0,1,,45,28734,36204,,0,,0,0,0,0,UP,1,1,0,0,0,159,0,,1,4,1,,45,,2,0,,4,L4OK,,0,0,45,0,0,0,0,,,,45,0,0,,,,,3,,,0,1,4,95,,,,Layer4 check passed,,2,3,4,,,,192.168.16.2:8080,,http,,,,,,,,0,1,44,,,1,,0,1,26,184,0,0,1,0,1,1,,,,-,0,0,0,,,,,,,,,,,,,,,,,,,,,,
webservers,s,,,,,,,,,,,,,,,,UP,,,,,,,,,,,,,,,,,,,LOK,,,,,,,,,,,,,,,,,,,,,,,,,,,,,Layer check passed,,,,,,,,...,,http,,,,,,,,,,,,,,,,,,,,,,,,,,,,-,,,,,,,,,,,,,,,,,,,,,,,,,
webservers,s3,0,0,0,1,,44,28072,35376,,0,,0,0,0,0,UP,1,1,0,0,0,159,0,,1,4,3,,44,,2,0,,4,L4OK,,0,0,44,0,0,0,0,,,,44,0,0,,,,,4,,,0,1,4,121,,,,Layer4 check passed,,2,3,4,,,,192.168.16.4:8080,,http,,,,,,,,0,1,43,,,1,,0,3,25,1331,0,0,1,0,1,1,,,,-,0,0,0,,,,,,,,,,,,,,,,,,,,,,
webservers,BACKEND,0,0,0,1,52427,134,85470,107711,0,0,,0,0,0,0,UP,3,3,0,,0,159,0,,1,4,0,,134,,1,0,,11,,,,0,134,0,0,0,0,,,,134,0,0,0,0,0,0,3,,,0,1,4,105,,,,,,,,,,,,,,http,roundrobin,,,,,,,0,3,131,0,0,,,0,3,26,1331,0,,,,,3,0,0,0,-,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,3,0,3,134,107309,91496,0,0,
Loading

0 comments on commit 00263ad

Please sign in to comment.