@@ -15,17 +15,18 @@ use std::env;
1515use std:: sync:: Arc ;
1616
1717use anyhow:: Result ;
18+ use log:: info;
1819use pyo3:: exceptions:: { PyRuntimeError , PyTimeoutError } ;
1920use structopt:: StructOpt ;
2021use tokio:: runtime:: Runtime ;
2122use tokio:: task:: JoinHandle ;
22- use tonic:: transport:: Channel ;
2323use tonic:: Status ;
2424
2525pub mod torchftpb {
2626 tonic:: include_proto!( "torchft" ) ;
2727}
2828
29+ use crate :: net:: Channel ;
2930use crate :: torchftpb:: manager_service_client:: ManagerServiceClient ;
3031use crate :: torchftpb:: { CheckpointAddressRequest , ManagerQuorumRequest , ShouldCommitRequest } ;
3132use pyo3:: prelude:: * ;
@@ -301,8 +302,7 @@ impl From<Status> for StatusError {
301302 }
302303}
303304
304- #[ pymodule]
305- fn torchft ( m : & Bound < ' _ , PyModule > ) -> PyResult < ( ) > {
305+ fn init_logging ( ) -> PyResult < ( ) > {
306306 // setup logging on import
307307 let mut log = stderrlog:: new ( ) ;
308308 log. verbosity ( 2 )
@@ -316,6 +316,92 @@ fn torchft(m: &Bound<'_, PyModule>) -> PyResult<()> {
316316 log. init ( )
317317 . map_err ( |e| PyRuntimeError :: new_err ( e. to_string ( ) ) ) ?;
318318
319+ Ok ( ( ) )
320+ }
321+
322+ fn init_tracing ( ) -> PyResult < ( ) > {
323+ use opentelemetry:: trace:: Tracer ;
324+ use opentelemetry:: trace:: TracerProvider as OpenTelemetryTracerProvider ;
325+ use opentelemetry_otlp:: WithExportConfig ;
326+ use opentelemetry_sdk:: trace:: TracerProvider ;
327+ use tracing_subscriber:: layer:: SubscriberExt ;
328+ use tracing_subscriber:: { filter:: EnvFilter , Layer } ;
329+
330+ fn set_tracer_provider ( tracer_provider : TracerProvider ) -> PyResult < ( ) > {
331+ opentelemetry:: global:: set_tracer_provider ( tracer_provider. clone ( ) ) ;
332+
333+ let layer = tracing_opentelemetry:: layer ( )
334+ . with_error_records_to_exceptions ( true )
335+ . with_tracer ( tracer_provider. tracer ( "" ) ) ;
336+
337+ // Create a new tracing::Fmt layer to print the logs to stdout. It has a
338+ // default filter of `info` level and above, and `debug` and above for logs
339+ // from OpenTelemetry crates. The filter levels can be customized as needed.
340+ let filter_fmt =
341+ EnvFilter :: new ( "info" ) . add_directive ( "opentelemetry=debug" . parse ( ) . unwrap ( ) ) ;
342+ let fmt_layer = tracing_subscriber:: fmt:: layer ( )
343+ . with_thread_names ( true )
344+ . with_filter ( filter_fmt) ;
345+
346+ let subscriber = tracing_subscriber:: registry ( ) . with ( fmt_layer) . with ( layer) ;
347+ tracing:: subscriber:: set_global_default ( subscriber)
348+ . map_err ( |e| PyRuntimeError :: new_err ( e. to_string ( ) ) ) ?;
349+
350+ info ! ( "OpenTelemetry tracing enabled" ) ;
351+
352+ Ok ( ( ) )
353+ }
354+
355+ match env:: var ( "TORCHFT_OTEL_OTLP" ) {
356+ Ok ( endpoint) => {
357+ let runtime = Runtime :: new ( ) ?;
358+
359+ runtime. block_on ( async move {
360+ info ! ( "Enabling OpenTelemetry OTLP with {}" , endpoint) ;
361+ let exporter = opentelemetry_otlp:: SpanExporter :: builder ( )
362+ . with_tonic ( )
363+ . with_endpoint ( endpoint)
364+ . with_timeout ( Duration :: from_secs ( 10 ) )
365+ . build ( )
366+ . map_err ( |e| PyRuntimeError :: new_err ( e. to_string ( ) ) ) ?;
367+
368+ let tracer_provider = TracerProvider :: builder ( )
369+ . with_batch_exporter ( exporter, opentelemetry_sdk:: runtime:: Tokio )
370+ . build ( ) ;
371+
372+ set_tracer_provider ( tracer_provider) ?;
373+
374+ Ok :: < ( ) , pyo3:: PyErr > ( ( ) )
375+ } ) ?;
376+ }
377+ Err ( _) => { }
378+ } ;
379+ match env:: var ( "TORCHFT_OTEL_STDOUT" ) {
380+ Ok ( _) => {
381+ info ! ( "Enabling OpenTelemetry stdout" ) ;
382+ let exporter = opentelemetry_stdout:: SpanExporter :: default ( ) ;
383+ let tracer_provider = TracerProvider :: builder ( )
384+ . with_simple_exporter ( exporter)
385+ . build ( ) ;
386+
387+ set_tracer_provider ( tracer_provider) ?;
388+ }
389+ Err ( _) => { }
390+ }
391+
392+ let tracer = opentelemetry:: global:: tracer ( "my_tracer" ) ;
393+ tracer. in_span ( "doing_work" , |cx| {
394+ // Traced app logic here...
395+ } ) ;
396+
397+ Ok ( ( ) )
398+ }
399+
400+ #[ pymodule]
401+ fn torchft ( m : & Bound < ' _ , PyModule > ) -> PyResult < ( ) > {
402+ init_logging ( ) ?;
403+ init_tracing ( ) ?;
404+
319405 m. add_class :: < Manager > ( ) ?;
320406 m. add_class :: < ManagerClient > ( ) ?;
321407 m. add_class :: < Lighthouse > ( ) ?;
0 commit comments