Skip to content

Commit 3761c47

Browse files
committed
chore: add Task when test on test-private-task.sh
1 parent de0ce58 commit 3761c47

File tree

2 files changed

+167
-38
lines changed

2 files changed

+167
-38
lines changed

src/query/service/src/task/service.rs

Lines changed: 58 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,7 @@ impl TaskService {
223223
let task_name = task.task_name.to_string();
224224
let task_name_clone = task_name.clone();
225225
let task_service = TaskService::instance();
226+
let owner = Self::get_task_owner(&task, &tenant).await?;
226227

227228
let fn_lock =
228229
async |task_service: &TaskService,
@@ -272,6 +273,9 @@ impl TaskService {
272273
let Some(_guard) = fn_lock(&task_service, &task_key, duration.as_millis() as u64).await? else {
273274
continue;
274275
};
276+
if !Self::check_when(&task, &owner, &task_service).await.unwrap() {
277+
continue;
278+
}
275279
fn_new_task_run(&task_service, &task).await?;
276280
task_mgr.send(TaskMessage::ExecuteTask(task.clone())).await?;
277281
}
@@ -311,11 +315,15 @@ impl TaskService {
311315
.unwrap_or(Duration::ZERO);
312316

313317
task.next_scheduled_at = Some(Utc::now() + duration);
318+
task_mgr.update_task(task.clone()).await??;
314319
tokio::select! {
315320
_ = sleep(duration) => {
316321
let Some(_guard) = fn_lock(&task_service, &task_key, duration.as_millis() as u64).await? else {
317322
continue;
318323
};
324+
if !Self::check_when(&task, &owner, &task_service).await? {
325+
continue;
326+
}
319327
fn_new_task_run(&task_service, &task).await?;
320328
task_mgr.send(TaskMessage::ExecuteTask(task.clone())).await?;
321329
}
@@ -389,9 +397,20 @@ impl TaskService {
389397
.ok_or_else(|| {
390398
ErrorCode::UnknownTask(next_task)
391399
})?;
392-
task_mgr
393-
.send(TaskMessage::ExecuteTask(next_task))
394-
.await?;
400+
let next_owner =
401+
Self::get_task_owner(&next_task, &tenant)
402+
.await?;
403+
if Self::check_when(
404+
&next_task,
405+
&next_owner,
406+
&task_service,
407+
)
408+
.await?
409+
{
410+
task_mgr
411+
.send(TaskMessage::ExecuteTask(next_task))
412+
.await?;
413+
}
395414
}
396415
break;
397416
}
@@ -422,12 +441,12 @@ impl TaskService {
422441
)?;
423442
}
424443
TaskMessage::DeleteTask(task_name) => {
425-
if task_mgr.accept(&task_key).await? {
426-
self.clean_task_afters(&task_name).await?;
427-
}
428444
if let Some(token) = scheduled_tasks.remove(&task_name) {
429445
token.cancel();
430446
}
447+
if task_mgr.accept(&task_key).await? {
448+
self.clean_task_afters(&task_name).await?;
449+
}
431450
task_mgr
432451
.accept(&TaskMessageIdent::new(
433452
tenant,
@@ -495,42 +514,45 @@ impl TaskService {
495514
async fn spawn_task(task: Task, user: UserInfo) -> Result<()> {
496515
let task_service = TaskService::instance();
497516

498-
if let Some(when_condition) = &task.when_condition {
499-
let result = task_service
500-
.execute_sql(Some(user.clone()), &format!("SELECT {when_condition}"))
501-
.await?;
502-
let is_met = result
503-
.first()
504-
.and_then(|block| block.get_by_offset(0).index(0))
505-
.and_then(|scalar| {
506-
scalar
507-
.as_boolean()
508-
.cloned()
509-
.map(Ok)
510-
.or_else(|| scalar.as_string().map(|str| str.trim().parse::<bool>()))
511-
})
512-
.transpose()
513-
.map_err(|err| {
514-
ErrorCode::TaskWhenConditionNotMet(format!(
515-
"when condition error for task: {}, {}",
516-
task.task_name, err
517-
))
518-
})?
519-
.unwrap_or(false);
520-
if !is_met {
521-
return Err(ErrorCode::TaskWhenConditionNotMet(format!(
522-
"when condition not met for task: {}",
523-
task.task_name
524-
)));
525-
}
526-
}
527517
task_service
528518
.execute_sql(Some(user), &task.query_text)
529519
.await?;
530520

531521
Ok(())
532522
}
533523

524+
async fn check_when(
525+
task: &Task,
526+
user: &UserInfo,
527+
task_service: &Arc<TaskService>,
528+
) -> Result<bool> {
529+
let Some(when_condition) = &task.when_condition else {
530+
return Ok(true);
531+
};
532+
let result = task_service
533+
.execute_sql(Some(user.clone()), &format!("SELECT {when_condition}"))
534+
.await
535+
.unwrap();
536+
Ok(result
537+
.first()
538+
.and_then(|block| block.get_by_offset(0).index(0))
539+
.and_then(|scalar| {
540+
scalar
541+
.as_boolean()
542+
.cloned()
543+
.map(Ok)
544+
.or_else(|| scalar.as_string().map(|str| str.trim().parse::<bool>()))
545+
})
546+
.transpose()
547+
.map_err(|err| {
548+
ErrorCode::TaskWhenConditionNotMet(format!(
549+
"when condition error for task: {}, {}",
550+
task.task_name, err
551+
))
552+
})?
553+
.unwrap_or(false))
554+
}
555+
534556
pub async fn create_context(&self, other_user: Option<UserInfo>) -> Result<Arc<QueryContext>> {
535557
let (user, role) = if let Some(other_user) = other_user {
536558
(other_user, None)
@@ -792,7 +814,7 @@ WHERE ta.task_name = '{task_name}'
792814
task.query_text.replace('\'', "''"),
793815
task.when_condition
794816
.as_ref()
795-
.map(|s| format!("'{s}'").replace('\'', "''"))
817+
.map(|s| format!("'{}'", s.replace('\'', "''")))
796818
.unwrap_or_else(|| "null".to_string()),
797819
if !task.after.is_empty() {
798820
format!("'{}'", task.after.join(", "))

tests/task/test-private-task.sh

Lines changed: 109 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ response8=$(curl -s -u root: -XPOST "http://localhost:8000/v1/query" -H 'Content
132132
alter_task_2_query_id=$(echo $response8 | jq -r '.id')
133133
echo "Resume Task 2 ID: $alter_task_2_query_id"
134134

135-
sleep 10
135+
sleep 8
136136

137137
response9=$(curl -s -u root: -XPOST "http://localhost:8000/v1/query" -H 'Content-Type: application/json' -d "{\"sql\": \"SELECT c1 FROM t1 ORDER BY c1\"}")
138138

@@ -166,7 +166,7 @@ python3 scripts/ci/wait_tcp.py --timeout 30 --port 9092
166166

167167
echo "Started 2-node cluster with private task enabled..."
168168

169-
sleep 7
169+
sleep 9
170170

171171
response9=$(curl -s -u root: -XPOST "http://localhost:8000/v1/query" -H 'Content-Type: application/json' -d "{\"sql\": \"SELECT c1 FROM t1 ORDER BY c1\"}")
172172

@@ -267,3 +267,110 @@ if [ "$state19" != "Succeeded" ]; then
267267
fi
268268
actual=$(echo "$response19" | jq -c '.data')
269269
echo "\n\nSELECT * FROM system.tasks: $actual"
270+
271+
# Test Task When on After & Schedule & Execute
272+
response20=$(curl -s -u root: -XPOST "http://localhost:8000/v1/query" -H 'Content-Type: application/json' -d "{\"sql\": \"CREATE TABLE t3 (c1 int, c2 int)\"}")
273+
create_table_query_id_2=$(echo $response20 | jq -r '.id')
274+
echo "Create Table Query ID: $create_table_query_id_2"
275+
276+
response21=$(curl -s -u root: -XPOST "http://localhost:8000/v1/query" -H 'Content-Type: application/json' -d "{\"sql\": \"CREATE TASK my_task_5 WAREHOUSE = 'mywh' SCHEDULE = 3 SECOND WHEN EXISTS (SELECT 1 FROM t3 WHERE c2 = 1) AS insert into t3 values(1, 0)\"}")
277+
create_task_5_query_id=$(echo $response21 | jq -r '.id')
278+
echo "Create Task 5 Query ID: $create_task_5_query_id"
279+
280+
response22=$(curl -s -u root: -XPOST "http://localhost:8000/v1/query" -H 'Content-Type: application/json' -d "{\"sql\": \"CREATE TASK my_task_6 WAREHOUSE = 'mywh' SCHEDULE = 5 SECOND WHEN EXISTS (SELECT 1 FROM t3 WHERE c2 = 1) AS insert into t3 values(2, 0)\"}")
281+
create_task_6_query_id=$(echo $response22 | jq -r '.id')
282+
echo "Create Task 6 Query ID: $create_task_6_query_id"
283+
284+
response23=$(curl -s -u root: -XPOST "http://localhost:8000/v1/query" -H 'Content-Type: application/json' -d "{\"sql\": \"CREATE TASK my_task_7 WAREHOUSE = 'mywh' AFTER 'my_task_5', 'my_task_6' WHEN EXISTS (SELECT 1 FROM t3 WHERE c2 = 2) AS insert into t3 values(3, 0)\"}")
285+
create_task_7_query_id=$(echo $response23 | jq -r '.id')
286+
echo "Create Task 7 Query ID: $create_task_7_query_id"
287+
288+
sleep 1
289+
290+
response24=$(curl -s -u root: -XPOST "http://localhost:8000/v1/query" -H 'Content-Type: application/json' -d "{\"sql\": \"ALTER TASK my_task_5 RESUME\"}")
291+
alter_task_5_query_id=$(echo $response24 | jq -r '.id')
292+
echo "Resume Task 5 ID: $alter_task_5_query_id"
293+
294+
response25=$(curl -s -u root: -XPOST "http://localhost:8000/v1/query" -H 'Content-Type: application/json' -d "{\"sql\": \"ALTER TASK my_task_6 RESUME\"}")
295+
alter_task_6_query_id=$(echo $response25 | jq -r '.id')
296+
echo "Resume Task 6 ID: $alter_task_6_query_id"
297+
298+
response26=$(curl -s -u root: -XPOST "http://localhost:8000/v1/query" -H 'Content-Type: application/json' -d "{\"sql\": \"ALTER TASK my_task_7 RESUME\"}")
299+
alter_task_7_query_id=$(echo $response26 | jq -r '.id')
300+
echo "Resume Task 7 ID: $alter_task_7_query_id"
301+
302+
sleep 6
303+
304+
response27=$(curl -s -u root: -XPOST "http://localhost:8000/v1/query" -H 'Content-Type: application/json' -d "{\"sql\": \"SELECT c1 FROM t3 ORDER BY c1\"}")
305+
306+
actual=$(echo "$response27" | jq -c '.data')
307+
expected='[]'
308+
309+
if [ "$actual" = "$expected" ]; then
310+
echo "✅ Query result matches expected"
311+
else
312+
echo "❌ Mismatch"
313+
echo "Expected: $expected"
314+
echo "Actual : $actual"
315+
exit 1
316+
fi
317+
318+
response28=$(curl -s -u root: -XPOST "http://localhost:8000/v1/query" -H 'Content-Type: application/json' -d "{\"sql\": \"INSERT INTO t3 VALUES (1, 1)\"}")
319+
insert_t3_query_id=$(echo $response28 | jq -r '.id')
320+
echo "INSERT T3 (1, 1) ID: $insert_t3_query_id"
321+
322+
sleep 5
323+
324+
response29=$(curl -s -u root: -XPOST "http://localhost:8000/v1/query" -H 'Content-Type: application/json' -d "{\"sql\": \"SELECT c1, c2 FROM t3 ORDER BY c1, c2\"}")
325+
326+
actual=$(echo "$response29" | jq -c '.data')
327+
expected='[["1","0"],["1","1"],["2","0"]]'
328+
329+
if [ "$actual" = "$expected" ]; then
330+
echo "✅ Query result matches expected"
331+
else
332+
echo "❌ Mismatch"
333+
echo "Expected: $expected"
334+
echo "Actual : $actual"
335+
exit 1
336+
fi
337+
338+
response30=$(curl -s -u root: -XPOST "http://localhost:8000/v1/query" -H 'Content-Type: application/json' -d "{\"sql\": \"EXECUTE TASK my_task_7\"}")
339+
execute_task_7_query_id=$(echo $response30 | jq -r '.id')
340+
echo "Execute Task 7 ID: $execute_task_7_query_id"
341+
342+
sleep 1
343+
344+
response31=$(curl -s -u root: -XPOST "http://localhost:8000/v1/query" -H 'Content-Type: application/json' -d "{\"sql\": \"SELECT c1, c2 FROM t3 ORDER BY c1, c2\"}")
345+
346+
actual=$(echo "$response31" | jq -c '.data')
347+
expected='[["1","0"],["1","1"],["2","0"],["3","0"]]'
348+
349+
if [ "$actual" = "$expected" ]; then
350+
echo "✅ Query result matches expected"
351+
else
352+
echo "❌ Mismatch"
353+
echo "Expected: $expected"
354+
echo "Actual : $actual"
355+
exit 1
356+
fi
357+
358+
response32=$(curl -s -u root: -XPOST "http://localhost:8000/v1/query" -H 'Content-Type: application/json' -d "{\"sql\": \"INSERT INTO t3 VALUES (2, 2)\"}")
359+
insert_t3_query_id_1=$(echo $response32 | jq -r '.id')
360+
echo "INSERT T3 (2, 2) ID: $insert_t3_query_id_1"
361+
362+
sleep 6
363+
364+
response33=$(curl -s -u root: -XPOST "http://localhost:8000/v1/query" -H 'Content-Type: application/json' -d "{\"sql\": \"SELECT c1, c2 FROM t3 ORDER BY c1, c2\"}")
365+
366+
actual=$(echo "$response33" | jq -c '.data')
367+
expected='[["1","0"],["1","0"],["1","0"],["1","1"],["2","0"],["2","0"],["2","2"],["3","0"],["3","0"],["3","0"]]'
368+
369+
if [ "$actual" = "$expected" ]; then
370+
echo "✅ Query result matches expected"
371+
else
372+
echo "❌ Mismatch"
373+
echo "Expected: $expected"
374+
echo "Actual : $actual"
375+
exit 1
376+
fi

0 commit comments

Comments
 (0)