Skip to content

Commit

Permalink
chore: add logs (apache#614)
Browse files Browse the repository at this point in the history
* chore: add logs

* fix reviews

* panic when listen failed
  • Loading branch information
jiacai2050 authored Feb 8, 2023
1 parent 863cf7c commit 7fdaa4e
Show file tree
Hide file tree
Showing 5 changed files with 25 additions and 32 deletions.
6 changes: 5 additions & 1 deletion components/message_queue/src/kafka/kafka_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ impl MessageQueue for KafkaImpl {

// Create topic in Kafka.
let topic_management_config = &self.config.topic_management_config;
info!("Try to create topic:{} in kafka", topic_name);
info!("Try to create topic, name:{}.", topic_name);
let result = self
.controller_client
.create_topic(
Expand All @@ -211,6 +211,10 @@ impl MessageQueue for KafkaImpl {
)
.await;

info!(
"Create topic finish, name:{}, result:{:?}",
topic_name, result
);
match result {
// Race condition between check and creation action, that's OK.
Ok(_)
Expand Down
11 changes: 6 additions & 5 deletions server/src/grpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ impl<Q: QueryExecutor + 'static> RpcServices<Q> {
let serve_addr = self.serve_addr;
let (stop_tx, stop_rx) = oneshot::channel();
let join_handle = self.runtime.spawn(async move {
info!("Grpc server starts listening on {}", serve_addr);
info!("Grpc server tries to listen on {}", serve_addr);

let mut router = Server::builder().add_service(rpc_server);

Expand All @@ -171,11 +171,12 @@ impl<Q: QueryExecutor + 'static> RpcServices<Q> {
info!("Grpc server serves remote engine rpc service");
router = router.add_service(remote_engine_server);

let serve_res = router
router
.serve_with_shutdown(serve_addr, stop_rx.map(drop))
.await;

warn!("Grpc server stops serving, exit result:{:?}", serve_res);
.await
.unwrap_or_else(|e| {
panic!("Grpc server listens failed, err:{:?}", e);
});
});
self.join_handle = Some(join_handle);
self.stop_tx = Some(stop_tx);
Expand Down
7 changes: 6 additions & 1 deletion server/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::{
time::Duration,
};

use log::error;
use log::{error, info};
use logger::RuntimeLevel;
use profile::Profiler;
use prom_remote_api::{types::RemoteStorageRef, web};
Expand Down Expand Up @@ -442,6 +442,11 @@ impl<Q: QueryExecutor + 'static> Builder<Q> {
config: self.config.clone(),
};

info!(
"HTTP server tries to listen on {}",
&self.config.endpoint.to_string()
);

let ip_addr: IpAddr = self.config.endpoint.addr.parse().context(ParseIpAddr {
ip: self.config.endpoint.addr,
})?;
Expand Down
10 changes: 0 additions & 10 deletions server/src/mysql/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,6 @@ pub enum Error {
backtrace: Backtrace,
},

#[snafu(display(
"Mysql Server not running, err: {}.\nBacktrace:\n{}",
source,
backtrace
))]
ServerNotRunning {
backtrace: Backtrace,
source: std::io::Error,
},

#[snafu(display("Failed to create request context, err:{}", source))]
CreateContext { source: crate::context::Error },

Expand Down
23 changes: 8 additions & 15 deletions server/src/mysql/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,12 @@ use common_util::runtime::JoinHandle;
use log::{error, info};
use opensrv_mysql::AsyncMysqlIntermediary;
use query_engine::executor::Executor as QueryExecutor;
use snafu::ResultExt;
use table_engine::engine::EngineRuntimes;
use tokio::sync::oneshot::{self, Receiver, Sender};

use crate::{
instance::{Instance, InstanceRef},
mysql::{
error::{Result, ServerNotRunning},
worker::MysqlWorker,
},
mysql::{error::Result, worker::MysqlWorker},
};

pub struct MysqlService<Q> {
Expand Down Expand Up @@ -51,14 +47,16 @@ impl<Q: QueryExecutor + 'static> MysqlService<Q> {

let rt = self.runtimes.clone();
self.tx = Some(tx);

info!("MySQL server tries to listen on {}", self.socket_addr);

self.join_handler = Some(rt.bg_runtime.spawn(Self::loop_accept(
self.instance.clone(),
self.runtimes.clone(),
self.socket_addr,
self.timeout,
rx,
)));
info!("Mysql service listens on {}", self.socket_addr);
Ok(())
}

Expand All @@ -75,16 +73,11 @@ impl<Q: QueryExecutor + 'static> MysqlService<Q> {
timeout: Option<Duration>,
mut rx: Receiver<()>,
) {
let listener = match tokio::net::TcpListener::bind(socket_addr)
let listener = tokio::net::TcpListener::bind(socket_addr)
.await
.context(ServerNotRunning)
{
Ok(l) => l,
Err(err) => {
error!("Mysql server binds failed, err:{}", err);
return;
}
};
.unwrap_or_else(|e| {
panic!("Mysql server listens failed, err:{}", e);
});
loop {
tokio::select! {
conn_result = listener.accept() => {
Expand Down

0 comments on commit 7fdaa4e

Please sign in to comment.