11#![ doc = include_str ! ( "../README.md" ) ]
22use console_api as proto;
3- use proto:: resources:: resource;
3+ use proto:: { instrument :: instrument_server :: InstrumentServer , resources:: resource} ;
44use serde:: Serialize ;
55use std:: {
66 cell:: RefCell ,
@@ -15,7 +15,10 @@ use std::{
1515use thread_local:: ThreadLocal ;
1616#[ cfg( unix) ]
1717use tokio:: net:: UnixListener ;
18- use tokio:: sync:: { mpsc, oneshot} ;
18+ use tokio:: {
19+ sync:: { mpsc, oneshot} ,
20+ task:: JoinHandle ,
21+ } ;
1922#[ cfg( unix) ]
2023use tokio_stream:: wrappers:: UnixListenerStream ;
2124use tracing_core:: {
@@ -39,7 +42,7 @@ mod stats;
3942pub ( crate ) mod sync;
4043mod visitors;
4144
42- use aggregator:: Aggregator ;
45+ pub use aggregator:: Aggregator ;
4346pub use builder:: { Builder , ServerAddr } ;
4447use callsites:: Callsites ;
4548use record:: Recorder ;
@@ -933,18 +936,16 @@ impl Server {
933936 ///
934937 /// [`tonic`]: https://docs.rs/tonic/
935938 pub async fn serve_with (
936- mut self ,
939+ self ,
937940 mut builder : tonic:: transport:: Server ,
938941 ) -> Result < ( ) , Box < dyn std:: error:: Error + Send + Sync + ' static > > {
939- let aggregate = self
940- . aggregator
941- . take ( )
942- . expect ( "cannot start server multiple times" ) ;
943- let aggregate = spawn_named ( aggregate. run ( ) , "console::aggregate" ) ;
944942 let addr = self . addr . clone ( ) ;
945- let router = builder. add_service (
946- proto:: instrument:: instrument_server:: InstrumentServer :: new ( self ) ,
947- ) ;
943+ let ServerParts {
944+ instrument_server,
945+ aggregator,
946+ } = self . into_parts ( ) ;
947+ let aggregate = spawn_named ( aggregator. run ( ) , "console::aggregate" ) ;
948+ let router = builder. add_service ( instrument_server) ;
948949 let res = match addr {
949950 ServerAddr :: Tcp ( addr) => {
950951 let serve = router. serve ( addr) ;
@@ -960,6 +961,135 @@ impl Server {
960961 aggregate. abort ( ) ;
961962 res?. map_err ( Into :: into)
962963 }
964+
965+ /// Returns the parts needed to spawn a gRPC server and the aggregator that
966+ /// supplies it.
967+ ///
968+ /// Note that a server spawned in this way will disregard any value set by
969+ /// [`Builder::server_addr`], as the user becomes responsible for defining
970+ /// the address when calling [`Router::serve`].
971+ ///
972+ /// Additionally, the user of this API must ensure that the [`Aggregator`]
973+ /// is running for as long as the gRPC server is. If the server stops
974+ /// running, the aggregator task can be aborted.
975+ ///
976+ /// # Examples
977+ ///
978+ /// The parts can be used to serve the instrument server together with
979+ /// other endpoints from the same gRPC server.
980+ ///
981+ /// ```
982+ /// use console_subscriber::{ConsoleLayer, ServerParts};
983+ ///
984+ /// # let runtime = tokio::runtime::Builder::new_current_thread()
985+ /// # .enable_all()
986+ /// # .build()
987+ /// # .unwrap();
988+ /// # runtime.block_on(async {
989+ /// let (console_layer, server) = ConsoleLayer::builder().build();
990+ /// let ServerParts {
991+ /// instrument_server,
992+ /// aggregator,
993+ /// ..
994+ /// } = server.into_parts();
995+ ///
996+ /// let aggregator_handle = tokio::spawn(aggregator.run());
997+ /// let router = tonic::transport::Server::builder()
998+ /// //.add_service(some_other_service)
999+ /// .add_service(instrument_server);
1000+ /// let serve = router.serve(std::net::SocketAddr::new(
1001+ /// std::net::IpAddr::V4(std::net::Ipv4Addr::new(127, 0, 0, 1)),
1002+ /// 6669,
1003+ /// ));
1004+ ///
1005+ /// // Finally, spawn the server.
1006+ /// tokio::spawn(serve);
1007+ /// # // Avoid a warning that `console_layer` and `aggregator_handle` are unused.
1008+ /// # drop(console_layer);
1009+ /// # let mut aggregator_handle = aggregator_handle;
1010+ /// # aggregator_handle.abort();
1011+ /// # });
1012+ /// ```
1013+ ///
1014+ /// [`Router::serve`]: fn@tonic::transport::server::Router::serve
1015+ pub fn into_parts ( mut self ) -> ServerParts {
1016+ let aggregator = self
1017+ . aggregator
1018+ . take ( )
1019+ . expect ( "cannot start server multiple times" ) ;
1020+
1021+ let instrument_server = proto:: instrument:: instrument_server:: InstrumentServer :: new ( self ) ;
1022+
1023+ ServerParts {
1024+ instrument_server,
1025+ aggregator,
1026+ }
1027+ }
1028+ }
1029+
1030+ /// Server Parts
1031+ ///
1032+ /// This struct contains the parts returned by [`Server::into_parts`]. It may contain
1033+ /// further parts in the future, an as such is marked as `non_exhaustive`.
1034+ ///
1035+ /// The `InstrumentServer<Server>` can be used to construct a router which
1036+ /// can be added to a [`tonic`] gRPC server.
1037+ ///
1038+ /// The `aggregator` is a future which should be running as long as the server is.
1039+ /// Generally, this future should be spawned onto an appropriate runtime and then
1040+ /// aborted if the server gets shut down.
1041+ ///
1042+ /// See the [`Server::into_parts`] documentation for usage.
1043+ #[ non_exhaustive]
1044+ pub struct ServerParts {
1045+ /// The instrument server.
1046+ ///
1047+ /// See the documentation for [`InstrumentServer`] for details.
1048+ pub instrument_server : InstrumentServer < Server > ,
1049+
1050+ /// The aggregator.
1051+ ///
1052+ /// Responsible for collecting and preparing traces for the instrument server
1053+ /// to send its clients.
1054+ ///
1055+ /// The aggregator should be [`run`] when the instrument server is started.
1056+ /// If the server stops running for any reason, the aggregator task can be
1057+ /// aborted.
1058+ ///
1059+ /// [`run`]: fn@crate::Aggregator::run
1060+ pub aggregator : Aggregator ,
1061+ }
1062+
1063+ /// Aggregator handle.
1064+ ///
1065+ /// This object is returned from [`Server::into_parts`]. It can be
1066+ /// used to abort the aggregator task.
1067+ ///
1068+ /// The aggregator collects the traces that implement the async runtime
1069+ /// being observed and prepares them to be served by the gRPC server.
1070+ ///
1071+ /// Normally, if the server, started with [`Server::serve`] or
1072+ /// [`Server::serve_with`] stops for any reason, the aggregator is aborted,
1073+ /// hoewver, if the server was started with the [`InstrumentServer`] returned
1074+ /// from [`Server::into_parts`], then it is the responsibility of the user
1075+ /// of the API to stop the aggregator task by calling [`abort`] on this
1076+ /// object.
1077+ ///
1078+ /// [`abort`]: fn@crate::AggregatorHandle::abort
1079+ pub struct AggregatorHandle {
1080+ join_handle : JoinHandle < ( ) > ,
1081+ }
1082+
1083+ impl AggregatorHandle {
1084+ /// Aborts the task running this aggregator.
1085+ ///
1086+ /// To avoid having a disconnected aggregator running forever, this
1087+ /// method should be called when the [`tonic::transport::Server`] started
1088+ /// with the [`InstrumentServer`] also returned from [`Server::into_parts`]
1089+ /// stops running.
1090+ pub fn abort ( & mut self ) {
1091+ self . join_handle . abort ( ) ;
1092+ }
9631093}
9641094
9651095#[ tonic:: async_trait]
0 commit comments