20
20
using System . Linq ;
21
21
using System . Threading ;
22
22
using System . Threading . Tasks ;
23
+ using Cassandra . Data . Linq ;
24
+ using Cassandra . IntegrationTests . Policies . Util ;
25
+ using Cassandra . IntegrationTests . SimulacronAPI ;
23
26
using Cassandra . IntegrationTests . TestBase ;
27
+ using Cassandra . IntegrationTests . TestClusterManagement . Simulacron ;
24
28
using Cassandra . Mapping ;
25
29
using Cassandra . OpenTelemetry ;
26
30
using Cassandra . Tests ;
@@ -42,10 +46,13 @@ public class OpenTelemetryTests : SharedClusterTest
42
46
43
47
private readonly List < Activity > _exportedActivities = new List < Activity > ( ) ;
44
48
49
+ private readonly ActivitySource InternalActivitySource = new ActivitySource ( "testeActivitySource" ) ;
50
+
45
51
public OpenTelemetryTests ( )
46
52
{
47
53
Sdk . CreateTracerProviderBuilder ( )
48
54
. AddSource ( OpenTelemetrySourceName )
55
+ . AddSource ( InternalActivitySource . Name )
49
56
. AddInMemoryExporter ( _exportedActivities )
50
57
. Build ( ) ;
51
58
}
@@ -191,16 +198,7 @@ public async Task AddOpenTelemetry_MapperAndMapperAsync_SessionRequestIsParentOf
191
198
192
199
session . ChangeKeyspace ( keyspace ) ;
193
200
194
- string createTableQuery = @"
195
- CREATE TABLE IF NOT EXISTS song (
196
- Artist text,
197
- Title text,
198
- Id uuid,
199
- ReleaseDate timestamp,
200
- PRIMARY KEY (id)
201
- )" ;
202
-
203
- session . Execute ( createTableQuery ) ;
201
+ CreateSongTable ( session ) ;
204
202
205
203
var mapper = new Mapper ( session ) ;
206
204
@@ -259,6 +257,174 @@ await mapper.InsertIfNotExistsAsync(songOne, testProfile, true, null)
259
257
ValidateNodeActivityAttributes ( syncNodeActivity ) ;
260
258
}
261
259
260
+ [ Category ( TestCategory . RealClusterLong ) ]
261
+ [ Test ]
262
+ [ NUnit . Framework . Ignore ( "LINQ flow seems different than the flow in which OTEL is implemented!" ) ]
263
+ public void AddOpenTelemetry_Linq_SessionRequestIsParentOfNodeRequest ( )
264
+ {
265
+ var keyspace = TestUtils . GetUniqueKeyspaceName ( ) . ToLowerInvariant ( ) ;
266
+
267
+ var cluster = GetNewTemporaryCluster ( b => b
268
+ . AddOpenTelemetryInstrumentation ( ) ) ;
269
+
270
+ var session = cluster . Connect ( ) ;
271
+
272
+ session . CreateKeyspaceIfNotExists ( keyspace ) ;
273
+
274
+ session . ChangeKeyspace ( keyspace ) ;
275
+
276
+ CreateSongTable ( session ) ;
277
+
278
+ var table = new Table < Song > ( Session , new MappingConfiguration ( ) . Define ( new Map < Song > ( ) . TableName ( "song" ) ) ) ;
279
+
280
+ var song = new Song
281
+ {
282
+ Id = Guid . NewGuid ( ) ,
283
+ Artist = "Led Zeppelin" ,
284
+ Title = "Mothership" ,
285
+ ReleaseDate = DateTimeOffset . UtcNow
286
+ } ;
287
+
288
+ // Clear activities to get the Linq one
289
+ _exportedActivities . Clear ( ) ;
290
+
291
+ table . Insert ( song ) ;
292
+
293
+ var syncActivities = GetActivities ( ) ;
294
+ }
295
+
296
+ [ Test ]
297
+ public void AddOpenTelemetry_WhenMethodIsInvokedAfterQuery_TraceIdIsTheSameThroughRequest ( )
298
+ {
299
+ var firstMethodName = "FirstMethod" ;
300
+ var secondMethodName = "SecondMethod" ;
301
+
302
+ using ( var activity = InternalActivitySource . StartActivity ( firstMethodName , ActivityKind . Internal ) )
303
+ {
304
+ var cluster = GetNewTemporaryCluster ( b => b . AddOpenTelemetryInstrumentation ( ) ) ;
305
+ var session = cluster . Connect ( ) ;
306
+
307
+ var statement = new SimpleStatement ( "SELECT key FROM system.local" ) ;
308
+ session . Execute ( statement ) ;
309
+
310
+ SecondMethod ( secondMethodName ) ;
311
+ }
312
+
313
+ var firstMethodActivity = GetActivities ( ) . First ( x => x . DisplayName == firstMethodName ) ;
314
+ var secondMethodActivity = GetActivities ( ) . First ( x => x . DisplayName == secondMethodName ) ;
315
+ var sessionActivity = GetActivities ( ) . First ( x => x . DisplayName == SessionActivityName ) ;
316
+
317
+ Assert . AreEqual ( firstMethodActivity . TraceId , sessionActivity . TraceId ) ;
318
+ Assert . AreEqual ( firstMethodActivity . SpanId , sessionActivity . ParentSpanId ) ;
319
+
320
+ Assert . AreEqual ( firstMethodActivity . TraceId , secondMethodActivity . TraceId ) ;
321
+ Assert . AreEqual ( firstMethodActivity . SpanId , secondMethodActivity . ParentSpanId ) ;
322
+ }
323
+
324
+ [ Test ]
325
+ public void AddOpenTelemetry_RetryOnNextHost_ShouldProduceOneErrorAndOneValidSpansForTheSameSessionSpan ( )
326
+ {
327
+ var expectedErrorDescription = "overloaded" ;
328
+
329
+ using ( var simulacronCluster = SimulacronCluster . CreateNew ( 3 ) )
330
+ {
331
+ var contactPoint = simulacronCluster . InitialContactPoint ;
332
+ var nodes = simulacronCluster . GetNodes ( ) . ToArray ( ) ;
333
+ var loadBalancingPolicy = new CustomLoadBalancingPolicy (
334
+ nodes . Select ( n => n . ContactPoint ) . ToArray ( ) ) ;
335
+
336
+ var builder = ClusterBuilder ( )
337
+ . AddContactPoint ( contactPoint )
338
+ . WithSocketOptions ( new SocketOptions ( )
339
+ . SetConnectTimeoutMillis ( 10000 )
340
+ . SetReadTimeoutMillis ( 5000 ) )
341
+ . WithLoadBalancingPolicy ( loadBalancingPolicy )
342
+ . WithRetryPolicy ( TryNextHostRetryPolicy . Instance )
343
+ . AddOpenTelemetryInstrumentation ( ) ;
344
+
345
+ using ( var cluster = builder . Build ( ) )
346
+ {
347
+ var session = ( Session ) cluster . Connect ( ) ;
348
+ const string cql = "select * from table2" ;
349
+
350
+ nodes [ 0 ] . PrimeFluent (
351
+ b => b . WhenQuery ( cql ) .
352
+ ThenOverloaded ( expectedErrorDescription ) ) ;
353
+
354
+ nodes [ 1 ] . PrimeFluent (
355
+ b => b . WhenQuery ( cql ) .
356
+ ThenRowsSuccess ( new [ ] { ( "text" , DataType . Ascii ) } , rows => rows . WithRow ( "test1" ) . WithRow ( "test2" ) ) ) ;
357
+
358
+ session . Execute ( new SimpleStatement ( cql ) . SetConsistencyLevel ( ConsistencyLevel . One ) ) ;
359
+
360
+ var activities = GetActivities ( ) ;
361
+ var sessionActivity = activities . First ( x => x . DisplayName . StartsWith ( SessionActivityName ) ) ;
362
+ var validNodeActivity = activities . First ( x => x . DisplayName . StartsWith ( NodeActivityName ) && x . Status != ActivityStatusCode . Error ) ;
363
+ var errorNodeActivity = activities . First ( x => x . DisplayName . StartsWith ( NodeActivityName ) && x . Status == ActivityStatusCode . Error ) ;
364
+
365
+ Assert . AreEqual ( sessionActivity . TraceId , validNodeActivity . TraceId ) ;
366
+ Assert . AreEqual ( sessionActivity . TraceId , errorNodeActivity . TraceId ) ;
367
+ Assert . AreEqual ( sessionActivity . SpanId , validNodeActivity . ParentSpanId ) ;
368
+ Assert . AreEqual ( sessionActivity . SpanId , errorNodeActivity . ParentSpanId ) ;
369
+ Assert . AreEqual ( expectedErrorDescription , errorNodeActivity . StatusDescription ) ;
370
+ Assert . True ( validNodeActivity . StartTimeUtc > errorNodeActivity . StartTimeUtc ) ;
371
+ }
372
+ }
373
+ }
374
+
375
+ [ Test ]
376
+ public void AddOpenTelemetry_WithPaginationOnQuery_ShouldMultipleSpansForTheSameTraceId ( )
377
+ {
378
+ var expectedNumberOfSessionActivities = 2 ;
379
+
380
+ using ( var activity = InternalActivitySource . StartActivity ( "Paging" , ActivityKind . Internal ) )
381
+ {
382
+ var session = GetNewTemporaryCluster ( b => b . AddOpenTelemetryInstrumentation ( ) ) . Connect ( ) ;
383
+
384
+ session . CreateKeyspaceIfNotExists ( KeyspaceName , null , false ) ;
385
+
386
+ session . ChangeKeyspace ( KeyspaceName ) ;
387
+
388
+ CreateSongTable ( session ) ;
389
+
390
+ for ( int i = 0 ; i < expectedNumberOfSessionActivities ; i ++ )
391
+ {
392
+ session . Execute ( string . Format ( "INSERT INTO {0}.song (Artist, Title, Id, ReleaseDate) VALUES('Pink Floyd', 'The Dark Side Of The Moon', {1}, {2})" , KeyspaceName , Guid . NewGuid ( ) , ( ( DateTimeOffset ) DateTime . UtcNow ) . ToUnixTimeSeconds ( ) ) ) ;
393
+ }
394
+
395
+ _exportedActivities . Clear ( ) ;
396
+
397
+ var rs = session . Execute ( new SimpleStatement ( string . Format ( "SELECT * FROM {0}.song" , KeyspaceName ) ) . SetPageSize ( 1 ) ) ;
398
+
399
+ rs . FetchMoreResults ( ) ;
400
+
401
+ var sessionActivities = GetActivities ( ) . Where ( x => x . DisplayName == SessionActivityName ) ;
402
+
403
+ Assert . AreEqual ( expectedNumberOfSessionActivities , sessionActivities . Count ( ) ) ;
404
+ Assert . AreEqual ( expectedNumberOfSessionActivities , rs . InnerQueueCount ) ;
405
+
406
+ var firstActivity = sessionActivities . First ( ) ;
407
+ var lastActivity = sessionActivities . Last ( ) ;
408
+
409
+ Assert . AreEqual ( firstActivity . TraceId , lastActivity . TraceId ) ;
410
+ Assert . AreNotEqual ( firstActivity . SpanId , lastActivity . SpanId ) ;
411
+ }
412
+ }
413
+
414
+ private static void CreateSongTable ( ISession session )
415
+ {
416
+ string createTableQuery = @"
417
+ CREATE TABLE IF NOT EXISTS song (
418
+ Artist text,
419
+ Title text,
420
+ Id uuid,
421
+ ReleaseDate timestamp,
422
+ PRIMARY KEY (id)
423
+ )" ;
424
+
425
+ session . Execute ( createTableQuery ) ;
426
+ }
427
+
262
428
private List < Activity > GetActivities ( )
263
429
{
264
430
var count = 0 ;
@@ -317,5 +483,13 @@ private static void ValidateNodeActivityAttributes(Activity activity)
317
483
Assert . AreEqual ( tags . FirstOrDefault ( x => x . Key == pair . Key ) . Value , expectedTags [ pair . Key ] ) ;
318
484
}
319
485
}
486
+
487
+ private void SecondMethod ( string activityName )
488
+ {
489
+ using ( var activity = InternalActivitySource . StartActivity ( activityName , ActivityKind . Internal ) )
490
+ {
491
+ activity . AddTag ( "db.test" , "t" ) ;
492
+ }
493
+ }
320
494
}
321
495
}
0 commit comments