Skip to content

Commit

Permalink
Merge branch 'master' into primaryToHandle
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored May 26, 2022
2 parents 5e10ff5 + e87ea0f commit 9bff5d2
Show file tree
Hide file tree
Showing 7 changed files with 158 additions and 5 deletions.
1 change: 1 addition & 0 deletions engine/client/task_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ func (d *TaskDispatcher) DispatchTask(
) error {
requestID, err := d.preDispatchTaskWithRetry(ctx, args)
if err != nil {
abortWorker(err)
return derrors.ErrExecutorPreDispatchFailed.Wrap(err)
}

Expand Down
13 changes: 9 additions & 4 deletions engine/client/task_dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,14 +92,17 @@ func TestPreDispatchAborted(t *testing.T) {
Return((*ExecutorResponse)(nil), status.Error(codes.Aborted, "aborted error")).
Once() // Aborted calls should NOT be retried.

var abortCalled atomic.Bool
err := dispatcher.DispatchTask(context.Background(), args, func() {
require.Fail(t, "the callback should never be called")
}, func(error) {
require.Fail(t, "not expected")
require.False(t, abortCalled.Swap(true))
})
require.Error(t, err)
require.Regexp(t, ".*aborted error.*", err)
mockExecClient.AssertExpectations(t)

require.True(t, abortCalled.Load())
}

func TestAlreadyExistsPanics(t *testing.T) {
Expand Down Expand Up @@ -144,8 +147,9 @@ func TestDispatchRetryCanceled(t *testing.T) {
}

var (
retryCount atomic.Int64
wg sync.WaitGroup
retryCount atomic.Int64
abortCalled atomic.Bool
wg sync.WaitGroup
)
mockExecClient.On("Send", mock.Anything, mock.Anything).
Return((*ExecutorResponse)(nil), status.Error(codes.Unknown, "should retry")).Run(
Expand All @@ -168,10 +172,11 @@ func TestDispatchRetryCanceled(t *testing.T) {
err := dispatcher.DispatchTask(cancelCtx, args, func() {
require.Fail(t, "the callback should never be called")
}, func(error) {
require.Fail(t, "not expected")
require.False(t, abortCalled.Swap(true))
})
require.Error(t, err)
require.Regexp(t, ".*ErrExecutorPreDispatchFailed.*", err)
require.True(t, abortCalled.Load())

wg.Wait()
}
Expand Down
16 changes: 16 additions & 0 deletions engine/lib/master/worker_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,22 @@ func TestCreateWorkerAndWorkerTimesOut(t *testing.T) {
suite.Close()
}

func TestCreateWorkerPredispatchFailed(t *testing.T) {
t.Parallel()

suite := NewWorkerManageTestSuite(true)
suite.manager.AbortCreatingWorker("worker-1", errors.New("injected error"))

event := suite.WaitForEvent(t, "worker-1")
require.Equal(t, workerDispatchFailedEvent, event.Tp)
require.NotNil(t, event.Handle.GetTombstone())
require.Error(t, event.Err)
require.Regexp(t, ".*injected error.*", event.Err)

suite.AssertNoEvents(t, "worker-1", 500*time.Millisecond)
suite.Close()
}

func TestCreateWorkerAndWorkerStatusUpdatedAndTimesOut(t *testing.T) {
t.Parallel()

Expand Down
4 changes: 3 additions & 1 deletion pkg/cmd/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,9 @@ func NewCmdServer() *cobra.Command {
if err != nil {
return err
}
return o.run(cmd)
err = o.run(cmd)
cobra.CheckErr(err)
return nil
},
}

Expand Down
27 changes: 27 additions & 0 deletions tests/integration_tests/_utils/check_usage_tips
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#!/bin/bash
# parameter 1: stdout file
# parameter 2: should_contain true: cmd is valid and usage tips should be printed, false: otherwise
stdout_file=$1
should_contain=$2
contain=
set +e

## check stdout
if [ ! -f "$stdout_file" ]; then
echo "stdout log file does not exist"
exit 0
fi

grep -q 'Usage:' "$stdout_file"
if [[ $? -eq 0 ]]; then
contain="false"
else
contain="true"
fi

if [[ "$contain" == "$should_contain" ]]; then
exit 0
else
echo "Check Usage Tips Failed!"
exit 1
fi
11 changes: 11 additions & 0 deletions tests/integration_tests/_utils/run_cdc_server
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ failpoint=$GO_FAILPOINTS
config_path=
data_dir=
curl_status_cmd=
supposed_to_fail="no"

while [[ ${1} ]]; do
case "${1}" in
Expand Down Expand Up @@ -77,6 +78,10 @@ while [[ ${1} ]]; do
config_path="--config ${2}"
shift
;;
--supposed-to-fail)
supposed_to_fail="${2}"
shift
;;
--data-dir)
data_dir=${2}
shift
Expand Down Expand Up @@ -151,6 +156,12 @@ else
curl_status_cmd="curl --cacert $tls_dir/ca.pem --cert $tls_dir/$certcn_name.pem --key $tls_dir/$certcn_name-key.pem -vsL --max-time 20 https://$addr_url/debug/info"
fi

# If the command is supposed to fail (in check usage tips test), just exit without retry
if [[ "$supposed_to_fail" != "no" ]]; then
set +x
exit 0
fi

for ((i = 0; i <= 50; i++)); do
res=$($curl_status_cmd)
# Make sure we get the capture info(etcd_info_msg) and that there are no errors(get_info_fail_msg).
Expand Down
91 changes: 91 additions & 0 deletions tests/integration_tests/cdc_server_tips/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
#!/bin/bash

set -eu

CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
source $CUR/../_utils/test_prepare
WORK_DIR=$OUT_DIR/$TEST_NAME
CDC_BINARY=cdc.test
SINK_TYPE=$1
stdout_file=$WORK_DIR/stdout.log
cdc_launched=

function prepare_tidb_cluster() {
rm -rf $WORK_DIR && mkdir -p $WORK_DIR
start_tidb_cluster --workdir $WORK_DIR

cd $WORK_DIR

# record tso before we create tables to skip the system table DDLs
start_ts=$(run_cdc_cli_tso_query ${UP_PD_HOST_1} ${UP_PD_PORT_1})

run_sql "CREATE table test.simple1(id int primary key, val int);"
}

function try_to_run_cdc() {
if [[ $1 == "valid" ]]; then
echo "try a VALID cdc server command"
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY
else
echo "try an INVALID cdc server command"
run_cdc_server --supposed-to-fail "true" --workdir $WORK_DIR --binary $CDC_BINARY --pd "None"
fi

#Wait the failed cdc to quit
sleep 20
echo $1" ~~~ running cdc " "$(ps -a | grep 'cdc')"

if [[ $(ps -a | grep "cdc.test") == "" ]]; then
cdc_launched="false"
echo 'Failed to start cdc, the usage tips should be printed'
return 0
fi

cdc_launched="true"
echo 'Succeed to run cdc, now create a changefeed, no usage tips should be printed'
echo "pid"$(ps -a | grep "cdc.test")

TOPIC_NAME="ticdc-server-tips-test-$RANDOM"
case $SINK_TYPE in
kafka) SINK_URI="kafka+ssl://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-client-id=cdc_server_tips&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
*) SINK_URI="mysql+ssl://normal:123456@127.0.0.1:3306/" ;;
esac
run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI"
if [ "$SINK_TYPE" == "kafka" ]; then
run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760"
fi
echo 'Succeed to create a changefeed, no usage tips should be printed'
}

stop_cdc() {
echo "Later, cdc will receive a signal(SIGINT) and exit"
cdc_pid=$(ps -a | grep -m 1 "cdc.test" | awk '{print $1}')
echo "cdc pid is "$cdc_pid
sleep 60
kill -2 $cdc_pid
sleep 30
}

trap stop_tidb_cluster EXIT
prepare_tidb_cluster

# If cdc gets started normally, no usage tips should be printed when exit
try_to_run_cdc "valid"
if [[ "$cdc_launched" == "true" ]]; then
# If the cdc was launched, send a signal to stop it and check stdout
stop_cdc
check_usage_tips "$stdout_file" "true"
fi
echo " 1st test case $TEST_NAME success! "

# invalid command and should print usage tips
try_to_run_cdc "invalid"
if [[ "$cdc_launched" == "false" ]]; then
check_usage_tips "$stdout_file" "false"
else
echo "CDC should not get started with invalid argument"
exit 1
fi
echo " 2nd test case $TEST_NAME success! "

echo "[$(date)] <<<<<< run all test cases $TEST_NAME success! >>>>>> "

0 comments on commit 9bff5d2

Please sign in to comment.