Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/cubejs-api-gateway/src/gateway.ts
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,7 @@ class ApiGateway {
try {
await this.assertApiScope('data', req.context?.securityContext);

await this.sqlServer.execSql(req.body.query, res, req.context?.securityContext, req.body.cache, req.body.timezone);
await this.sqlServer.execSql(req.body.query, res, req.context?.securityContext, req.body.cache, req.body.timezone, req.body.forceContinueWait);
} catch (e: any) {
this.handleError({
e,
Expand Down
4 changes: 2 additions & 2 deletions packages/cubejs-api-gateway/src/sql-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ export class SQLServer {
throw new Error('Native api gateway is not enabled');
}

public async execSql(sqlQuery: string, stream: any, securityContext?: any, cacheMode?: CacheMode, timezone?: string) {
await execSql(this.sqlInterfaceInstance!, sqlQuery, stream, securityContext, cacheMode, timezone);
public async execSql(sqlQuery: string, stream: any, securityContext?: any, cacheMode?: CacheMode, timezone?: string, forceContinueWait?: boolean) {
await execSql(this.sqlInterfaceInstance!, sqlQuery, stream, securityContext, cacheMode, timezone, forceContinueWait);
}

public async sql4sql(sqlQuery: string, disablePostProcessing: boolean, securityContext?: unknown): Promise<Sql4SqlResponse> {
Expand Down
42 changes: 40 additions & 2 deletions packages/cubejs-api-gateway/test/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1185,7 +1185,8 @@ describe('API Gateway', () => {
expect.anything(),
{},
undefined,
undefined
undefined,
undefined,
);
});

Expand Down Expand Up @@ -1224,7 +1225,44 @@ describe('API Gateway', () => {
expect.anything(),
{},
'stale-while-revalidate',
'America/Los_Angeles'
'America/Los_Angeles',
undefined,
);
});

test('forceContinueWait can be passed', async () => {
const { app, apiGateway } = await createApiGateway();

// Mock the sqlServer.execSql method
const execSqlMock = jest.fn(async (query, stream, securityContext, cacheMode, timezone) => {
// Simulate writing error to the stream
stream.write(`${JSON.stringify({
error: "Continue wait"
})}\n`);
stream.end();
});

apiGateway.getSQLServer().execSql = execSqlMock;

await request(app)
.post('/cubejs-api/v1/cubesql')
.set('Content-type', 'application/json')
.set('Authorization', 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.e30.t-IDcSemACt8x4iTMCda8Yhe3iZaWbvV5XKSTbuAn0M')
.send({
query: 'SELECT id FROM test LIMIT 3',
forceContinueWait: true,
})
.responseType('text')
.expect(200);

// Verify the mock was called with correct parameters
expect(execSqlMock).toHaveBeenCalledWith(
'SELECT id FROM test LIMIT 3',
expect.anything(),
{},
undefined,
undefined,
true,
);
});
});
Expand Down
3 changes: 3 additions & 0 deletions packages/cubejs-backend-native/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,6 @@ neon-debug = []
neon-entrypoint = []
python = ["pyo3", "pyo3-asyncio"]
async-log = ["log_nonblock"]

[lints.clippy]
too_many_arguments = "allow"
4 changes: 2 additions & 2 deletions packages/cubejs-backend-native/js/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -437,10 +437,10 @@ export const shutdownInterface = async (instance: SqlInterfaceInstance, shutdown
await native.shutdownInterface(instance, shutdownMode);
};

export const execSql = async (instance: SqlInterfaceInstance, sqlQuery: string, stream: any, securityContext?: any, cacheMode: CacheMode = 'stale-if-slow', timezone?: string): Promise<void> => {
export const execSql = async (instance: SqlInterfaceInstance, sqlQuery: string, stream: any, securityContext?: any, cacheMode: CacheMode = 'stale-if-slow', timezone?: string, forceContinueWait?: boolean): Promise<void> => {
const native = loadNative();

await native.execSql(instance, sqlQuery, stream, securityContext ? JSON.stringify(securityContext) : null, cacheMode, timezone);
await native.execSql(instance, sqlQuery, stream, securityContext ? JSON.stringify(securityContext) : null, cacheMode, timezone, forceContinueWait);
};

// TODO parse result from native code
Expand Down
25 changes: 25 additions & 0 deletions packages/cubejs-backend-native/src/node_export.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ async fn handle_sql_query(
sql_query: &str,
cache_mode: &str,
timezone: Option<String>,
force_continue_wait: bool,
) -> Result<(), CubeError> {
let span_id = Some(Arc::new(SpanId::new(
Uuid::new_v4().to_string(),
Expand Down Expand Up @@ -276,6 +277,15 @@ async fn handle_sql_query(
*cm = Some(cache_enum);
}

{
let mut cm = session
.state
.force_continue_wait
.write()
.expect("failed to unlock session force_continue_wait for change");
*cm = force_continue_wait;
}

let session_clone = Arc::clone(&session);
let span_id_clone = span_id.clone();

Expand Down Expand Up @@ -471,6 +481,20 @@ fn exec_sql(mut cx: FunctionContext) -> JsResult<JsValue> {
Err(_) => None,
};

let force_continue_wait: bool = match cx.argument::<JsValue>(6) {
Ok(val) => {
if val.is_a::<JsNull, _>(&mut cx) || val.is_a::<JsUndefined, _>(&mut cx) {
false
} else {
match val.downcast::<JsBoolean, _>(&mut cx) {
Ok(v) => v.value(&mut cx),
Err(_) => false,
}
}
}
Err(_) => false,
};

let js_stream_on_fn = Arc::new(
node_stream
.get::<JsFunction, _, _>(&mut cx, "on")?
Expand Down Expand Up @@ -520,6 +544,7 @@ fn exec_sql(mut cx: FunctionContext) -> JsResult<JsValue> {
&sql_query,
&cache_mode,
timezone,
force_continue_wait,
)
.await;

Expand Down
29 changes: 23 additions & 6 deletions packages/cubejs-backend-native/src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,7 @@ impl TransportService for NodeBridgeTransport {
schema: SchemaRef,
member_fields: Vec<MemberField>,
cache_mode: Option<CacheMode>,
force_continue_wait: bool,
) -> Result<Vec<RecordBatch>, CubeError> {
trace!("[transport] Request ->");

Expand Down Expand Up @@ -451,6 +452,9 @@ impl TransportService for NodeBridgeTransport {

if let Err(e) = &result {
if e.message.to_lowercase().contains("continue wait") {
if force_continue_wait {
return Err(CubeError::internal("Continue wait".to_string()));
}
continue;
}
}
Expand All @@ -471,15 +475,24 @@ impl TransportService for NodeBridgeTransport {
match error_value {
serde_json::Value::String(error) => {
if error.to_lowercase() == *"continue wait" {
debug!(
"[transport] load - retrying request (continue wait) requestId: {}",
request_id
);
if force_continue_wait {
debug!(
"[transport] load - throwing continue wait, requestId: {}",
request_id
);
return Err(CubeError::internal(
"Continue wait".to_string(),
));
}

debug!(
"[transport] load - retrying request (continue wait) requestId: {}",
request_id
);
continue;
} else {
return Err(CubeError::user(error.clone()));
}

return Err(CubeError::user(error.clone()));
}
other => {
error!(
Expand Down Expand Up @@ -538,6 +551,7 @@ impl TransportService for NodeBridgeTransport {
meta: LoadRequestMeta,
schema: SchemaRef,
member_fields: Vec<MemberField>,
force_continue_wait: bool,
) -> Result<CubeStreamReceiver, CubeError> {
trace!("[transport] Request ->");

Expand Down Expand Up @@ -585,6 +599,9 @@ impl TransportService for NodeBridgeTransport {

if let Err(e) = &res {
if e.message.to_lowercase().contains("continue wait") {
if force_continue_wait {
return Err(CubeError::internal("Continue wait".to_string()));
}
continue;
}
}
Expand Down
6 changes: 6 additions & 0 deletions rust/cubesql/cubesql/src/compile/engine/df/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ pub struct CubeScanOptions {
pub change_user: Option<String>,
pub max_records: Option<usize>,
pub cache_mode: Option<CacheMode>,
pub force_continue_wait: bool,
}

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -481,6 +482,7 @@ impl ExecutionPlan for CubeScanExecutionPlan {
meta,
self.schema.clone(),
self.member_fields.clone(),
self.options.force_continue_wait,
)
.await;
let stream = result.map_err(|err| DataFusionError::External(Box::new(err)))?;
Expand Down Expand Up @@ -721,6 +723,7 @@ async fn load_data(
schema,
member_fields,
options.cache_mode,
options.force_continue_wait,
)
.await
.map_err(|mut err| {
Expand Down Expand Up @@ -1260,6 +1263,7 @@ mod tests {
schema: SchemaRef,
member_fields: Vec<MemberField>,
_cache_mode: Option<CacheMode>,
_force_continue_wait: bool,
) -> Result<Vec<RecordBatch>, CubeError> {
let response = r#"
{
Expand Down Expand Up @@ -1295,6 +1299,7 @@ mod tests {
_meta_fields: LoadRequestMeta,
_schema: SchemaRef,
_member_fields: Vec<MemberField>,
_force_continue_wait: bool,
) -> Result<CubeStreamReceiver, CubeError> {
panic!("It's a fake transport");
}
Expand Down Expand Up @@ -1386,6 +1391,7 @@ mod tests {
change_user: None,
max_records: None,
cache_mode: None,
force_continue_wait: false,
},
transport: get_test_transport(),
meta: get_test_load_meta(DatabaseProtocol::PostgreSQL),
Expand Down
8 changes: 8 additions & 0 deletions rust/cubesql/cubesql/src/compile/rewrite/converter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2081,6 +2081,13 @@ impl LanguageToLogicalPlanConverter {
.read()
.expect("failed to read lock for session cache_mode");

let force_continue_wait = *self
.cube_context
.session_state
.force_continue_wait
.read()
.expect("failed to read lock for session force_continue_wait");

let node = Arc::new(CubeScanNode::new(
Arc::new(DFSchema::new_with_metadata(
fields.into_iter().map(|(f, _)| f).collect(),
Expand All @@ -2093,6 +2100,7 @@ impl LanguageToLogicalPlanConverter {
change_user,
max_records,
cache_mode: cache_mode.clone(),
force_continue_wait,
},
alias_to_cube.into_iter().map(|(_, c)| c).unique().collect(),
self.span_id.clone(),
Expand Down
2 changes: 2 additions & 0 deletions rust/cubesql/cubesql/src/compile/test/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -914,6 +914,7 @@ impl TransportService for TestConnectionTransport {
schema: SchemaRef,
member_fields: Vec<MemberField>,
_cache_mode: Option<CacheMode>,
_force_continue_wait: bool,
) -> Result<Vec<RecordBatch>, CubeError> {
{
let mut calls = self.load_calls.lock().await;
Expand Down Expand Up @@ -956,6 +957,7 @@ impl TransportService for TestConnectionTransport {
_meta_fields: LoadRequestMeta,
_schema: SchemaRef,
_member_fields: Vec<MemberField>,
_force_continue_wait: bool,
) -> Result<CubeStreamReceiver, CubeError> {
panic!("It's a fake transport");
}
Expand Down
3 changes: 3 additions & 0 deletions rust/cubesql/cubesql/src/sql/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ pub struct SessionState {
pub cache_mode: RwLockSync<Option<CacheMode>>,

pub query_timezone: RwLockSync<Option<String>>,

pub force_continue_wait: RwLockSync<bool>,
}

impl SessionState {
Expand Down Expand Up @@ -127,6 +129,7 @@ impl SessionState {
auth_context_expiration,
cache_mode: RwLockSync::new(None),
query_timezone: RwLockSync::new(None),
force_continue_wait: RwLockSync::new(false),
}
}

Expand Down
4 changes: 4 additions & 0 deletions rust/cubesql/cubesql/src/transport/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ pub trait TransportService: Send + Sync + Debug {
schema: SchemaRef,
member_fields: Vec<MemberField>,
cache_mode: Option<CacheMode>,
force_continue_wait: bool,
) -> Result<Vec<RecordBatch>, CubeError>;

async fn load_stream(
Expand All @@ -157,6 +158,7 @@ pub trait TransportService: Send + Sync + Debug {
meta_fields: LoadRequestMeta,
schema: SchemaRef,
member_fields: Vec<MemberField>,
force_continue_wait: bool,
) -> Result<CubeStreamReceiver, CubeError>;

async fn can_switch_user_for_session(
Expand Down Expand Up @@ -287,6 +289,7 @@ impl TransportService for HttpTransport {
schema: SchemaRef,
member_fields: Vec<MemberField>,
cache_mode: Option<CacheMode>,
_force_continue_wait: bool,
) -> Result<Vec<RecordBatch>, CubeError> {
if meta.change_user().is_some() {
return Err(CubeError::internal(
Expand Down Expand Up @@ -328,6 +331,7 @@ impl TransportService for HttpTransport {
_meta_fields: LoadRequestMeta,
_schema: SchemaRef,
_member_fields: Vec<MemberField>,
_force_continue_wait: bool,
) -> Result<CubeStreamReceiver, CubeError> {
panic!("Does not work for standalone mode yet");
}
Expand Down
Loading