
Commit 8cd69f4

feat: REPL issues logical plan to DB (#1097)
1 parent f52fc9b · commit 8cd69f4

File tree: 14 files changed, +147 -35 lines changed


Cargo.lock

Lines changed: 5 additions & 0 deletions
Some generated files are not rendered by default.

src/client/src/database.rs

Lines changed: 8 additions & 0 deletions
@@ -55,10 +55,18 @@ impl Database {
         }
     }
 
+    pub fn catalog(&self) -> &String {
+        &self.catalog
+    }
+
     pub fn set_catalog(&mut self, catalog: impl Into<String>) {
         self.catalog = catalog.into();
     }
 
+    pub fn schema(&self) -> &String {
+        &self.schema
+    }
+
     pub fn set_schema(&mut self, schema: impl Into<String>) {
         self.schema = schema.into();
     }
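
The new catalog()/schema() getters are the whole API change here; the REPL uses them below to seed a QueryContext. As a usage illustration, here is a standalone sketch of the accessor pattern, modeling only the two string fields (the values passed in main are placeholders, not GreptimeDB defaults):

```rust
/// Standalone sketch: only the catalog/schema fields of client::Database are
/// modeled; the real struct also holds the gRPC client.
struct Database {
    catalog: String,
    schema: String,
}

impl Database {
    fn new(catalog: impl Into<String>, schema: impl Into<String>) -> Self {
        Self {
            catalog: catalog.into(),
            schema: schema.into(),
        }
    }

    fn catalog(&self) -> &String {
        &self.catalog
    }

    fn set_catalog(&mut self, catalog: impl Into<String>) {
        self.catalog = catalog.into();
    }

    fn schema(&self) -> &String {
        &self.schema
    }

    fn set_schema(&mut self, schema: impl Into<String>) {
        self.schema = schema.into();
    }
}

fn main() {
    // Placeholder names, not GreptimeDB defaults.
    let mut db = Database::new("my_catalog", "my_schema");
    db.set_schema("other_schema");
    println!("attached to {}.{}", db.catalog(), db.schema());
}
```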

src/cmd/Cargo.toml

Lines changed: 5 additions & 0 deletions
@@ -11,12 +11,14 @@ path = "src/bin/greptime.rs"
 
 [dependencies]
 anymap = "1.0.0-beta.2"
+catalog = { path = "../catalog" }
 clap = { version = "3.1", features = ["derive"] }
 client = { path = "../client" }
 common-base = { path = "../common/base" }
 common-error = { path = "../common/error" }
 common-query = { path = "../common/query" }
 common-recordbatch = { path = "../common/recordbatch" }
+substrait = { path = "../common/substrait" }
 common-telemetry = { path = "../common/telemetry", features = [
     "deadlock_detection",
 ] }
@@ -27,9 +29,12 @@ futures.workspace = true
 meta-client = { path = "../meta-client" }
 meta-srv = { path = "../meta-srv" }
 nu-ansi-term = "0.46"
+partition = { path = "../partition" }
+query = { path = "../query" }
 rustyline = "10.1"
 serde.workspace = true
 servers = { path = "../servers" }
+session = { path = "../session" }
 snafu.workspace = true
 tokio.workspace = true
 toml = "0.5"

src/cmd/src/cli.rs

Lines changed: 3 additions & 1 deletion
@@ -50,13 +50,15 @@ impl SubCommand {
 pub(crate) struct AttachCommand {
     #[clap(long)]
     pub(crate) grpc_addr: String,
+    #[clap(long)]
+    pub(crate) meta_addr: Option<String>,
     #[clap(long, action)]
     pub(crate) disable_helper: bool,
 }
 
 impl AttachCommand {
     async fn run(self) -> Result<()> {
-        let mut repl = Repl::try_new(&self)?;
+        let mut repl = Repl::try_new(&self).await?;
         repl.run().await
     }
 }
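
The new meta_addr flag is optional, so attach keeps working without a metasrv; when it is present, Repl::try_new (now async) builds a local query engine. A minimal standalone sketch of how the clap derive parses these flags, assuming clap 3.x with the derive feature as declared in the Cargo.toml above; the struct and the addresses are hypothetical stand-ins, not documented defaults:

```rust
use clap::Parser;

/// Hypothetical mirror of AttachCommand, for illustration only.
#[derive(Parser, Debug)]
struct Attach {
    /// gRPC endpoint of the database to attach to.
    #[clap(long)]
    grpc_addr: String,
    /// Optional metasrv endpoint; when set, the REPL plans queries locally.
    #[clap(long)]
    meta_addr: Option<String>,
    /// Disable the rustyline helper.
    #[clap(long, action)]
    disable_helper: bool,
}

fn main() {
    // Placeholder addresses.
    let cmd = Attach::parse_from([
        "attach",
        "--grpc-addr", "127.0.0.1:4001",
        "--meta-addr", "127.0.0.1:3002",
    ]);
    assert_eq!(cmd.meta_addr.as_deref(), Some("127.0.0.1:3002"));
    assert!(!cmd.disable_helper);
    println!("{cmd:?}");
}
```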

src/cmd/src/cli/repl.rs

Lines changed: 76 additions & 8 deletions
@@ -13,24 +13,39 @@
 // limitations under the License.
 
 use std::path::PathBuf;
+use std::sync::Arc;
 use std::time::Instant;
 
+use catalog::remote::MetaKvBackend;
 use client::{Client, Database, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
 use common_error::prelude::ErrorExt;
 use common_query::Output;
 use common_recordbatch::RecordBatches;
 use common_telemetry::logging;
 use either::Either;
+use frontend::catalog::FrontendCatalogManager;
+use frontend::datanode::DatanodeClients;
+use meta_client::client::MetaClientBuilder;
+use partition::manager::PartitionRuleManager;
+use partition::route::TableRoutes;
+use query::datafusion::DatafusionQueryEngine;
+use query::logical_optimizer::LogicalOptimizer;
+use query::parser::QueryLanguageParser;
+use query::plan::LogicalPlan;
+use query::QueryEngine;
 use rustyline::error::ReadlineError;
 use rustyline::Editor;
+use session::context::QueryContext;
 use snafu::{ErrorCompat, ResultExt};
+use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
 
 use crate::cli::cmd::ReplCommand;
 use crate::cli::helper::RustylineHelper;
 use crate::cli::AttachCommand;
 use crate::error::{
-    CollectRecordBatchesSnafu, PrettyPrintRecordBatchesSnafu, ReadlineSnafu, ReplCreationSnafu,
-    RequestDatabaseSnafu, Result,
+    CollectRecordBatchesSnafu, ParseSqlSnafu, PlanStatementSnafu, PrettyPrintRecordBatchesSnafu,
+    ReadlineSnafu, ReplCreationSnafu, RequestDatabaseSnafu, Result, StartMetaClientSnafu,
+    SubstraitEncodeLogicalPlanSnafu,
 };
 
 /// Captures the state of the repl, gathers commands and executes them one by one
@@ -43,6 +58,8 @@ pub(crate) struct Repl {
 
     /// Client for interacting with GreptimeDB
     database: Database,
+
+    query_engine: Option<DatafusionQueryEngine>,
 }
 
 #[allow(clippy::print_stdout)]
@@ -51,7 +68,7 @@ impl Repl {
         println!("{}", ReplCommand::help())
     }
 
-    pub(crate) fn try_new(cmd: &AttachCommand) -> Result<Self> {
+    pub(crate) async fn try_new(cmd: &AttachCommand) -> Result<Self> {
         let mut rl = Editor::new().context(ReplCreationSnafu)?;
 
         if !cmd.disable_helper {
@@ -69,10 +86,17 @@ impl Repl {
         let client = Client::with_urls([&cmd.grpc_addr]);
         let database = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client);
 
+        let query_engine = if let Some(meta_addr) = &cmd.meta_addr {
+            create_query_engine(meta_addr).await.map(Some)?
+        } else {
+            None
+        };
+
         Ok(Self {
             rl,
             prompt: "> ".to_string(),
             database,
+            query_engine,
         })
     }
 
@@ -134,11 +158,29 @@ impl Repl {
     async fn do_execute_sql(&self, sql: String) -> Result<()> {
        let start = Instant::now();
 
-        let output = self
-            .database
-            .sql(&sql)
-            .await
-            .context(RequestDatabaseSnafu { sql: &sql })?;
+        let output = if let Some(query_engine) = &self.query_engine {
+            let stmt = QueryLanguageParser::parse_sql(&sql)
+                .with_context(|_| ParseSqlSnafu { sql: sql.clone() })?;
+
+            let query_ctx = Arc::new(QueryContext::with(
+                self.database.catalog(),
+                self.database.schema(),
+            ));
+            let LogicalPlan::DfPlan(plan) = query_engine
+                .statement_to_plan(stmt, query_ctx)
+                .await
+                .and_then(|x| query_engine.optimize(&x))
+                .context(PlanStatementSnafu)?;
+
+            let plan = DFLogicalSubstraitConvertor {}
+                .encode(plan)
+                .context(SubstraitEncodeLogicalPlanSnafu)?;
+
+            self.database.logical_plan(plan.to_vec()).await
+        } else {
+            self.database.sql(&sql).await
+        }
+        .context(RequestDatabaseSnafu { sql: &sql })?;
 
         let either = match output {
             Output::Stream(s) => {
@@ -197,3 +239,29 @@ fn history_file() -> PathBuf {
     buf.push(".greptimedb_cli_history");
     buf
 }
+
+async fn create_query_engine(meta_addr: &str) -> Result<DatafusionQueryEngine> {
+    let mut meta_client = MetaClientBuilder::default().enable_store().build();
+    meta_client
+        .start([meta_addr])
+        .await
+        .context(StartMetaClientSnafu)?;
+    let meta_client = Arc::new(meta_client);
+
+    let backend = Arc::new(MetaKvBackend {
+        client: meta_client.clone(),
+    });
+
+    let table_routes = Arc::new(TableRoutes::new(meta_client));
+    let partition_manager = Arc::new(PartitionRuleManager::new(table_routes));
+
+    let datanode_clients = Arc::new(DatanodeClients::default());
+
+    let catalog_list = Arc::new(FrontendCatalogManager::new(
+        backend,
+        partition_manager,
+        datanode_clients,
+    ));
+
+    Ok(DatafusionQueryEngine::new(catalog_list, Default::default()))
+}
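
Read end to end, the new path in do_execute_sql is: parse the SQL into a statement, build a QueryContext from the database's catalog and schema, plan and optimize through the DatafusionQueryEngine, encode the optimized plan with the substrait convertor, and ship the encoded bytes via Database::logical_plan. A hedged sketch of that pipeline as a free function, reusing only the APIs that appear in this diff (it is meant to be read inside this workspace at this revision; error handling is collapsed into a boxed error instead of the dedicated snafu variants the real code uses):

```rust
use std::sync::Arc;

use client::Database;
use common_query::Output;
use query::datafusion::DatafusionQueryEngine;
use query::logical_optimizer::LogicalOptimizer;
use query::parser::QueryLanguageParser;
use query::plan::LogicalPlan;
use query::QueryEngine;
use session::context::QueryContext;
use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};

/// Plan `sql` locally and send the encoded plan to the server.
async fn plan_and_send(
    engine: &DatafusionQueryEngine,
    database: &Database,
    sql: &str,
) -> Result<Output, Box<dyn std::error::Error>> {
    // SQL text -> parsed statement.
    let stmt = QueryLanguageParser::parse_sql(sql)?;

    // The query context carries the catalog/schema the REPL is attached to.
    let query_ctx = Arc::new(QueryContext::with(database.catalog(), database.schema()));

    // Statement -> optimized DataFusion logical plan.
    let LogicalPlan::DfPlan(plan) = engine
        .statement_to_plan(stmt, query_ctx)
        .await
        .and_then(|p| engine.optimize(&p))?;

    // Logical plan -> substrait bytes -> gRPC request.
    let bytes = DFLogicalSubstraitConvertor {}.encode(plan)?;
    Ok(database.logical_plan(bytes.to_vec()).await?)
}
```

Note that both the QueryEngine and LogicalOptimizer traits must be in scope for statement_to_plan and optimize to resolve, which is why the diff adds both imports.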

src/cmd/src/error.rs

Lines changed: 30 additions & 0 deletions
@@ -103,6 +103,31 @@ pub enum Error {
         #[snafu(backtrace)]
         source: common_recordbatch::error::Error,
     },
+
+    #[snafu(display("Failed to start Meta client, source: {}", source))]
+    StartMetaClient {
+        #[snafu(backtrace)]
+        source: meta_client::error::Error,
+    },
+
+    #[snafu(display("Failed to parse SQL: {}, source: {}", sql, source))]
+    ParseSql {
+        sql: String,
+        #[snafu(backtrace)]
+        source: query::error::Error,
+    },
+
+    #[snafu(display("Failed to plan statement, source: {}", source))]
+    PlanStatement {
+        #[snafu(backtrace)]
+        source: query::error::Error,
+    },
+
+    #[snafu(display("Failed to encode logical plan in substrait, source: {}", source))]
+    SubstraitEncodeLogicalPlan {
+        #[snafu(backtrace)]
+        source: substrait::error::Error,
+    },
 }
 
 pub type Result<T> = std::result::Result<T, Error>;
@@ -126,6 +151,11 @@ impl ErrorExt for Error {
             Error::CollectRecordBatches { source } | Error::PrettyPrintRecordBatches { source } => {
                 source.status_code()
             }
+            Error::StartMetaClient { source } => source.status_code(),
+            Error::ParseSql { source, .. } | Error::PlanStatement { source } => {
+                source.status_code()
+            }
+            Error::SubstraitEncodeLogicalPlan { source } => source.status_code(),
         }
     }
 
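
Each new failure point gets its own variant: the display string carries the human-readable context, the source field preserves the underlying error, and status_code simply delegates to it. A simplified standalone sketch of the pattern, assuming snafu 0.7; the backtrace plumbing and the real source error types are omitted, with ParseIntError standing in for query::error::Error:

```rust
use snafu::{ResultExt, Snafu};

#[derive(Debug, Snafu)]
enum Error {
    #[snafu(display("Failed to parse SQL: {}, source: {}", sql, source))]
    ParseSql {
        sql: String,
        // Stand-in for query::error::Error; the real variant also records a backtrace.
        source: std::num::ParseIntError,
    },
}

type Result<T> = std::result::Result<T, Error>;

// `ParseSqlSnafu` is the context selector snafu generates for the variant;
// `.context(...)` fills in `sql` and wraps the source error.
fn parse_limit(sql: &str, raw_limit: &str) -> Result<u32> {
    raw_limit.parse::<u32>().context(ParseSqlSnafu { sql })
}

fn main() {
    match parse_limit("SELECT * FROM t LIMIT abc", "abc") {
        Ok(limit) => println!("limit = {limit}"),
        Err(e) => println!("{e}"),
    }
}
```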

src/frontend/src/catalog.rs

Lines changed: 1 addition & 1 deletion
@@ -46,7 +46,7 @@ pub struct FrontendCatalogManager {
 }
 
 impl FrontendCatalogManager {
-    pub(crate) fn new(
+    pub fn new(
         backend: KvBackendRef,
         partition_manager: PartitionRuleManagerRef,
         datanode_clients: Arc<DatanodeClients>,

src/frontend/src/datanode.rs

Lines changed: 5 additions & 3 deletions
@@ -19,13 +19,13 @@ use common_grpc::channel_manager::ChannelManager;
 use meta_client::rpc::Peer;
 use moka::future::{Cache, CacheBuilder};
 
-pub(crate) struct DatanodeClients {
+pub struct DatanodeClients {
     channel_manager: ChannelManager,
     clients: Cache<Peer, Client>,
 }
 
-impl DatanodeClients {
-    pub(crate) fn new() -> Self {
+impl Default for DatanodeClients {
+    fn default() -> Self {
         Self {
             channel_manager: ChannelManager::new(),
             clients: CacheBuilder::new(1024)
@@ -34,7 +34,9 @@ impl DatanodeClients {
                 .build(),
         }
     }
+}
 
+impl DatanodeClients {
     pub(crate) async fn get_client(&self, datanode: &Peer) -> Client {
         self.clients
             .get_with_by_ref(datanode, async move {
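
Moving the constructor from an inherent new() into an explicit impl Default is what lets both the frontend and the new REPL code write DatanodeClients::default(). The impl stays hand-written rather than #[derive(Default)] because the client cache is built through CacheBuilder::new(1024), which a derived default could not express. A toy standalone mirror of the refactor, with placeholder field types instead of ChannelManager and the moka cache:

```rust
use std::collections::HashMap;

/// Toy mirror of DatanodeClients; the real fields are a ChannelManager and a
/// moka Cache built via CacheBuilder::new(1024).
pub struct Clients {
    connections: HashMap<String, String>,
}

impl Default for Clients {
    fn default() -> Self {
        Self {
            // An explicit impl is needed because the capacity is not the
            // type's own default.
            connections: HashMap::with_capacity(1024),
        }
    }
}

impl Clients {
    /// Inherent methods stay in their own impl block, as in the real code.
    pub fn get_client(&self, peer: &str) -> Option<&String> {
        self.connections.get(peer)
    }
}

fn main() {
    let clients = Clients::default();
    assert!(clients.get_client("datanode-0").is_none());
}
```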

src/frontend/src/instance.rs

Lines changed: 1 addition & 1 deletion
@@ -119,7 +119,7 @@ impl Instance {
         });
         let table_routes = Arc::new(TableRoutes::new(meta_client.clone()));
         let partition_manager = Arc::new(PartitionRuleManager::new(table_routes));
-        let datanode_clients = Arc::new(DatanodeClients::new());
+        let datanode_clients = Arc::new(DatanodeClients::default());
 
         let catalog_manager = Arc::new(FrontendCatalogManager::new(
             meta_backend,

src/frontend/src/lib.rs

Lines changed: 2 additions & 2 deletions
@@ -14,8 +14,8 @@
 
 #![feature(assert_matches)]
 
-mod catalog;
-mod datanode;
+pub mod catalog;
+pub mod datanode;
 pub mod error;
 mod expr_factory;
 pub mod frontend;

src/frontend/src/tests.rs

Lines changed: 1 addition & 1 deletion
@@ -210,7 +210,7 @@ pub(crate) async fn create_distributed_instance(test_name: &str) -> MockDistribu
     let kv_store: KvStoreRef = Arc::new(MemStore::default()) as _;
     let meta_srv = meta_srv::mocks::mock(MetaSrvOptions::default(), kv_store.clone(), None).await;
 
-    let datanode_clients = Arc::new(DatanodeClients::new());
+    let datanode_clients = Arc::new(DatanodeClients::default());
 
     let mut test_guards = vec![];

src/query/src/datafusion.rs

Lines changed: 8 additions & 11 deletions
@@ -58,7 +58,7 @@ use crate::plan::LogicalPlan;
 use crate::query_engine::{QueryEngineContext, QueryEngineState};
 use crate::{metric, QueryEngine};
 
-pub(crate) struct DatafusionQueryEngine {
+pub struct DatafusionQueryEngine {
     state: QueryEngineState,
 }
 
@@ -145,14 +145,14 @@ impl QueryEngine for DatafusionQueryEngine {
         // TODO(sunng87): consider cache optmised logical plan between describe
         // and execute
         let plan = self.statement_to_plan(stmt, query_ctx).await?;
-        let mut ctx = QueryEngineContext::new(self.state.session_state());
-        let optimised_plan = self.optimize_logical_plan(&mut ctx, &plan)?;
+        let optimised_plan = self.optimize(&plan)?;
         optimised_plan.schema()
     }
 
     async fn execute(&self, plan: &LogicalPlan) -> Result<Output> {
+        let logical_plan = self.optimize(plan)?;
+
         let mut ctx = QueryEngineContext::new(self.state.session_state());
-        let logical_plan = self.optimize_logical_plan(&mut ctx, plan)?;
         let physical_plan = self.create_physical_plan(&mut ctx, &logical_plan).await?;
         let physical_plan = self.optimize_physical_plan(&mut ctx, physical_plan)?;
 
@@ -185,16 +185,13 @@
 }
 
 impl LogicalOptimizer for DatafusionQueryEngine {
-    fn optimize_logical_plan(
-        &self,
-        ctx: &mut QueryEngineContext,
-        plan: &LogicalPlan,
-    ) -> Result<LogicalPlan> {
+    fn optimize(&self, plan: &LogicalPlan) -> Result<LogicalPlan> {
         let _timer = timer!(metric::METRIC_OPTIMIZE_LOGICAL_ELAPSED);
         match plan {
             LogicalPlan::DfPlan(df_plan) => {
-                let state = ctx.state();
-                let optimized_plan = state
+                let optimized_plan = self
+                    .state
+                    .session_state()
                     .optimize(df_plan)
                     .context(error::DatafusionSnafu {
                         msg: "Fail to optimize logical plan",

src/query/src/lib.rs

Lines changed: 1 addition & 1 deletion
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-mod datafusion;
+pub mod datafusion;
 pub mod error;
 pub mod executor;
 mod function;
