44import com .datadoghq .trace .DDTraceInfo ;
55import com .datadoghq .trace .Service ;
66import com .fasterxml .jackson .databind .ObjectMapper ;
7+ import com .google .common .util .concurrent .RateLimiter ;
78import java .io .IOException ;
89import java .io .OutputStream ;
910import java .net .HttpURLConnection ;
1011import java .net .URL ;
1112import java .util .List ;
1213import java .util .Map ;
14+ import java .util .concurrent .TimeUnit ;
1315import lombok .extern .slf4j .Slf4j ;
1416import org .msgpack .jackson .dataformat .MessagePackFactory ;
1517
@@ -19,10 +21,14 @@ public class DDApi {
1921
2022 private static final String TRACES_ENDPOINT = "/v0.3/traces" ;
2123 private static final String SERVICES_ENDPOINT = "/v0.3/services" ;
24+ private static final long SECONDS_BETWEEN_ERROR_LOG = TimeUnit .MINUTES .toSeconds (5 );
2225
2326 private final String tracesEndpoint ;
2427 private final String servicesEndpoint ;
2528
29+ private final RateLimiter loggingRateLimiter =
30+ RateLimiter .create (1.0 / SECONDS_BETWEEN_ERROR_LOG );
31+
2632 private final ObjectMapper objectMapper = new ObjectMapper (new MessagePackFactory ());
2733
2834 public DDApi (final String host , final int port ) {
@@ -37,14 +43,7 @@ public DDApi(final String host, final int port) {
3743 * @return the staus code returned
3844 */
3945 public boolean sendTraces (final List <List <DDBaseSpan <?>>> traces ) {
40- final int status = callPUT (tracesEndpoint , traces );
41- if (status == 200 ) {
42- log .debug ("Succesfully sent {} traces to the DD agent." , traces .size ());
43- return true ;
44- } else {
45- log .warn ("Error while sending {} traces to the DD agent. Status: {}" , traces .size (), status );
46- return false ;
47- }
46+ return putContent ("traces" , tracesEndpoint , traces , traces .size ());
4847 }
4948
5049 /**
@@ -56,15 +55,7 @@ public boolean sendServices(final Map<String, Service> services) {
5655 if (services == null ) {
5756 return true ;
5857 }
59- final int status = callPUT (servicesEndpoint , services );
60- if (status == 200 ) {
61- log .debug ("Succesfully sent {} services to the DD agent." , services .size ());
62- return true ;
63- } else {
64- log .warn (
65- "Error while sending {} services to the DD agent. Status: {}" , services .size (), status );
66- return false ;
67- }
58+ return putContent ("services" , servicesEndpoint , services , services .size ());
6859 }
6960
7061 /**
@@ -73,33 +64,52 @@ public boolean sendServices(final Map<String, Service> services) {
7364 * @param content
7465 * @return the status code
7566 */
76- private int callPUT ( final String endpoint , final Object content ) {
77- HttpURLConnection httpCon = null ;
67+ private boolean putContent (
68+ final String type , final String endpoint , final Object content , final int size ) {
7869 try {
79- httpCon = getHttpURLConnection (endpoint );
80- } catch (final Exception e ) {
81- log .warn ("Error thrown before PUT call to the DD agent." , e );
82- return -1 ;
83- }
70+ final HttpURLConnection httpCon = getHttpURLConnection (endpoint );
8471
85- try {
8672 final OutputStream out = httpCon .getOutputStream ();
8773 objectMapper .writeValue (out , content );
8874 out .flush ();
8975 out .close ();
76+
9077 final int responseCode = httpCon .getResponseCode ();
91- if (responseCode == 200 ) {
92- log .debug ("Sent the payload to the DD agent." );
93- } else {
78+ if (responseCode != 200 ) {
79+ if (log .isDebugEnabled ()) {
80+ log .debug (
81+ "Error while sending {} {} to the DD agent. Status: {}, ResponseMessage: " ,
82+ size ,
83+ type ,
84+ responseCode ,
85+ httpCon .getResponseMessage ());
86+ } else if (loggingRateLimiter .tryAcquire ()) {
87+ log .warn (
88+ "Error while sending {} {} to the DD agent. Status: {} (going silent for {} seconds)" ,
89+ size ,
90+ type ,
91+ responseCode ,
92+ httpCon .getResponseMessage (),
93+ SECONDS_BETWEEN_ERROR_LOG );
94+ }
95+ return false ;
96+ }
97+
98+ log .debug ("Succesfully sent {} {} to the DD agent." , size , type );
99+ return true ;
100+
101+ } catch (final IOException e ) {
102+ if (log .isDebugEnabled ()) {
103+ log .debug ("Error while sending " + size + " " + type + " to the DD agent." , e );
104+ } else if (loggingRateLimiter .tryAcquire ()) {
94105 log .warn (
95- "Could not send the payload to the DD agent. Status: {} ResponseMessage: {}" ,
96- httpCon .getResponseCode (),
97- httpCon .getResponseMessage ());
106+ "Error while sending {} {} to the DD agent. Message: {} (going silent for {} seconds)" ,
107+ size ,
108+ type ,
109+ e .getMessage (),
110+ SECONDS_BETWEEN_ERROR_LOG );
98111 }
99- return responseCode ;
100- } catch (final Exception e ) {
101- log .warn ("Could not send the payload to the DD agent." , e );
102- return -1 ;
112+ return false ;
103113 }
104114 }
105115
0 commit comments