diff --git a/components/message_queue/src/kafka/kafka_impl.rs b/components/message_queue/src/kafka/kafka_impl.rs index e431a9a975..7ecaacd991 100644 --- a/components/message_queue/src/kafka/kafka_impl.rs +++ b/components/message_queue/src/kafka/kafka_impl.rs @@ -200,7 +200,7 @@ impl MessageQueue for KafkaImpl { // Create topic in Kafka. let topic_management_config = &self.config.topic_management_config; - info!("Try to create topic:{} in kafka", topic_name); + info!("Try to create topic, name:{}.", topic_name); let result = self .controller_client .create_topic( @@ -211,6 +211,10 @@ impl MessageQueue for KafkaImpl { ) .await; + info!( + "Create topic finish, name:{}, result:{:?}", + topic_name, result + ); match result { // Race condition between check and creation action, that's OK. Ok(_) diff --git a/server/src/grpc/mod.rs b/server/src/grpc/mod.rs index 78c0254618..710cf9f355 100644 --- a/server/src/grpc/mod.rs +++ b/server/src/grpc/mod.rs @@ -159,7 +159,7 @@ impl RpcServices { let serve_addr = self.serve_addr; let (stop_tx, stop_rx) = oneshot::channel(); let join_handle = self.runtime.spawn(async move { - info!("Grpc server starts listening on {}", serve_addr); + info!("Grpc server tries to listen on {}", serve_addr); let mut router = Server::builder().add_service(rpc_server); @@ -171,11 +171,12 @@ impl RpcServices { info!("Grpc server serves remote engine rpc service"); router = router.add_service(remote_engine_server); - let serve_res = router + router .serve_with_shutdown(serve_addr, stop_rx.map(drop)) - .await; - - warn!("Grpc server stops serving, exit result:{:?}", serve_res); + .await + .unwrap_or_else(|e| { + panic!("Grpc server listens failed, err:{:?}", e); + }); }); self.join_handle = Some(join_handle); self.stop_tx = Some(stop_tx); diff --git a/server/src/http.rs b/server/src/http.rs index 5ea7edd994..0f2dcc737d 100644 --- a/server/src/http.rs +++ b/server/src/http.rs @@ -7,7 +7,7 @@ use std::{ time::Duration, }; -use log::error; +use log::{error, info}; use logger::RuntimeLevel; use profile::Profiler; use prom_remote_api::{types::RemoteStorageRef, web}; @@ -442,6 +442,11 @@ impl Builder { config: self.config.clone(), }; + info!( + "HTTP server tries to listen on {}", + &self.config.endpoint.to_string() + ); + let ip_addr: IpAddr = self.config.endpoint.addr.parse().context(ParseIpAddr { ip: self.config.endpoint.addr, })?; diff --git a/server/src/mysql/error.rs b/server/src/mysql/error.rs index 87a820b9cf..11efd60389 100644 --- a/server/src/mysql/error.rs +++ b/server/src/mysql/error.rs @@ -24,16 +24,6 @@ pub enum Error { backtrace: Backtrace, }, - #[snafu(display( - "Mysql Server not running, err: {}.\nBacktrace:\n{}", - source, - backtrace - ))] - ServerNotRunning { - backtrace: Backtrace, - source: std::io::Error, - }, - #[snafu(display("Failed to create request context, err:{}", source))] CreateContext { source: crate::context::Error }, diff --git a/server/src/mysql/service.rs b/server/src/mysql/service.rs index 0b820f50ae..14c9769925 100644 --- a/server/src/mysql/service.rs +++ b/server/src/mysql/service.rs @@ -6,16 +6,12 @@ use common_util::runtime::JoinHandle; use log::{error, info}; use opensrv_mysql::AsyncMysqlIntermediary; use query_engine::executor::Executor as QueryExecutor; -use snafu::ResultExt; use table_engine::engine::EngineRuntimes; use tokio::sync::oneshot::{self, Receiver, Sender}; use crate::{ instance::{Instance, InstanceRef}, - mysql::{ - error::{Result, ServerNotRunning}, - worker::MysqlWorker, - }, + mysql::{error::Result, worker::MysqlWorker}, }; pub struct MysqlService { @@ -51,6 +47,9 @@ impl MysqlService { let rt = self.runtimes.clone(); self.tx = Some(tx); + + info!("MySQL server tries to listen on {}", self.socket_addr); + self.join_handler = Some(rt.bg_runtime.spawn(Self::loop_accept( self.instance.clone(), self.runtimes.clone(), @@ -58,7 +57,6 @@ impl MysqlService { self.timeout, rx, ))); - info!("Mysql service listens on {}", self.socket_addr); Ok(()) } @@ -75,16 +73,11 @@ impl MysqlService { timeout: Option, mut rx: Receiver<()>, ) { - let listener = match tokio::net::TcpListener::bind(socket_addr) + let listener = tokio::net::TcpListener::bind(socket_addr) .await - .context(ServerNotRunning) - { - Ok(l) => l, - Err(err) => { - error!("Mysql server binds failed, err:{}", err); - return; - } - }; + .unwrap_or_else(|e| { + panic!("Mysql server listens failed, err:{}", e); + }); loop { tokio::select! { conn_result = listener.accept() => {