Skip to content

Commit c5b6e46

Browse files
authored
Ensure info discovery state change is atomic (#9497)
1 parent 52c71a7 commit c5b6e46

File tree

3 files changed

+152
-135
lines changed

3 files changed

+152
-135
lines changed

communication/src/main/java/datadog/communication/ddagent/DDAgentFeaturesDiscovery.java

Lines changed: 82 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -81,21 +81,24 @@ public class DDAgentFeaturesDiscovery implements DroppingPolicy {
8181
private final String[] evpProxyEndpoints = {V4_EVP_PROXY_ENDPOINT, V2_EVP_PROXY_ENDPOINT};
8282
private final String[] telemetryProxyEndpoints = {TELEMETRY_PROXY_ENDPOINT};
8383

84-
private volatile String traceEndpoint;
85-
private volatile String metricsEndpoint;
86-
private volatile String dataStreamsEndpoint;
87-
private volatile boolean supportsLongRunning;
88-
private volatile boolean supportsDropping;
89-
private volatile String state;
90-
private volatile String configEndpoint;
91-
private volatile String debuggerEndpoint;
92-
private volatile String debuggerDiagnosticsEndpoint;
93-
private volatile String evpProxyEndpoint;
94-
private volatile String version;
95-
private volatile String telemetryProxyEndpoint;
96-
private volatile Set<String> peerTags = emptySet();
97-
98-
private long lastTimeDiscovered;
84+
private static class State {
85+
String traceEndpoint;
86+
String metricsEndpoint;
87+
String dataStreamsEndpoint;
88+
boolean supportsLongRunning;
89+
boolean supportsDropping;
90+
String state;
91+
String configEndpoint;
92+
String debuggerEndpoint;
93+
String debuggerDiagnosticsEndpoint;
94+
String evpProxyEndpoint;
95+
String version;
96+
String telemetryProxyEndpoint;
97+
Set<String> peerTags = emptySet();
98+
long lastTimeDiscovered;
99+
}
100+
101+
private volatile State discoveryState;
99102

100103
public DDAgentFeaturesDiscovery(
101104
OkHttpClient client,
@@ -111,23 +114,7 @@ public DDAgentFeaturesDiscovery(
111114
? new String[] {V5_ENDPOINT, V4_ENDPOINT, V3_ENDPOINT}
112115
: new String[] {V4_ENDPOINT, V3_ENDPOINT};
113116
this.discoveryTimer = monitoring.newTimer("trace.agent.discovery.time");
114-
}
115-
116-
private void reset() {
117-
traceEndpoint = null;
118-
metricsEndpoint = null;
119-
supportsDropping = false;
120-
supportsLongRunning = false;
121-
state = null;
122-
configEndpoint = null;
123-
debuggerEndpoint = null;
124-
debuggerDiagnosticsEndpoint = null;
125-
dataStreamsEndpoint = null;
126-
evpProxyEndpoint = null;
127-
version = null;
128-
lastTimeDiscovered = 0;
129-
telemetryProxyEndpoint = null;
130-
peerTags = emptySet();
117+
this.discoveryState = new State();
131118
}
132119

133120
/** Run feature discovery, unconditionally. */
@@ -146,15 +133,17 @@ protected long getFeaturesDiscoveryMinDelayMillis() {
146133

147134
private synchronized void discoverIfOutdated(final long maxElapsedMs) {
148135
final long now = System.currentTimeMillis();
149-
final long elapsed = now - lastTimeDiscovered;
136+
final long elapsed = now - discoveryState.lastTimeDiscovered;
150137
if (elapsed > maxElapsedMs) {
151-
doDiscovery();
152-
lastTimeDiscovered = now;
138+
final State newState = new State();
139+
doDiscovery(newState);
140+
newState.lastTimeDiscovered = now;
141+
// swap atomically states
142+
discoveryState = newState;
153143
}
154144
}
155145

156-
private void doDiscovery() {
157-
reset();
146+
private void doDiscovery(State newState) {
158147
// 1. try to fetch info about the agent, if the endpoint is there
159148
// 2. try to parse the response, if it can be parsed, finish
160149
// 3. fallback if the endpoint couldn't be found or the response couldn't be parsed
@@ -169,44 +158,44 @@ private void doDiscovery() {
169158
try (Response response = client.newCall(requestBuilder.build()).execute()) {
170159
if (response.isSuccessful()) {
171160
processInfoResponseHeaders(response);
172-
fallback = !processInfoResponse(response.body().string());
161+
fallback = !processInfoResponse(newState, response.body().string());
173162
}
174163
} catch (Throwable error) {
175164
errorQueryingEndpoint("info", error);
176165
}
177166
if (fallback) {
178-
supportsDropping = false;
179-
supportsLongRunning = false;
167+
newState.supportsDropping = false;
168+
newState.supportsLongRunning = false;
180169
log.debug("Falling back to probing, client dropping will be disabled");
181170
// disable metrics unless the info endpoint is present, which prevents
182171
// sending metrics to 7.26.0, which has a bug in reporting metric origin
183-
metricsEndpoint = null;
172+
newState.metricsEndpoint = null;
184173
}
185174

186175
// don't want to rewire the traces pipeline
187-
if (null == traceEndpoint) {
188-
traceEndpoint = probeTracesEndpoint(traceEndpoints);
189-
} else if (state == null || state.isEmpty()) {
176+
if (null == newState.traceEndpoint) {
177+
newState.traceEndpoint = probeTracesEndpoint(newState, traceEndpoints);
178+
} else if (newState.state == null || newState.state.isEmpty()) {
190179
// Still need to probe so that state is correctly assigned
191-
probeTracesEndpoint(new String[] {traceEndpoint});
180+
probeTracesEndpoint(newState, new String[] {newState.traceEndpoint});
192181
}
193182
}
194183

195184
if (log.isDebugEnabled()) {
196185
log.debug(
197186
"discovered traceEndpoint={}, metricsEndpoint={}, supportsDropping={}, supportsLongRunning={}, dataStreamsEndpoint={}, configEndpoint={}, evpProxyEndpoint={}, telemetryProxyEndpoint={}",
198-
traceEndpoint,
199-
metricsEndpoint,
200-
supportsDropping,
201-
supportsLongRunning,
202-
dataStreamsEndpoint,
203-
configEndpoint,
204-
evpProxyEndpoint,
205-
telemetryProxyEndpoint);
187+
newState.traceEndpoint,
188+
newState.metricsEndpoint,
189+
newState.supportsDropping,
190+
newState.supportsLongRunning,
191+
newState.dataStreamsEndpoint,
192+
newState.configEndpoint,
193+
newState.evpProxyEndpoint,
194+
newState.telemetryProxyEndpoint);
206195
}
207196
}
208197

209-
private String probeTracesEndpoint(String[] endpoints) {
198+
private String probeTracesEndpoint(State newState, String[] endpoints) {
210199
for (String candidate : endpoints) {
211200
try (Response response =
212201
client
@@ -219,7 +208,7 @@ private String probeTracesEndpoint(String[] endpoints) {
219208
.build())
220209
.execute()) {
221210
if (response.code() != 404) {
222-
state = response.header(DATADOG_AGENT_STATE);
211+
newState.state = response.header(DATADOG_AGENT_STATE);
223212
return candidate;
224213
}
225214
} catch (Throwable e) {
@@ -243,11 +232,11 @@ private void processInfoResponseHeaders(Response response) {
243232
}
244233

245234
@SuppressWarnings("unchecked")
246-
private boolean processInfoResponse(String response) {
235+
private boolean processInfoResponse(State newState, String response) {
247236
try {
248237
Map<String, Object> map = RESPONSE_ADAPTER.fromJson(response);
249238
discoverStatsDPort(map);
250-
version = (String) map.get("version");
239+
newState.version = (String) map.get("version");
251240
Set<String> endpoints = new HashSet<>((List<String>) map.get("endpoints"));
252241

253242
String foundMetricsEndpoint = null;
@@ -261,18 +250,18 @@ private boolean processInfoResponse(String response) {
261250
}
262251

263252
// This is done outside of the loop to set metricsEndpoint to null if not found
264-
metricsEndpoint = foundMetricsEndpoint;
253+
newState.metricsEndpoint = foundMetricsEndpoint;
265254

266255
for (String endpoint : traceEndpoints) {
267256
if (containsEndpoint(endpoints, endpoint)) {
268-
traceEndpoint = endpoint;
257+
newState.traceEndpoint = endpoint;
269258
break;
270259
}
271260
}
272261

273262
for (String endpoint : configEndpoints) {
274263
if (containsEndpoint(endpoints, endpoint)) {
275-
configEndpoint = endpoint;
264+
newState.configEndpoint = endpoint;
276265
break;
277266
}
278267
}
@@ -281,56 +270,58 @@ private boolean processInfoResponse(String response) {
281270
// intake
282271
// because older agents support diagnostics, we fallback to it before falling back to v1
283272
if (containsEndpoint(endpoints, DEBUGGER_ENDPOINT_V2)) {
284-
debuggerEndpoint = DEBUGGER_ENDPOINT_V2;
273+
newState.debuggerEndpoint = DEBUGGER_ENDPOINT_V2;
285274
} else if (containsEndpoint(endpoints, DEBUGGER_DIAGNOSTICS_ENDPOINT)) {
286-
debuggerEndpoint = DEBUGGER_DIAGNOSTICS_ENDPOINT;
275+
newState.debuggerEndpoint = DEBUGGER_DIAGNOSTICS_ENDPOINT;
287276
} else if (containsEndpoint(endpoints, DEBUGGER_ENDPOINT_V1)) {
288-
debuggerEndpoint = DEBUGGER_ENDPOINT_V1;
277+
newState.debuggerEndpoint = DEBUGGER_ENDPOINT_V1;
289278
}
290279
if (containsEndpoint(endpoints, DEBUGGER_DIAGNOSTICS_ENDPOINT)) {
291-
debuggerDiagnosticsEndpoint = DEBUGGER_DIAGNOSTICS_ENDPOINT;
280+
newState.debuggerDiagnosticsEndpoint = DEBUGGER_DIAGNOSTICS_ENDPOINT;
292281
}
293282

294283
for (String endpoint : dataStreamsEndpoints) {
295284
if (containsEndpoint(endpoints, endpoint)) {
296-
dataStreamsEndpoint = endpoint;
285+
newState.dataStreamsEndpoint = endpoint;
297286
break;
298287
}
299288
}
300289

301290
for (String endpoint : evpProxyEndpoints) {
302291
if (containsEndpoint(endpoints, endpoint)) {
303-
evpProxyEndpoint = endpoint;
292+
newState.evpProxyEndpoint = endpoint;
304293
break;
305294
}
306295
}
307296

308297
for (String endpoint : telemetryProxyEndpoints) {
309298
if (containsEndpoint(endpoints, endpoint)) {
310-
telemetryProxyEndpoint = endpoint;
299+
newState.telemetryProxyEndpoint = endpoint;
311300
break;
312301
}
313302
}
314303

315-
supportsLongRunning = Boolean.TRUE.equals(map.getOrDefault("long_running_spans", false));
304+
newState.supportsLongRunning =
305+
Boolean.TRUE.equals(map.getOrDefault("long_running_spans", false));
316306

317307
if (metricsEnabled) {
318308
Object canDrop = map.get("client_drop_p0s");
319-
supportsDropping =
309+
newState.supportsDropping =
320310
null != canDrop
321311
&& ("true".equalsIgnoreCase(String.valueOf(canDrop))
322312
|| Boolean.TRUE.equals(canDrop));
323313

324314
Object peer_tags = map.get("peer_tags");
325-
peerTags =
315+
newState.peerTags =
326316
peer_tags instanceof List
327317
? unmodifiableSet(new HashSet<>((List<String>) peer_tags))
328318
: emptySet();
329319
}
330320
try {
331-
state = Strings.sha256(response);
321+
newState.state = Strings.sha256(response);
332322
} catch (NoSuchAlgorithmException ex) {
333-
log.debug("Failed to hash trace agent /info response. Will probe {}", traceEndpoint, ex);
323+
log.debug(
324+
"Failed to hash trace agent /info response. Will probe {}", newState.traceEndpoint, ex);
334325
}
335326
return true;
336327
} catch (Throwable error) {
@@ -364,88 +355,89 @@ private static void discoverStatsDPort(final Map<String, Object> info) {
364355
}
365356

366357
public boolean supportsMetrics() {
367-
return metricsEnabled && null != metricsEndpoint;
358+
return metricsEnabled && null != discoveryState.metricsEndpoint;
368359
}
369360

370361
public boolean supportsDebugger() {
371-
return debuggerEndpoint != null;
362+
return discoveryState.debuggerEndpoint != null;
372363
}
373364

374365
public String getDebuggerEndpoint() {
375-
return debuggerEndpoint;
366+
return discoveryState.debuggerEndpoint;
376367
}
377368

378369
public boolean supportsDebuggerDiagnostics() {
379-
return debuggerDiagnosticsEndpoint != null;
370+
return discoveryState.debuggerDiagnosticsEndpoint != null;
380371
}
381372

382373
public boolean supportsDropping() {
383-
return supportsDropping;
374+
return discoveryState.supportsDropping;
384375
}
385376

386377
public boolean supportsLongRunning() {
387-
return supportsLongRunning;
378+
return discoveryState.supportsLongRunning;
388379
}
389380

390381
public Set<String> peerTags() {
391-
return peerTags;
382+
return discoveryState.peerTags;
392383
}
393384

394385
public String getMetricsEndpoint() {
395-
return metricsEndpoint;
386+
return discoveryState.metricsEndpoint;
396387
}
397388

398389
public String getTraceEndpoint() {
399-
return traceEndpoint;
390+
return discoveryState.traceEndpoint;
400391
}
401392

402393
public String getDataStreamsEndpoint() {
403-
return dataStreamsEndpoint;
394+
return discoveryState.dataStreamsEndpoint;
404395
}
405396

406397
public String getEvpProxyEndpoint() {
407-
return evpProxyEndpoint;
398+
return discoveryState.evpProxyEndpoint;
408399
}
409400

410401
public HttpUrl buildUrl(String endpoint) {
411402
return agentBaseUrl.resolve(endpoint);
412403
}
413404

414405
public boolean supportsDataStreams() {
415-
return dataStreamsEndpoint != null;
406+
return discoveryState.dataStreamsEndpoint != null;
416407
}
417408

418409
public boolean supportsEvpProxy() {
419-
return evpProxyEndpoint != null;
410+
return discoveryState.evpProxyEndpoint != null;
420411
}
421412

422413
public boolean supportsContentEncodingHeadersWithEvpProxy() {
423414
// content encoding headers are supported in /v4 and above
415+
final String evpProxyEndpoint = discoveryState.evpProxyEndpoint;
424416
return evpProxyEndpoint != null && V4_EVP_PROXY_ENDPOINT.compareTo(evpProxyEndpoint) <= 0;
425417
}
426418

427419
public String getConfigEndpoint() {
428-
return configEndpoint;
420+
return discoveryState.configEndpoint;
429421
}
430422

431423
public String getVersion() {
432-
return version;
424+
return discoveryState.version;
433425
}
434426

435427
private void errorQueryingEndpoint(String endpoint, Throwable t) {
436428
log.debug(LogCollector.EXCLUDE_TELEMETRY, "Error querying {} at {}", endpoint, agentBaseUrl, t);
437429
}
438430

439431
public String state() {
440-
return state;
432+
return discoveryState.state;
441433
}
442434

443435
@Override
444436
public boolean active() {
445-
return supportsMetrics() && supportsDropping;
437+
return supportsMetrics() && discoveryState.supportsDropping;
446438
}
447439

448440
public boolean supportsTelemetryProxy() {
449-
return telemetryProxyEndpoint != null;
441+
return discoveryState.telemetryProxyEndpoint != null;
450442
}
451443
}

0 commit comments

Comments
 (0)