Skip to content

Commit 50d234a

Browse files
committed
add Prepare request support to request tracker
1 parent 91643f1 commit 50d234a

18 files changed

+329
-92
lines changed

src/Cassandra.IntegrationTests/Core/ConnectionTests.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ public void Prepare_Query()
8888
using (var connection = CreateConnection())
8989
{
9090
connection.Open().Wait();
91-
var request = new PrepareRequest(GetSerializer(), BasicQuery, null, null);
91+
var request = new InternalPrepareRequest(GetSerializer(), BasicQuery, null, null);
9292
var task = connection.Send(request);
9393
task.Wait();
9494
Assert.AreEqual(TaskStatus.RanToCompletion, task.Status);
@@ -102,7 +102,7 @@ public void Prepare_ResponseError_Faults_Task()
102102
using (var connection = CreateConnection())
103103
{
104104
connection.Open().Wait();
105-
var request = new PrepareRequest(GetSerializer(), "SELECT WILL FAIL", null, null);
105+
var request = new InternalPrepareRequest(GetSerializer(), "SELECT WILL FAIL", null, null);
106106
var task = connection.Send(request);
107107
task.ContinueWith(t =>
108108
{
@@ -121,7 +121,7 @@ public void Execute_Prepared_Test()
121121
connection.Open().Wait();
122122

123123
//Prepare a query
124-
var prepareRequest = new PrepareRequest(GetSerializer(), BasicQuery, null, null);
124+
var prepareRequest = new InternalPrepareRequest(GetSerializer(), BasicQuery, null, null);
125125
var task = connection.Send(prepareRequest);
126126
var prepareOutput = ValidateResult<OutputPrepared>(task.Result);
127127

@@ -150,7 +150,7 @@ public void Execute_Prepared_With_Param_Test()
150150
{
151151
connection.Open().Wait();
152152

153-
var prepareRequest = new PrepareRequest(GetSerializer(), "SELECT * FROM system.local WHERE key = ?", null, null);
153+
var prepareRequest = new InternalPrepareRequest(GetSerializer(), "SELECT * FROM system.local WHERE key = ?", null, null);
154154
var task = connection.Send(prepareRequest);
155155
var prepareOutput = ValidateResult<OutputPrepared>(task.Result);
156156

src/Cassandra.IntegrationTests/OpenTelemetry/OpenTelemetryTests.cs

Lines changed: 93 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -240,11 +240,11 @@ public async Task AddOpenTelemetry_MapperAndMapperAsync_SessionRequestIsParentOf
240240
await mapper.InsertIfNotExistsAsync(songOne, testProfile, true, null)
241241
.ContinueWith(t =>
242242
{
243-
RetryUntilActivities(localDateTime, SessionActivityName, 1, displayNameStartsWith: true);
244-
RetryUntilActivities(localDateTime, NodeActivityName, 1, displayNameStartsWith: true);
243+
RetryUntilActivities(localDateTime, $"{SessionActivityName}({nameof(BoundStatement)}) {keyspace}", 1);
244+
RetryUntilActivities(localDateTime, $"{NodeActivityName}({nameof(BoundStatement)}) {keyspace}", 1);
245245
var activities = GetActivities(localDateTime);
246-
var sessionActivity = activities.First(x => x.DisplayName.StartsWith(SessionActivityName));
247-
var nodeActivity = activities.First(x => x.DisplayName.StartsWith(NodeActivityName));
246+
var sessionActivity = activities.First(x => x.DisplayName == $"{SessionActivityName}({nameof(BoundStatement)}) {keyspace}");
247+
var nodeActivity = activities.First(x => x.DisplayName == $"{NodeActivityName}({nameof(BoundStatement)}) {keyspace}");
248248

249249
Assert.IsNull(sessionActivity.ParentId);
250250
Assert.AreEqual(sessionActivity.TraceId, nodeActivity.TraceId);
@@ -313,12 +313,12 @@ public void AddOpenTelemetry_Linq_SessionRequestIsParentOfNodeRequest()
313313

314314
table.Insert(song).Execute();
315315

316-
RetryUntilActivities(_testStartDateTime, SessionActivityName, 1, displayNameStartsWith: true);
317-
RetryUntilActivities(_testStartDateTime, NodeActivityName, 1, displayNameStartsWith: true);
316+
RetryUntilActivities(_testStartDateTime, $"{SessionActivityName}({nameof(BoundStatement)}) {keyspace}", 1);
317+
RetryUntilActivities(_testStartDateTime, $"{NodeActivityName}({nameof(BoundStatement)}) {keyspace}", 1);
318318
var syncActivities = GetActivities(_testStartDateTime);
319319

320-
var syncSessionActivity = syncActivities.First(x => x.DisplayName.StartsWith(SessionActivityName));
321-
var syncNodeActivity = syncActivities.First(x => x.DisplayName.StartsWith(NodeActivityName));
320+
var syncSessionActivity = syncActivities.First(x => x.DisplayName == $"{SessionActivityName}({nameof(BoundStatement)}) {keyspace}");
321+
var syncNodeActivity = syncActivities.First(x => x.DisplayName == $"{NodeActivityName}({nameof(BoundStatement)}) {keyspace}");
322322

323323
Assert.IsNull(syncSessionActivity.ParentId);
324324
Assert.AreEqual(syncSessionActivity.TraceId, syncNodeActivity.TraceId);
@@ -352,7 +352,9 @@ public void AddOpenTelemetry_WhenMethodIsInvokedAfterQuery_TraceIdIsTheSameThrou
352352
SecondMethod(secondMethodName);
353353
}
354354

355-
RetryUntilActivities(_testStartDateTime, SessionActivityName, 7); // simple + 2xlinq(prepare+bound) + 2xmapper(prepare+bound) + create + change
355+
var prepareCount = 2 * this.AmountOfNodes; // 2 * because Mapper + LINQ, AmountOfNodes because PREPARE goes to every node by default
356+
357+
RetryUntilActivities(_testStartDateTime, SessionActivityName, 7 + prepareCount); // create + change keyspace + simple + 2xlinq(create+bound) + 2xmapper(create+bound) + PREPAREs
356358
RetryUntilActivities(_testStartDateTime, secondMethodName, 1);
357359
RetryUntilActivities(_testStartDateTime, firstMethodName, 1);
358360
var activities = GetActivities(_testStartDateTime).ToList();
@@ -361,7 +363,7 @@ public void AddOpenTelemetry_WhenMethodIsInvokedAfterQuery_TraceIdIsTheSameThrou
361363
var secondMethodActivity = activities.First(x => x.DisplayName == secondMethodName);
362364
var sessionActivities = activities.Where(x => x.DisplayName.StartsWith(SessionActivityName)).ToList();
363365

364-
Assert.AreEqual(7, sessionActivities.Count); // simple + 2xlinq(prepare+bound) + 2xmapper(prepare+bound) + create + change
366+
Assert.AreEqual(7 + prepareCount, sessionActivities.Count); // create + change keyspace + simple + 2xlinq(create+bound) + 2xmapper(create+bound) + PREPAREs
365367

366368
sessionActivities.ForEach(act =>
367369
{
@@ -396,7 +398,9 @@ public async Task AddOpenTelemetry_WhenMethodIsInvokedAfterQuery_TraceIdIsTheSam
396398
SecondMethod(secondMethodName);
397399
}
398400

399-
RetryUntilActivities(_testStartDateTime, SessionActivityName, 7); // simple + 2xlinq(prepare+bound) + 2xmapper(prepare+bound) + create + change
401+
var prepareCount = 2 * this.AmountOfNodes; // 2 * because Mapper + LINQ, AmountOfNodes because PREPARE goes to every node by default
402+
403+
RetryUntilActivities(_testStartDateTime, SessionActivityName, 7 + prepareCount); // create + change keyspace + simple + 2xlinq(create+bound) + 2xmapper(create+bound) + PREPAREs
400404
RetryUntilActivities(_testStartDateTime, secondMethodName, 1);
401405
RetryUntilActivities(_testStartDateTime, firstMethodName, 1);
402406
var activities = GetActivities(_testStartDateTime).ToList();
@@ -405,7 +409,7 @@ public async Task AddOpenTelemetry_WhenMethodIsInvokedAfterQuery_TraceIdIsTheSam
405409
var secondMethodActivity = activities.First(x => x.DisplayName == secondMethodName);
406410
var sessionActivities = activities.Where(x => x.DisplayName.StartsWith(SessionActivityName)).ToList();
407411

408-
Assert.AreEqual(7, sessionActivities.Count); // simple + 2xlinq(prepare+bound) + 2xmapper(prepare+bound) + create + change
412+
Assert.AreEqual(7 + prepareCount, sessionActivities.Count); // create + change keyspace + simple + 3xlinq(create+prepare+bound) + 3xmapper(create+prepare+bound)
409413

410414
sessionActivities.ForEach(act =>
411415
{
@@ -621,8 +625,8 @@ public async Task AddOpenTelemetry_Batch_ExpectedStatement()
621625
RetryUntilActivities(localDateTime, $"{NodeActivityName}({nameof(BatchStatement)}) {keyspace}", 1);
622626

623627
var syncActivities = GetActivities(localDateTime);
624-
var syncSessionActivity = syncActivities.First(x => x.DisplayName.StartsWith(SessionActivityName));
625-
var syncNodeActivity = syncActivities.First(x => x.DisplayName.StartsWith(NodeActivityName));
628+
var syncSessionActivity = syncActivities.First(x => x.DisplayName == $"{SessionActivityName}({nameof(BatchStatement)}) {keyspace}");
629+
var syncNodeActivity = syncActivities.First(x => x.DisplayName == $"{NodeActivityName}({nameof(BatchStatement)}) {keyspace}");
626630

627631
Assert.IsNull(syncSessionActivity.ParentId);
628632
Assert.AreEqual(syncSessionActivity.TraceId, syncNodeActivity.TraceId);
@@ -637,6 +641,81 @@ public async Task AddOpenTelemetry_Batch_ExpectedStatement()
637641
Assert.Contains(new KeyValuePair<string, string>("db.query.text", expectedStatement), syncNodeActivity.Tags.ToArray());
638642
}
639643

644+
[Category(TestCategory.RealCluster)]
645+
[Test]
646+
[TestCase(true)]
647+
[TestCase(false)]
648+
public async Task AddOpenTelemetry_Mapper_ExpectedPrepareActivities(bool prepareOnAllHosts)
649+
{
650+
var keyspace = TestUtils.GetUniqueKeyspaceName().ToLowerInvariant();
651+
652+
var cluster = GetNewTemporaryCluster(b => b
653+
.WithOpenTelemetryInstrumentation(opt => opt.IncludeDatabaseStatement = true)
654+
.WithQueryOptions(new QueryOptions().SetPrepareOnAllHosts(prepareOnAllHosts)));
655+
656+
var session = cluster.Connect();
657+
658+
session.CreateKeyspaceIfNotExists(keyspace);
659+
660+
session.ChangeKeyspace(keyspace);
661+
662+
CreateSongTable(session);
663+
664+
Task.Delay(100).GetAwaiter().GetResult();
665+
666+
var localDateTime = DateTime.UtcNow;
667+
668+
var mapper = new Mapper(session, new MappingConfiguration()
669+
.Define(new Map<Song>().PartitionKey(s => s.Id).TableName("song").KeyspaceName(keyspace)));
670+
671+
var songOne = new Song
672+
{
673+
Id = Guid.NewGuid(),
674+
Artist = "Led Zeppelin",
675+
Title = "Mothership",
676+
ReleaseDate = DateTimeOffset.UtcNow
677+
};
678+
679+
await mapper.InsertAsync(songOne).ConfigureAwait(false);
680+
681+
RetryUntilActivities(localDateTime, $"{SessionActivityName}({nameof(BoundStatement)}) {keyspace}", 1);
682+
RetryUntilActivities(localDateTime, $"{NodeActivityName}({nameof(BoundStatement)}) {keyspace}", 1);
683+
RetryUntilActivities(localDateTime, $"{SessionActivityName}({nameof(PrepareRequest)})", prepareOnAllHosts ? AmountOfNodes : 1);
684+
RetryUntilActivities(localDateTime, $"{NodeActivityName}({nameof(PrepareRequest)})", prepareOnAllHosts ? AmountOfNodes : 1);
685+
686+
var allActivities = GetActivities(localDateTime).ToList();
687+
var boundStmtSessionActivity = allActivities.Single(x => x.DisplayName == $"{SessionActivityName}({nameof(BoundStatement)}) {keyspace}");
688+
var boundStmtNodeActivity = allActivities.Single(x => x.DisplayName == $"{NodeActivityName}({nameof(BoundStatement)}) {keyspace}");
689+
var prepareStmtSessionActivities = allActivities.Where(x => x.DisplayName == $"{SessionActivityName}({nameof(PrepareRequest)})").ToList();
690+
var prepareStmtNodeActivities = allActivities.Where(x => x.DisplayName == $"{NodeActivityName}({nameof(PrepareRequest)})").ToList();
691+
692+
Assert.IsNull(boundStmtSessionActivity.ParentId);
693+
Assert.AreEqual(boundStmtSessionActivity.TraceId, boundStmtNodeActivity.TraceId);
694+
Assert.AreEqual(boundStmtSessionActivity.SpanId, boundStmtNodeActivity.ParentSpanId);
695+
696+
var expectedStatement = $"INSERT INTO {keyspace}.song (Artist, Id, ReleaseDate, Title) VALUES (?, ?, ?, ?)";
697+
698+
ValidateSessionActivityAttributes(boundStmtSessionActivity, typeof(BoundStatement));
699+
ValidateNodeActivityAttributes(boundStmtNodeActivity, typeof(BoundStatement));
700+
Assert.Contains(new KeyValuePair<string, string>("db.query.text", expectedStatement), boundStmtSessionActivity.Tags.ToArray());
701+
Assert.Contains(new KeyValuePair<string, string>("db.query.text", expectedStatement), boundStmtNodeActivity.Tags.ToArray());
702+
703+
foreach (var prepareStmtSessionActivity in prepareStmtSessionActivities)
704+
{
705+
ValidateSessionActivityAttributes(prepareStmtSessionActivity, typeof(PrepareRequest));
706+
Assert.Contains(new KeyValuePair<string, string>("db.query.text", expectedStatement), prepareStmtSessionActivity.Tags.ToArray());
707+
}
708+
709+
foreach (var prepareStmtNodeActivity in prepareStmtNodeActivities)
710+
{
711+
ValidateNodeActivityAttributes(prepareStmtNodeActivity, typeof(PrepareRequest));
712+
Assert.Contains(new KeyValuePair<string, string>("db.query.text", expectedStatement), prepareStmtNodeActivity.Tags.ToArray());
713+
Assert.IsNotNull(prepareStmtNodeActivity.ParentId);
714+
var parentActivity = prepareStmtSessionActivities.Single(a => a.SpanId == prepareStmtNodeActivity.ParentSpanId);
715+
ValidateSessionActivityAttributes(parentActivity, typeof(PrepareRequest));
716+
}
717+
}
718+
640719
private async Task SimpleStatementMethodAsync(ISession session)
641720
{
642721
var statement = new SimpleStatement("SELECT key FROM system.local");

src/Cassandra.Tests/RequestHandlerTests.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525

2626
using NUnit.Framework;
2727

28-
using PrepareFlags = Cassandra.Requests.PrepareRequest.PrepareFlags;
28+
using PrepareFlags = Cassandra.Requests.InternalPrepareRequest.PrepareFlags;
2929
using QueryFlags = Cassandra.QueryProtocolOptions.QueryFlags;
3030

3131
namespace Cassandra.Tests
@@ -729,7 +729,7 @@ public void Prepare_With_Keyspace_Should_Send_Keyspace_And_Flag()
729729
{
730730
const string query = "QUERY1";
731731
const string keyspace = "ks1";
732-
var request = new PrepareRequest(RequestHandlerTests.Serializer, query, keyspace, null);
732+
var request = new InternalPrepareRequest(RequestHandlerTests.Serializer, query, keyspace, null);
733733

734734
// The request is composed by: <query><flags>[<keyspace>]
735735
var buffer = GetBodyBuffer(request);
@@ -751,7 +751,7 @@ public void Prepare_With_Keyspace_On_Lower_Protocol_Version_Should_Ignore_Keyspa
751751
{
752752
const string query = "SELECT col1, col2 FROM table1";
753753
var serializer = new SerializerManager(ProtocolVersion.V2).GetCurrentSerializer();
754-
var request = new PrepareRequest(serializer, query, "my_keyspace", null);
754+
var request = new InternalPrepareRequest(serializer, query, "my_keyspace", null);
755755

756756
// The request only contains the query
757757
var buffer = GetBodyBuffer(request, serializer);

src/Cassandra.Tests/Requests/PrepareHandlerTests.cs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ public async Task Should_NotSendRequestToSecondHost_When_SecondHostDoesntHavePoo
8484
var pools = mockResult.Session.GetPools().ToList();
8585
Assert.AreEqual(2, pools.Count);
8686
var distanceCount = Interlocked.Read(ref lbpCluster.DistanceCount);
87-
var request = new PrepareRequest(_serializer, "TEST", null, null);
87+
var request = new InternalPrepareRequest(_serializer, "TEST", null, null);
8888

8989
await mockResult.PrepareHandler.Prepare(
9090
request,
@@ -156,7 +156,7 @@ public async Task Should_NotSendRequestToSecondHost_When_SecondHostPoolDoesNotHa
156156
var pools = mockResult.Session.GetPools().ToList();
157157
Assert.AreEqual(3, pools.Count);
158158
var distanceCount = Interlocked.Read(ref lbpCluster.DistanceCount);
159-
var request = new PrepareRequest(_serializer, "TEST", null, null);
159+
var request = new InternalPrepareRequest(_serializer, "TEST", null, null);
160160

161161
await mockResult.PrepareHandler.Prepare(
162162
request,
@@ -229,7 +229,7 @@ public async Task Should_SendRequestToAllHosts_When_AllHostsHaveConnections()
229229
Assert.AreEqual(3, pools.Count);
230230

231231
var distanceCount = Interlocked.Read(ref lbpCluster.DistanceCount);
232-
var request = new PrepareRequest(_serializer, "TEST", null, null);
232+
var request = new InternalPrepareRequest(_serializer, "TEST", null, null);
233233

234234
await mockResult.PrepareHandler.Prepare(
235235
request,
@@ -301,7 +301,7 @@ public async Task Should_SendRequestToAllHosts_When_AllHostsHaveConnectionsButFi
301301
var pools = mockResult.Session.GetPools().ToList();
302302
Assert.AreEqual(2, pools.Count);
303303
var distanceCount = Interlocked.Read(ref lbpCluster.DistanceCount);
304-
var request = new PrepareRequest(_serializer, "TEST", null, null);
304+
var request = new InternalPrepareRequest(_serializer, "TEST", null, null);
305305

306306
await mockResult.PrepareHandler.Prepare(
307307
request,
@@ -373,7 +373,7 @@ public async Task Should_SendRequestToAllHosts_When_AllHostsHaveConnectionsButFi
373373
var pools = mockResult.Session.GetPools().ToList();
374374
Assert.AreEqual(2, pools.Count);
375375
var distanceCount = Interlocked.Read(ref lbpCluster.DistanceCount);
376-
var request = new PrepareRequest(_serializer, "TEST", null, null);
376+
var request = new InternalPrepareRequest(_serializer, "TEST", null, null);
377377

378378
await mockResult.PrepareHandler.Prepare(
379379
request,
@@ -446,7 +446,7 @@ public async Task Should_SendRequestToFirstHostOnly_When_PrepareOnAllHostsIsFals
446446
var pools = mockResult.Session.GetPools().ToList();
447447
Assert.AreEqual(2, pools.Count);
448448
var distanceCount = Interlocked.Read(ref lbpCluster.DistanceCount);
449-
var request = new PrepareRequest(_serializer, "TEST", null, null);
449+
var request = new InternalPrepareRequest(_serializer, "TEST", null, null);
450450

451451
await mockResult.PrepareHandler.Prepare(
452452
request,

src/Cassandra/Cluster.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -622,10 +622,10 @@ HostDistance IInternalCluster.RetrieveAndSetDistance(Host host)
622622

623623
/// <inheritdoc />
624624
async Task<PreparedStatement> IInternalCluster.Prepare(
625-
IInternalSession session, ISerializerManager serializerManager, PrepareRequest request)
625+
IInternalSession session, ISerializerManager serializerManager, InternalPrepareRequest request)
626626
{
627627
var lbp = session.Cluster.Configuration.DefaultRequestOptions.LoadBalancingPolicy;
628-
var handler = InternalRef.Configuration.PrepareHandlerFactory.CreatePrepareHandler(serializerManager, this);
628+
var handler = InternalRef.Configuration.PrepareHandlerFactory.CreatePrepareHandler(serializerManager, this, session, request);
629629
var ps = await handler.Prepare(request, session, lbp.NewQueryPlan(session.Keyspace, null).GetEnumerator()).ConfigureAwait(false);
630630
var psAdded = InternalRef.PreparedQueries.GetOrAdd(ps.Id, ps);
631631
if (ps != psAdded)
@@ -670,7 +670,7 @@ private async Task ReprepareAllQueries(Host host)
670670
{
671671
foreach (var ps in preparedQueries)
672672
{
673-
var request = new PrepareRequest(serializer, ps.Cql, ps.Keyspace, null);
673+
var request = new InternalPrepareRequest(serializer, ps.Cql, ps.Keyspace, null);
674674
await semaphore.WaitAsync().ConfigureAwait(false);
675675
tasks.Add(Task.Run(() => handler.ReprepareOnSingleNodeAsync(
676676
new KeyValuePair<Host, IHostConnectionPool>(host, pool),

0 commit comments

Comments
 (0)