diff --git a/engine/client/task_dispatcher.go b/engine/client/task_dispatcher.go index 5fbf93132e7..3c9696148a9 100644 --- a/engine/client/task_dispatcher.go +++ b/engine/client/task_dispatcher.go @@ -85,6 +85,7 @@ func (d *TaskDispatcher) DispatchTask( ) error { requestID, err := d.preDispatchTaskWithRetry(ctx, args) if err != nil { + abortWorker(err) return derrors.ErrExecutorPreDispatchFailed.Wrap(err) } diff --git a/engine/client/task_dispatcher_test.go b/engine/client/task_dispatcher_test.go index 1aa0e8461db..5033a30ba85 100644 --- a/engine/client/task_dispatcher_test.go +++ b/engine/client/task_dispatcher_test.go @@ -92,14 +92,17 @@ func TestPreDispatchAborted(t *testing.T) { Return((*ExecutorResponse)(nil), status.Error(codes.Aborted, "aborted error")). Once() // Aborted calls should NOT be retried. + var abortCalled atomic.Bool err := dispatcher.DispatchTask(context.Background(), args, func() { require.Fail(t, "the callback should never be called") }, func(error) { - require.Fail(t, "not expected") + require.False(t, abortCalled.Swap(true)) }) require.Error(t, err) require.Regexp(t, ".*aborted error.*", err) mockExecClient.AssertExpectations(t) + + require.True(t, abortCalled.Load()) } func TestAlreadyExistsPanics(t *testing.T) { @@ -144,8 +147,9 @@ func TestDispatchRetryCanceled(t *testing.T) { } var ( - retryCount atomic.Int64 - wg sync.WaitGroup + retryCount atomic.Int64 + abortCalled atomic.Bool + wg sync.WaitGroup ) mockExecClient.On("Send", mock.Anything, mock.Anything). Return((*ExecutorResponse)(nil), status.Error(codes.Unknown, "should retry")).Run( @@ -168,10 +172,11 @@ func TestDispatchRetryCanceled(t *testing.T) { err := dispatcher.DispatchTask(cancelCtx, args, func() { require.Fail(t, "the callback should never be called") }, func(error) { - require.Fail(t, "not expected") + require.False(t, abortCalled.Swap(true)) }) require.Error(t, err) require.Regexp(t, ".*ErrExecutorPreDispatchFailed.*", err) + require.True(t, abortCalled.Load()) wg.Wait() } diff --git a/engine/lib/master/worker_manager_test.go b/engine/lib/master/worker_manager_test.go index a0a8a8d688d..d431a422b46 100644 --- a/engine/lib/master/worker_manager_test.go +++ b/engine/lib/master/worker_manager_test.go @@ -263,6 +263,22 @@ func TestCreateWorkerAndWorkerTimesOut(t *testing.T) { suite.Close() } +func TestCreateWorkerPredispatchFailed(t *testing.T) { + t.Parallel() + + suite := NewWorkerManageTestSuite(true) + suite.manager.AbortCreatingWorker("worker-1", errors.New("injected error")) + + event := suite.WaitForEvent(t, "worker-1") + require.Equal(t, workerDispatchFailedEvent, event.Tp) + require.NotNil(t, event.Handle.GetTombstone()) + require.Error(t, event.Err) + require.Regexp(t, ".*injected error.*", event.Err) + + suite.AssertNoEvents(t, "worker-1", 500*time.Millisecond) + suite.Close() +} + func TestCreateWorkerAndWorkerStatusUpdatedAndTimesOut(t *testing.T) { t.Parallel() diff --git a/pkg/cmd/server/server.go b/pkg/cmd/server/server.go index 149a1318ef2..b64a195a81b 100644 --- a/pkg/cmd/server/server.go +++ b/pkg/cmd/server/server.go @@ -295,7 +295,9 @@ func NewCmdServer() *cobra.Command { if err != nil { return err } - return o.run(cmd) + err = o.run(cmd) + cobra.CheckErr(err) + return nil }, } diff --git a/tests/integration_tests/_utils/check_usage_tips b/tests/integration_tests/_utils/check_usage_tips new file mode 100755 index 00000000000..81726d6643c --- /dev/null +++ b/tests/integration_tests/_utils/check_usage_tips @@ -0,0 +1,27 @@ +#!/bin/bash +# parameter 1: stdout file +# parameter 2: should_contain true: cmd is valid and usage tips should be printed, false: otherwise +stdout_file=$1 +should_contain=$2 +contain= +set +e + +## check stdout +if [ ! -f "$stdout_file" ]; then + echo "stdout log file does not exist" + exit 0 +fi + +grep -q 'Usage:' "$stdout_file" +if [[ $? -eq 0 ]]; then + contain="false" +else + contain="true" +fi + +if [[ "$contain" == "$should_contain" ]]; then + exit 0 +else + echo "Check Usage Tips Failed!" + exit 1 +fi diff --git a/tests/integration_tests/_utils/run_cdc_server b/tests/integration_tests/_utils/run_cdc_server index 05ed8738200..13786faaf83 100755 --- a/tests/integration_tests/_utils/run_cdc_server +++ b/tests/integration_tests/_utils/run_cdc_server @@ -27,6 +27,7 @@ failpoint=$GO_FAILPOINTS config_path= data_dir= curl_status_cmd= +supposed_to_fail="no" while [[ ${1} ]]; do case "${1}" in @@ -77,6 +78,10 @@ while [[ ${1} ]]; do config_path="--config ${2}" shift ;; + --supposed-to-fail) + supposed_to_fail="${2}" + shift + ;; --data-dir) data_dir=${2} shift @@ -151,6 +156,12 @@ else curl_status_cmd="curl --cacert $tls_dir/ca.pem --cert $tls_dir/$certcn_name.pem --key $tls_dir/$certcn_name-key.pem -vsL --max-time 20 https://$addr_url/debug/info" fi +# If the command is supposed to fail (in check usage tips test), just exit without retry +if [[ "$supposed_to_fail" != "no" ]]; then + set +x + exit 0 +fi + for ((i = 0; i <= 50; i++)); do res=$($curl_status_cmd) # Make sure we get the capture info(etcd_info_msg) and that there are no errors(get_info_fail_msg). diff --git a/tests/integration_tests/cdc_server_tips/run.sh b/tests/integration_tests/cdc_server_tips/run.sh new file mode 100644 index 00000000000..d8f2935d528 --- /dev/null +++ b/tests/integration_tests/cdc_server_tips/run.sh @@ -0,0 +1,91 @@ +#!/bin/bash + +set -eu + +CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +source $CUR/../_utils/test_prepare +WORK_DIR=$OUT_DIR/$TEST_NAME +CDC_BINARY=cdc.test +SINK_TYPE=$1 +stdout_file=$WORK_DIR/stdout.log +cdc_launched= + +function prepare_tidb_cluster() { + rm -rf $WORK_DIR && mkdir -p $WORK_DIR + start_tidb_cluster --workdir $WORK_DIR + + cd $WORK_DIR + + # record tso before we create tables to skip the system table DDLs + start_ts=$(run_cdc_cli_tso_query ${UP_PD_HOST_1} ${UP_PD_PORT_1}) + + run_sql "CREATE table test.simple1(id int primary key, val int);" +} + +function try_to_run_cdc() { + if [[ $1 == "valid" ]]; then + echo "try a VALID cdc server command" + run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY + else + echo "try an INVALID cdc server command" + run_cdc_server --supposed-to-fail "true" --workdir $WORK_DIR --binary $CDC_BINARY --pd "None" + fi + + #Wait the failed cdc to quit + sleep 20 + echo $1" ~~~ running cdc " "$(ps -a | grep 'cdc')" + + if [[ $(ps -a | grep "cdc.test") == "" ]]; then + cdc_launched="false" + echo 'Failed to start cdc, the usage tips should be printed' + return 0 + fi + + cdc_launched="true" + echo 'Succeed to run cdc, now create a changefeed, no usage tips should be printed' + echo "pid"$(ps -a | grep "cdc.test") + + TOPIC_NAME="ticdc-server-tips-test-$RANDOM" + case $SINK_TYPE in + kafka) SINK_URI="kafka+ssl://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-client-id=cdc_server_tips&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; + *) SINK_URI="mysql+ssl://normal:123456@127.0.0.1:3306/" ;; + esac + run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" + if [ "$SINK_TYPE" == "kafka" ]; then + run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" + fi + echo 'Succeed to create a changefeed, no usage tips should be printed' +} + +stop_cdc() { + echo "Later, cdc will receive a signal(SIGINT) and exit" + cdc_pid=$(ps -a | grep -m 1 "cdc.test" | awk '{print $1}') + echo "cdc pid is "$cdc_pid + sleep 60 + kill -2 $cdc_pid + sleep 30 +} + +trap stop_tidb_cluster EXIT +prepare_tidb_cluster + +# If cdc gets started normally, no usage tips should be printed when exit +try_to_run_cdc "valid" +if [[ "$cdc_launched" == "true" ]]; then + # If the cdc was launched, send a signal to stop it and check stdout + stop_cdc + check_usage_tips "$stdout_file" "true" +fi +echo " 1st test case $TEST_NAME success! " + +# invalid command and should print usage tips +try_to_run_cdc "invalid" +if [[ "$cdc_launched" == "false" ]]; then + check_usage_tips "$stdout_file" "false" +else + echo "CDC should not get started with invalid argument" + exit 1 +fi +echo " 2nd test case $TEST_NAME success! " + +echo "[$(date)] <<<<<< run all test cases $TEST_NAME success! >>>>>> "