Skip to content

Commit 5da39ad

Browse files
committed
feat: Add SQL queries support in /v1/sql endpoint
1 parent 585ef3d commit 5da39ad

File tree

8 files changed

+743
-2
lines changed

8 files changed

+743
-2
lines changed

packages/cubejs-api-gateway/src/gateway.ts

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import {
3333
QueryType as QueryTypeEnum, ResultType
3434
} from './types/enums';
3535
import {
36+
BaseRequest,
3637
RequestContext,
3738
ExtendedRequestContext,
3839
Request,
@@ -324,6 +325,17 @@ class ApiGateway {
324325
}));
325326

326327
app.get(`${this.basePath}/v1/sql`, userMiddlewares, userAsyncHandler(async (req: any, res) => {
328+
// TODO parse req.query with zod/joi/...
329+
330+
if (req.query.format === 'sql') {
331+
await this.sql4sql({
332+
query: req.query.query,
333+
context: req.context,
334+
res: this.resToResultFn(res)
335+
});
336+
return;
337+
}
338+
327339
await this.sql({
328340
query: req.query.query,
329341
context: req.context,
@@ -332,6 +344,17 @@ class ApiGateway {
332344
}));
333345

334346
app.post(`${this.basePath}/v1/sql`, jsonParser, userMiddlewares, userAsyncHandler(async (req, res) => {
347+
// TODO parse req.body with zod/joi/...
348+
349+
if (req.body.format === 'sql') {
350+
await this.sql4sql({
351+
query: req.body.query,
352+
context: req.context,
353+
res: this.resToResultFn(res)
354+
});
355+
return;
356+
}
357+
335358
await this.sql({
336359
query: req.body.query,
337360
context: req.context,
@@ -1281,6 +1304,26 @@ class ApiGateway {
12811304
return [queryType, normalizedQueries, queryNormalizationResult.map((it) => remapToQueryAdapterFormat(it.normalizedQuery))];
12821305
}
12831306

1307+
protected async sql4sql({
1308+
query,
1309+
context,
1310+
res,
1311+
}: {query: string} & BaseRequest) {
1312+
try {
1313+
await this.assertApiScope('data', context.securityContext);
1314+
1315+
const result = await this.sqlServer.sql4sql(query, context.securityContext);
1316+
res({ sql: result });
1317+
} catch (e: any) {
1318+
this.handleError({
1319+
e,
1320+
context,
1321+
query,
1322+
res,
1323+
});
1324+
}
1325+
}
1326+
12841327
public async sql({
12851328
query,
12861329
context,

packages/cubejs-api-gateway/src/sql-server.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,11 @@ import {
33
registerInterface,
44
shutdownInterface,
55
execSql,
6+
sql4sql,
67
SqlInterfaceInstance,
78
Request as NativeRequest,
89
LoadRequestMeta,
10+
Sql4SqlResponse,
911
} from '@cubejs-backend/native';
1012
import type { ShutdownMode } from '@cubejs-backend/native';
1113
import { displayCLIWarning, getEnv } from '@cubejs-backend/shared';
@@ -62,6 +64,10 @@ export class SQLServer {
6264
await execSql(this.sqlInterfaceInstance!, sqlQuery, stream, securityContext);
6365
}
6466

67+
public async sql4sql(sqlQuery: string, securityContext?: any): Promise<Sql4SqlResponse> {
68+
return sql4sql(this.sqlInterfaceInstance!, sqlQuery, securityContext);
69+
}
70+
6571
protected buildCheckSqlAuth(options: SQLServerOptions): CheckSQLAuthFn {
6672
return (options.checkSqlAuth && this.wrapCheckSqlAuthFn(options.checkSqlAuth))
6773
|| this.createDefaultCheckSqlAuthFn(options);

packages/cubejs-backend-native/js/index.ts

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,21 @@ export type DBResponsePrimitive =
124124
number |
125125
string;
126126

127+
// TODO type this better, to make it proper disjoint union
128+
export type Sql4SqlOk = {
129+
sql: string,
130+
values: Array<string | null>,
131+
};
132+
export type Sql4SqlError = { error: string };
133+
export type Sql4SqlCommon = {
134+
query_type: {
135+
regular: boolean;
136+
post_processing: boolean;
137+
pushdown: boolean;
138+
}
139+
};
140+
export type Sql4SqlResponse = Sql4SqlCommon & (Sql4SqlOk | Sql4SqlError);
141+
127142
let loadedNative: any = null;
128143

129144
export function loadNative() {
@@ -389,6 +404,13 @@ export const execSql = async (instance: SqlInterfaceInstance, sqlQuery: string,
389404
await native.execSql(instance, sqlQuery, stream, securityContext ? JSON.stringify(securityContext) : null);
390405
};
391406

407+
// TODO parse result from native code
408+
export const sql4sql = async (instance: SqlInterfaceInstance, sqlQuery: string, securityContext?: any): Promise<Sql4SqlResponse> => {
409+
const native = loadNative();
410+
411+
return native.sql4sql(instance, sqlQuery, securityContext ? JSON.stringify(securityContext) : null);
412+
};
413+
392414
export const buildSqlAndParams = (cubeEvaluator: any): String => {
393415
const native = loadNative();
394416

packages/cubejs-backend-native/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ pub mod node_obj_serializer;
1616
pub mod orchestrator;
1717
#[cfg(feature = "python")]
1818
pub mod python;
19+
pub mod sql4sql;
1920
pub mod stream;
2021
pub mod template;
2122
pub mod transport;

packages/cubejs-backend-native/src/node_export.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use crate::config::{NodeConfiguration, NodeConfigurationFactoryOptions, NodeCube
1212
use crate::cross::CLRepr;
1313
use crate::cubesql_utils::with_session;
1414
use crate::logger::NodeBridgeLogger;
15+
use crate::sql4sql::sql4sql;
1516
use crate::stream::OnDrainHandler;
1617
use crate::tokio_runtime_node;
1718
use crate::transport::NodeBridgeTransport;
@@ -32,8 +33,8 @@ use cubesql::{telemetry::ReportingLogger, CubeError};
3233

3334
use neon::prelude::*;
3435

35-
struct SQLInterface {
36-
services: Arc<NodeCubeServices>,
36+
pub(crate) struct SQLInterface {
37+
pub(crate) services: Arc<NodeCubeServices>,
3738
}
3839

3940
impl Finalize for SQLInterface {}
@@ -546,6 +547,7 @@ pub fn register_module_exports<C: NodeConfiguration + 'static>(
546547
cx.export_function("registerInterface", register_interface::<C>)?;
547548
cx.export_function("shutdownInterface", shutdown_interface)?;
548549
cx.export_function("execSql", exec_sql)?;
550+
cx.export_function("sql4sql", sql4sql)?;
549551
cx.export_function("isFallbackBuild", is_fallback_build)?;
550552
cx.export_function("__js_to_clrepr_to_js", debug_js_to_clrepr_to_js)?;
551553

Lines changed: 228 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,228 @@
1+
use std::sync::Arc;
2+
3+
use neon::prelude::*;
4+
5+
use cubesql::compile::convert_sql_to_cube_query;
6+
use cubesql::compile::datafusion::logical_plan::LogicalPlan;
7+
use cubesql::compile::engine::df::scan::CubeScanNode;
8+
use cubesql::compile::engine::df::wrapper::{CubeScanWrappedSqlNode, CubeScanWrapperNode};
9+
use cubesql::sql::Session;
10+
use cubesql::transport::MetaContext;
11+
use cubesql::CubeError;
12+
13+
use crate::auth::NativeAuthContext;
14+
use crate::config::NodeCubeServices;
15+
use crate::cubesql_utils::with_session;
16+
use crate::tokio_runtime_node;
17+
18+
enum Sql4SqlQueryType {
19+
Regular,
20+
PostProcessing,
21+
Pushdown,
22+
}
23+
24+
impl Sql4SqlQueryType {
25+
pub fn to_js<'ctx>(&self, cx: &mut impl Context<'ctx>) -> JsResult<'ctx, JsString> {
26+
let self_str = match self {
27+
Self::Regular => "regular",
28+
Self::PostProcessing => "post_processing",
29+
Self::Pushdown => "pushdown",
30+
};
31+
32+
Ok(cx.string(self_str))
33+
}
34+
}
35+
36+
enum Sql4SqlResponseResult {
37+
Ok {
38+
sql: String,
39+
values: Vec<Option<String>>,
40+
},
41+
Error {
42+
error: String,
43+
},
44+
}
45+
46+
struct Sql4SqlResponse {
47+
result: Sql4SqlResponseResult,
48+
query_type: Sql4SqlQueryType,
49+
}
50+
51+
impl Sql4SqlResponse {
52+
pub fn to_js<'ctx>(&self, cx: &mut impl Context<'ctx>) -> JsResult<'ctx, JsObject> {
53+
let obj = cx.empty_object();
54+
55+
match &self.result {
56+
Sql4SqlResponseResult::Ok { sql, values } => {
57+
let status = cx.string("ok");
58+
obj.set(cx, "status", status)?;
59+
60+
let sql_tuple = cx.empty_array();
61+
let sql = cx.string(sql);
62+
sql_tuple.set(cx, 0, sql)?;
63+
let js_values = cx.empty_array();
64+
for (i, v) in values.iter().enumerate() {
65+
use std::convert::TryFrom;
66+
let i = u32::try_from(i).unwrap();
67+
let v: Handle<JsValue> = v
68+
.as_ref()
69+
.map(|v| cx.string(v).upcast())
70+
.unwrap_or_else(|| cx.null().upcast());
71+
js_values.set(cx, i, v)?;
72+
}
73+
sql_tuple.set(cx, 1, js_values)?;
74+
obj.set(cx, "sql", sql_tuple)?;
75+
}
76+
Sql4SqlResponseResult::Error { error } => {
77+
let status = cx.string("error");
78+
obj.set(cx, "status", status)?;
79+
80+
let error = cx.string(error);
81+
obj.set(cx, "error", error)?;
82+
}
83+
}
84+
85+
let query_type = self.query_type.to_js(cx)?;
86+
obj.set(cx, "query_type", query_type)?;
87+
88+
Ok(obj)
89+
}
90+
}
91+
92+
async fn get_sql(
93+
session: &Session,
94+
meta_context: Arc<MetaContext>,
95+
plan: Arc<LogicalPlan>,
96+
) -> Result<Sql4SqlResponse, CubeError> {
97+
let auth_context = session
98+
.state
99+
.auth_context()
100+
.ok_or_else(|| CubeError::internal("Unexpected missing auth context".to_string()))?;
101+
102+
match plan.as_ref() {
103+
LogicalPlan::Extension(extension) => {
104+
let cube_scan_wrapped_sql = extension
105+
.node
106+
.as_any()
107+
.downcast_ref::<CubeScanWrappedSqlNode>();
108+
109+
if let Some(cube_scan_wrapped_sql) = cube_scan_wrapped_sql {
110+
return Ok(Sql4SqlResponse {
111+
result: Sql4SqlResponseResult::Ok {
112+
sql: cube_scan_wrapped_sql.wrapped_sql.sql.clone(),
113+
values: cube_scan_wrapped_sql.wrapped_sql.values.clone(),
114+
},
115+
query_type: Sql4SqlQueryType::Pushdown,
116+
});
117+
}
118+
119+
if extension.node.as_any().is::<CubeScanNode>() {
120+
let cube_scan_wrapper = CubeScanWrapperNode::new(
121+
plan,
122+
meta_context,
123+
auth_context,
124+
None,
125+
session.server.config_obj.clone(),
126+
);
127+
let wrapped_sql = cube_scan_wrapper
128+
.generate_sql(
129+
session.server.transport.clone(),
130+
Arc::new(session.state.get_load_request_meta("sql")),
131+
)
132+
.await?;
133+
134+
return Ok(Sql4SqlResponse {
135+
result: Sql4SqlResponseResult::Ok {
136+
sql: wrapped_sql.wrapped_sql.sql.clone(),
137+
values: wrapped_sql.wrapped_sql.values.clone(),
138+
},
139+
query_type: Sql4SqlQueryType::Regular,
140+
});
141+
}
142+
143+
Err(CubeError::internal(
144+
"Unexpected extension in logical plan root".to_string(),
145+
))
146+
}
147+
_ => Ok(Sql4SqlResponse {
148+
result: Sql4SqlResponseResult::Error {
149+
error: "Provided query can not be executed without post-processing.".to_string(),
150+
},
151+
query_type: Sql4SqlQueryType::PostProcessing,
152+
}),
153+
}
154+
}
155+
156+
async fn handle_sql4sql_query(
157+
services: Arc<NodeCubeServices>,
158+
native_auth_ctx: Arc<NativeAuthContext>,
159+
sql_query: &str,
160+
) -> Result<Sql4SqlResponse, CubeError> {
161+
with_session(&services, native_auth_ctx.clone(), |session| async move {
162+
let transport = session.server.transport.clone();
163+
// todo: can we use compiler_cache?
164+
let meta_context = transport
165+
.meta(native_auth_ctx)
166+
.await
167+
.map_err(|err| CubeError::internal(format!("Failed to get meta context: {err}")))?;
168+
let query_plan =
169+
convert_sql_to_cube_query(sql_query, meta_context.clone(), session.clone()).await?;
170+
let logical_plan = query_plan.try_as_logical_plan()?;
171+
get_sql(&session, meta_context, Arc::new(logical_plan.clone())).await
172+
})
173+
.await
174+
}
175+
176+
pub fn sql4sql(mut cx: FunctionContext) -> JsResult<JsValue> {
177+
let interface = cx.argument::<JsBox<crate::node_export::SQLInterface>>(0)?;
178+
let sql_query = cx.argument::<JsString>(1)?.value(&mut cx);
179+
180+
let security_context: Option<serde_json::Value> = match cx.argument::<JsValue>(2) {
181+
Ok(string) => match string.downcast::<JsString, _>(&mut cx) {
182+
Ok(v) => v.value(&mut cx).parse::<serde_json::Value>().ok(),
183+
Err(_) => None,
184+
},
185+
Err(_) => None,
186+
};
187+
188+
let services = interface.services.clone();
189+
let runtime = tokio_runtime_node(&mut cx)?;
190+
191+
let channel = cx.channel();
192+
193+
let native_auth_ctx = Arc::new(NativeAuthContext {
194+
user: Some(String::from("unknown")),
195+
superuser: false,
196+
security_context,
197+
});
198+
199+
let (deferred, promise) = cx.promise();
200+
201+
// In case spawned task panics or gets aborted before settle call it will leave permanently pending Promise in JS land
202+
// We don't want to just waste whole thread (doesn't really matter main or worker or libuv thread pool)
203+
// just busy waiting that JoinHandle
204+
// TODO handle JoinError
205+
// keep JoinHandle alive in JS thread
206+
// check join handle from JS thread periodically, reject promise on JoinError
207+
// maybe register something like uv_check handle (libuv itself does not have ABI stability of N-API)
208+
// can do it relatively rare, and in a single loop for all JoinHandles
209+
// this is just a watchdog for a Very Bad case, so latency requirement can be quite relaxed
210+
runtime.spawn(async move {
211+
let result = handle_sql4sql_query(services, native_auth_ctx, &sql_query).await;
212+
213+
if let Err(err) = deferred.try_settle_with(&channel, move |mut cx| {
214+
// `neon::result::ResultExt` is implemented only for Result<Handle, Handle>, even though Ok variant is not touched
215+
let response = result.or_else(|err| cx.throw_error(err.to_string()))?;
216+
let response = response.to_js(&mut cx)?;
217+
Ok(response)
218+
}) {
219+
// There is not much we can do at this point
220+
// TODO lift this error to task => JoinHandle => JS watchdog
221+
log::error!(
222+
"Unable to settle JS promise from tokio task, try_settle_with failed, err: {err}"
223+
);
224+
}
225+
});
226+
227+
Ok(promise.upcast::<JsValue>())
228+
}

0 commit comments

Comments
 (0)