Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: add logs #614

Merged
merged 3 commits into from
Feb 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion components/message_queue/src/kafka/kafka_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ impl MessageQueue for KafkaImpl {

// Create topic in Kafka.
let topic_management_config = &self.config.topic_management_config;
info!("Try to create topic:{} in kafka", topic_name);
info!("Try to create topic, name:{}.", topic_name);
let result = self
.controller_client
.create_topic(
Expand All @@ -211,6 +211,10 @@ impl MessageQueue for KafkaImpl {
)
.await;

info!(
"Create topic finish, name:{}, result:{:?}",
topic_name, result
);
match result {
// Race condition between check and creation action, that's OK.
Ok(_)
Expand Down
11 changes: 6 additions & 5 deletions server/src/grpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ impl<Q: QueryExecutor + 'static> RpcServices<Q> {
let serve_addr = self.serve_addr;
let (stop_tx, stop_rx) = oneshot::channel();
let join_handle = self.runtime.spawn(async move {
info!("Grpc server starts listening on {}", serve_addr);
info!("Grpc server tries to listen on {}", serve_addr);

let mut router = Server::builder().add_service(rpc_server);

Expand All @@ -177,11 +177,12 @@ impl<Q: QueryExecutor + 'static> RpcServices<Q> {
info!("Grpc server serves remote engine rpc service");
router = router.add_service(remote_engine_server);

let serve_res = router
router
.serve_with_shutdown(serve_addr, stop_rx.map(drop))
.await;

warn!("Grpc server stops serving, exit result:{:?}", serve_res);
.await
.unwrap_or_else(|e| {
panic!("Grpc server listens failed, err:{:?}", e);
});
});
self.join_handle = Some(join_handle);
self.stop_tx = Some(stop_tx);
Expand Down
7 changes: 6 additions & 1 deletion server/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::{
time::Duration,
};

use log::error;
use log::{error, info};
use logger::RuntimeLevel;
use profile::Profiler;
use prom_remote_api::{types::RemoteStorageRef, web};
Expand Down Expand Up @@ -459,6 +459,11 @@ impl<Q: QueryExecutor + 'static> Builder<Q> {
enable_tenant_as_schema: self.enable_tenant_as_schema,
};

info!(
"HTTP server tries to listen on {}",
&self.config.endpoint.to_string()
);

let ip_addr: IpAddr = self.config.endpoint.addr.parse().context(ParseIpAddr {
ip: self.config.endpoint.addr,
})?;
Expand Down
10 changes: 0 additions & 10 deletions server/src/mysql/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,6 @@ pub enum Error {
backtrace: Backtrace,
},

#[snafu(display(
"Mysql Server not running, err: {}.\nBacktrace:\n{}",
source,
backtrace
))]
ServerNotRunning {
backtrace: Backtrace,
source: std::io::Error,
},

#[snafu(display("Failed to create request context, err:{}", source))]
CreateContext { source: crate::context::Error },

Expand Down
23 changes: 8 additions & 15 deletions server/src/mysql/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,12 @@ use common_util::runtime::JoinHandle;
use log::{error, info};
use opensrv_mysql::AsyncMysqlIntermediary;
use query_engine::executor::Executor as QueryExecutor;
use snafu::ResultExt;
use table_engine::engine::EngineRuntimes;
use tokio::sync::oneshot::{self, Receiver, Sender};

use crate::{
instance::{Instance, InstanceRef},
mysql::{
error::{Result, ServerNotRunning},
worker::MysqlWorker,
},
mysql::{error::Result, worker::MysqlWorker},
};

pub struct MysqlService<Q> {
Expand Down Expand Up @@ -51,14 +47,16 @@ impl<Q: QueryExecutor + 'static> MysqlService<Q> {

let rt = self.runtimes.clone();
self.tx = Some(tx);

info!("MySQL server tries to listen on {}", self.socket_addr);

self.join_handler = Some(rt.bg_runtime.spawn(Self::loop_accept(
self.instance.clone(),
self.runtimes.clone(),
self.socket_addr,
self.timeout,
rx,
)));
info!("Mysql service listens on {}", self.socket_addr);
Ok(())
}

Expand All @@ -75,16 +73,11 @@ impl<Q: QueryExecutor + 'static> MysqlService<Q> {
timeout: Option<Duration>,
mut rx: Receiver<()>,
) {
let listener = match tokio::net::TcpListener::bind(socket_addr)
let listener = tokio::net::TcpListener::bind(socket_addr)
.await
.context(ServerNotRunning)
{
Ok(l) => l,
Err(err) => {
error!("Mysql server binds failed, err:{}", err);
return;
}
};
.unwrap_or_else(|e| {
panic!("Mysql server listens failed, err:{}", e);
});
loop {
tokio::select! {
conn_result = listener.accept() => {
Expand Down