@@ -226,7 +226,10 @@ impl<RT: Runtime> Committer<RT> {
226
226
// commit out of order and regress the repeatable timestamp.
227
227
let mut next_bump_wait = Some ( * MAX_REPEATABLE_TIMESTAMP_COMMIT_DELAY ) ;
228
228
229
- let mut root_span = None ;
229
+ // This span starts with receiving a commit message and ends with that same
230
+ // commit getting published. It captures all of the committer activity
231
+ // in between.
232
+ let mut committer_span = None ;
230
233
// Keep a monotonically increasing id to keep track of honeycomb traces
231
234
// Each commit_id tracks a single write to persistence, from the time the commit
232
235
// message is received until the time the commit has been published. We skip
@@ -245,16 +248,17 @@ impl<RT: Runtime> Committer<RT> {
245
248
} ;
246
249
select_biased ! {
247
250
_ = bump_fut. fuse( ) => {
248
- let root_span = root_span . get_or_insert_with( || {
251
+ let committer_span = committer_span . get_or_insert_with( || {
249
252
span_commit_id = Some ( commit_id) ;
250
253
Span :: root( "bump_max_repeatable" , SpanContext :: random( ) )
251
254
} ) ;
252
- let _span = Span :: enter_with_parent( "queue_bump_max_repeatable" , root_span) ;
255
+ let local_span = Span :: enter_with_parent( "queue_bump_max_repeatable" , committer_span) ;
256
+ local_span. set_local_parent( ) ;
253
257
// Advance the repeatable read timestamp so non-leaders can
254
258
// establish a recent repeatable snapshot.
255
259
next_bump_wait = None ;
256
260
let ( tx, _rx) = oneshot:: channel( ) ;
257
- self . bump_max_repeatable_ts( tx, commit_id, root_span ) ;
261
+ self . bump_max_repeatable_ts( tx, commit_id, committer_span ) ;
258
262
commit_id += 1 ;
259
263
last_bumped_repeatable_ts = self . runtime. monotonic_now( ) ;
260
264
}
@@ -277,9 +281,9 @@ impl<RT: Runtime> Committer<RT> {
277
281
parent_trace,
278
282
..
279
283
} => {
280
- let _span = root_span . as_ref ( ) . map ( |root| Span :: enter_with_parent ( " publish_commit", root ) ) ;
281
- let root = initialize_root_from_parent ( "Committer:: publish_commit", parent_trace ) ;
282
- let _guard = root . set_local_parent( ) ;
284
+ let parent_span = initialize_root_from_parent ( "Committer:: publish_commit", parent_trace ) ;
285
+ let publish_commit_span = committer_span . as_ref ( ) . map ( |root| Span :: enter_with_parents ( " publish_commit", [ root , & parent_span ] ) ) . unwrap_or_else ( || parent_span ) ;
286
+ let _guard = publish_commit_span . set_local_parent( ) ;
283
287
let commit_ts = pending_write. must_commit_ts( ) ;
284
288
self . publish_commit( pending_write) ;
285
289
let _ = result. send( Ok ( commit_ts) ) ;
@@ -297,7 +301,8 @@ impl<RT: Runtime> Committer<RT> {
297
301
result,
298
302
..
299
303
} => {
300
- let _span = root_span. as_ref( ) . map( |root| Span :: enter_with_parent( "publish_max_repeatable_ts" , root) ) ;
304
+ let span = committer_span. as_ref( ) . map( |root| Span :: enter_with_parent( "publish_max_repeatable_ts" , root) ) . unwrap_or_else( Span :: noop) ;
305
+ span. set_local_parent( ) ;
301
306
self . publish_max_repeatable_ts( new_max_repeatable) ;
302
307
next_bump_wait = Some ( * MAX_REPEATABLE_TIMESTAMP_IDLE_FREQUENCY ) ;
303
308
let _ = result. send( new_max_repeatable) ;
@@ -306,7 +311,7 @@ impl<RT: Runtime> Committer<RT> {
306
311
}
307
312
// Report the trace if it is longer than the threshold
308
313
if let Some ( id) = span_commit_id && id == pending_commit_id {
309
- if let Some ( mut span) = root_span . take( ) {
314
+ if let Some ( mut span) = committer_span . take( ) {
310
315
if span. elapsed( ) < Some ( * COMMIT_TRACE_THRESHOLD ) {
311
316
tracing:: debug!( "Not sending span to honeycomb because it is below the threshold" ) ;
312
317
span. cancel( ) ;
@@ -330,30 +335,30 @@ impl<RT: Runtime> Committer<RT> {
330
335
parent_trace,
331
336
} ) => {
332
337
333
- let root_span_ref = root_span. get_or_insert_with( || {
338
+ let parent_span = initialize_root_from_parent( "handle_commit_message" , parent_trace. clone( ) )
339
+ . with_property( || ( "time_in_queue_ms" , format!( "{}" , queue_timer. elapsed( ) . as_secs_f64( ) * 1000.0 ) ) ) ;
340
+ let committer_span_ref = committer_span. get_or_insert_with( || {
334
341
span_commit_id = Some ( commit_id) ;
335
342
Span :: root( "commit" , SpanContext :: random( ) )
336
343
} ) ;
337
- let _span =
338
- Span :: enter_with_parent( "start_commit" , root_span_ref) ;
339
- let root = initialize_root_from_parent( "handle_commit_message" , parent_trace. clone( ) )
340
- . with_property( || ( "time_in_queue_ms" , format!( "{}" , queue_timer. elapsed( ) . as_secs_f64( ) * 1000.0 ) ) ) ;
341
- let _guard = root. set_local_parent( ) ;
344
+ let start_commit_span =
345
+ Span :: enter_with_parents( "start_commit" , [ committer_span_ref, & parent_span] ) ;
346
+ let _guard = start_commit_span. set_local_parent( ) ;
342
347
drop( queue_timer) ;
343
348
if let Some ( persistence_write_future) = self . start_commit( transaction,
344
349
result,
345
350
write_source,
346
351
parent_trace,
347
352
commit_id,
348
- root_span_ref ) {
353
+ committer_span_ref ) {
349
354
self . persistence_writes. push_back( persistence_write_future) ;
350
355
commit_id += 1 ;
351
356
} else if span_commit_id == Some ( commit_id) {
352
357
// If the span_commit_id is the same as the commit_id, that means we created a root span in this block
353
358
// and it didn't get incremented, so it's not a write to persistence and we should not trace it.
354
- // We also need to reset the span_commit_id and root_span .
355
- root_span_ref . cancel( ) ;
356
- root_span = None ;
359
+ // We also need to reset the span_commit_id and committer_span .
360
+ committer_span_ref . cancel( ) ;
361
+ committer_span = None ;
357
362
span_commit_id = None ;
358
363
}
359
364
} ,
@@ -685,6 +690,7 @@ impl<RT: Runtime> Committer<RT> {
685
690
} )
686
691
}
687
692
693
+ #[ fastrace:: trace]
688
694
fn compute_writes (
689
695
& self ,
690
696
commit_ts : Timestamp ,
@@ -730,6 +736,7 @@ impl<RT: Runtime> Committer<RT> {
730
736
Ok ( ( document_writes, index_writes, latest_pending_snapshot) )
731
737
}
732
738
739
+ #[ fastrace:: trace]
733
740
fn commit_has_conflict (
734
741
& self ,
735
742
reads : & ReadSet ,
@@ -775,6 +782,7 @@ impl<RT: Runtime> Committer<RT> {
775
782
776
783
/// After writing the new rows to persistence, mark the commit as complete
777
784
/// and allow the updated rows to be read by other transactions.
785
+ #[ fastrace:: trace]
778
786
fn publish_commit ( & mut self , pending_write : PendingWriteHandle ) {
779
787
let apply_timer = metrics:: commit_apply_timer ( ) ;
780
788
let commit_ts = pending_write. must_commit_ts ( ) ;
@@ -886,6 +894,7 @@ impl<RT: Runtime> Committer<RT> {
886
894
} )
887
895
}
888
896
. in_span ( outer_span)
897
+ . in_span ( request_span)
889
898
. boxed ( ) ,
890
899
)
891
900
}
0 commit comments