Commit 1b887a1

processor: fix error chan buffer full will block the whole server (#1309) (#1326)
ti-srebot authored Jan 26, 2021
1 parent eb66cf3 commit 1b887a1
Showing 8 changed files with 137 additions and 38 deletions.
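The heart of the change: every producer that reports into the processor's error channel now uses a non-blocking select/default send, so a full buffer can no longer wedge a goroutine (and with it the whole server). A minimal standalone sketch of the pattern, for illustration only (not code from this repository; all names here are invented):

package main

import (
	"errors"
	"log"
)

// sendNonBlocking mirrors the commit's pattern: buffer the first error for the
// single consumer, log and drop any later one, and never block the sender.
func sendNonBlocking(errCh chan<- error, err error) {
	select {
	case errCh <- err:
	default:
		log.Printf("error channel is full, dropping: %v", err)
	}
}

func main() {
	// The consumer reads exactly one error, so a buffer of 1 suffices.
	errCh := make(chan error, 1)
	sendNonBlocking(errCh, errors.New("first"))  // buffered
	sendNonBlocking(errCh, errors.New("second")) // dropped and logged
	log.Println(<-errCh) // prints "first"
	// The old blocking form `errCh <- err` would hang here forever once the
	// buffer was full and no reader remained.
}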
67 changes: 29 additions & 38 deletions cdc/processor.go
@@ -263,10 +263,7 @@ func (p *processor) Run(ctx context.Context) {
 
 	go func() {
 		if err := wg.Wait(); err != nil {
-			select {
-			case p.errCh <- err:
-			default:
-			}
+			p.sendError(err)
 		}
 	}()
 }
@@ -804,7 +801,7 @@ func (p *processor) addTable(ctx context.Context, tableID int64, replicaInfo *mo
 	go func() {
 		err := plr.Run(ctx)
 		if errors.Cause(err) != context.Canceled {
-			p.errCh <- err
+			p.sendError(err)
 		}
 	}()
 
@@ -818,11 +815,11 @@ func (p *processor) addTable(ctx context.Context, tableID int64, replicaInfo *mo
 		if os.IsNotExist(errors.Cause(err)) {
 			err = os.MkdirAll(p.changefeed.SortDir, 0755)
 			if err != nil {
-				p.errCh <- errors.Annotate(cerror.WrapError(cerror.ErrProcessorSortDir, err), "create dir")
+				p.sendError(errors.Annotate(cerror.WrapError(cerror.ErrProcessorSortDir, err), "create dir"))
 				return nil
 			}
 		} else {
-			p.errCh <- errors.Annotate(cerror.WrapError(cerror.ErrProcessorSortDir, err), "sort dir check")
+			p.sendError(errors.Annotate(cerror.WrapError(cerror.ErrProcessorSortDir, err), "sort dir check"))
 			return nil
 		}
 	}
@@ -834,13 +831,13 @@ func (p *processor) addTable(ctx context.Context, tableID int64, replicaInfo *mo
 			sorter = psorter.NewUnifiedSorter(p.changefeed.SortDir, tableName, util.CaptureAddrFromCtx(ctx))
 		}
 	default:
-		p.errCh <- cerror.ErrUnknownSortEngine.GenWithStackByArgs(p.changefeed.Engine)
+		p.sendError(cerror.ErrUnknownSortEngine.GenWithStackByArgs(p.changefeed.Engine))
 		return nil
 	}
 	go func() {
 		err := sorter.Run(ctx)
 		if errors.Cause(err) != context.Canceled {
-			p.errCh <- err
+			p.sendError(err)
 		}
 	}()
 
@@ -879,7 +876,9 @@ func (p *processor) addTable(ctx context.Context, tableID int64, replicaInfo *mo
 	tableSink = startPuller(tableID, &table.resolvedTs, &table.checkpointTs)
 	table.cancel = func() {
 		cancel()
-		tableSink.Close()
+		if tableSink != nil {
+			tableSink.Close()
+		}
 		if mTableSink != nil {
 			mTableSink.Close()
 		}
@@ -916,7 +915,7 @@ func (p *processor) sorterConsume(
 		select {
 		case <-ctx.Done():
 			if errors.Cause(ctx.Err()) != context.Canceled {
-				p.errCh <- ctx.Err()
+				p.sendError(ctx.Err())
 			}
 			return
 		case p.opDoneCh <- tableID:
@@ -985,7 +984,7 @@ func (p *processor) sorterConsume(
 		select {
 		case <-ctx.Done():
 			if errors.Cause(ctx.Err()) != context.Canceled {
-				p.errCh <- ctx.Err()
+				p.sendError(ctx.Err())
 			}
 			return
 		case pEvent := <-sorter.Output():
@@ -997,7 +996,7 @@ func (p *processor) sorterConsume(
 			select {
 			case <-ctx.Done():
 				if errors.Cause(ctx.Err()) != context.Canceled {
-					p.errCh <- ctx.Err()
+					p.sendError(ctx.Err())
 				}
 				return
 			case p.mounter.Input() <- pEvent:
@@ -1039,7 +1038,7 @@ func (p *processor) sorterConsume(
 			err := processRowChangedEvent(pEvent)
 			if err != nil {
 				if errors.Cause(err) != context.Canceled {
-					p.errCh <- errors.Trace(err)
+					p.sendError(ctx.Err())
 				}
 				return
 			}
@@ -1088,7 +1087,7 @@ func (p *processor) pullerConsume(
 		select {
 		case <-ctx.Done():
 			if errors.Cause(ctx.Err()) != context.Canceled {
-				p.errCh <- ctx.Err()
+				p.sendError(ctx.Err())
 			}
 			return
 		case rawKV := <-plr.Output():
@@ -1157,7 +1156,9 @@ func runProcessor(
 		return nil, errors.Trace(err)
 	}
 	ctx, cancel := context.WithCancel(ctx)
-	errCh := make(chan error, 16)
+	// processor only receives one error from the channel, all producers to this
+	// channel must use the non-blocking way to send error.
+	errCh := make(chan error, 1)
 	s, err := sink.NewSink(ctx, changefeedID, info.SinkURI, filter, info.Config, opts, errCh)
 	if err != nil {
 		cancel()
@@ -1176,33 +1177,15 @@
 	processor.Run(ctx)
 
 	go func() {
-		var errs []error
-		appendError := func(err error) {
-			log.Debug("processor received error", zap.Error(err))
-			cause := errors.Cause(err)
-			if cause != nil && cause != context.Canceled && cerror.ErrAdminStopProcessor.NotEqual(cause) {
-				errs = append(errs, err)
-			}
-		}
 		err := <-errCh
-		appendError(err)
-		// sleep 500ms to wait all the errors are sent to errCh
-		time.Sleep(500 * time.Millisecond)
-	ReceiveErr:
-		for {
-			select {
-			case err := <-errCh:
-				appendError(err)
-			default:
-				break ReceiveErr
-			}
-		}
-		if len(errs) > 0 {
+		cause := errors.Cause(err)
+		if cause != nil && cause != context.Canceled && cerror.ErrAdminStopProcessor.NotEqual(cause) {
 			processorErrorCounter.WithLabelValues(changefeedID, captureInfo.AdvertiseAddr).Inc()
 			log.Error("error on running processor",
 				util.ZapFieldCapture(ctx),
 				zap.String("changefeed", changefeedID),
 				zap.String("processor", processor.id),
-				zap.Errors("errors", errs))
+				zap.Error(err))
 			// record error information in etcd
 			var code string
 			if terror, ok := err.(*errors.Error); ok {
@@ -1230,3 +1213,11 @@ func runProcessor(
 
 	return processor, nil
 }
+
+func (p *processor) sendError(err error) {
+	select {
+	case p.errCh <- err:
+	default:
+		log.Error("processor receives redundant error", zap.Error(err))
+	}
+}
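Note that sendError gives the processor first-error-wins semantics: runProcessor consumes exactly one error from errCh and shuts the processor down, and any error raised after the single buffer slot is taken is logged as redundant and dropped. That is why the buffer can shrink from 16 to 1 and why the 500 ms drain loop in runProcessor is deleted above.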
2 changes: 2 additions & 0 deletions cdc/sink/cdclog/file.go
@@ -376,6 +376,8 @@ func NewLocalFileSink(ctx context.Context, sinkURI *url.URL, errCh chan error) (
 			case <-ctx.Done():
 				return
 			case errCh <- err:
+			default:
+				log.Error("error channel is full", zap.Error(err))
 			}
 		}
 	}()
2 changes: 2 additions & 0 deletions cdc/sink/cdclog/s3.go
@@ -381,6 +381,8 @@ func NewS3Sink(ctx context.Context, sinkURI *url.URL, errCh chan error) (*s3Sink
 			case <-ctx.Done():
 				return
 			case errCh <- err:
+			default:
+				log.Error("error channel is full", zap.Error(err))
 			}
 		}
 	}()
2 changes: 2 additions & 0 deletions cdc/sink/mq.go
@@ -152,6 +152,8 @@ func newMqSink(
 			case <-ctx.Done():
 				return
 			case errCh <- err:
+			default:
+				log.Error("error channel is full", zap.Error(err))
 			}
 		}
 	}()
2 changes: 2 additions & 0 deletions cdc/sink/producer/kafka/kafka.go
@@ -366,6 +366,8 @@ func NewKafkaSaramaProducer(ctx context.Context, address string, topic string, c
 			case <-ctx.Done():
 				return
 			case errCh <- err:
+			default:
+				log.Error("error channel is full", zap.Error(err))
 			}
 		}
 	}()
3 changes: 3 additions & 0 deletions cdc/sink/producer/pulsar/producer.go
@@ -20,7 +20,9 @@ import (
 
 	"github.com/apache/pulsar-client-go/pulsar"
 	"github.com/pingcap/failpoint"
+	"github.com/pingcap/log"
 	cerror "github.com/pingcap/ticdc/pkg/errors"
+	"go.uber.org/zap"
 )
 
 // NewProducer create a pulsar producer.
@@ -83,6 +85,7 @@ func (p *Producer) errors(_ pulsar.MessageID, _ *pulsar.ProducerMessage, err err
 		select {
 		case p.errCh <- cerror.WrapError(cerror.ErrPulsarSendMessage, err):
 		default:
+			log.Error("error channel is full", zap.Error(err))
 		}
 	}
 }
27 changes: 27 additions & 0 deletions tests/processor_err_chan/conf/diff_config.toml
@@ -0,0 +1,27 @@
+# diff Configuration.
+
+log-level = "info"
+chunk-size = 10
+check-thread-count = 4
+sample-percent = 100
+use-rowid = false
+use-checksum = true
+fix-sql-file = "fix.sql"
+
+# tables need to check.
+[[check-tables]]
+schema = "processor_err_chan"
+tables = ["~.*"]
+
+[[source-db]]
+host = "127.0.0.1"
+port = 4000
+user = "root"
+password = ""
+instance-id = "source-1"
+
+[target-db]
+host = "127.0.0.1"
+port = 3306
+user = "root"
+password = ""
70 changes: 70 additions & 0 deletions tests/processor_err_chan/run.sh
@@ -0,0 +1,70 @@
+#!/bin/bash
+
+set -e
+
+CUR=$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )
+source $CUR/../_utils/test_prepare
+WORK_DIR=$OUT_DIR/$TEST_NAME
+CDC_BINARY=cdc.test
+SINK_TYPE=$1
+
+function check_changefeed_mark_stopped() {
+    endpoints=$1
+    changefeedid=$2
+    error_msg=$3
+    info=$(cdc cli changefeed query --pd=$endpoints -c $changefeedid -s)
+    echo "$info"
+    state=$(echo $info|jq -r '.state')
+    if [[ ! "$state" == "stopped" ]]; then
+        echo "changefeed state $state does not equal to stopped"
+        exit 1
+    fi
+    message=$(echo $info|jq -r '.error.message')
+    if [[ ! "$message" =~ "$error_msg" ]]; then
+        echo "error message '$message' is not as expected '$error_msg'"
+        exit 1
+    fi
+}
+
+export -f check_changefeed_mark_stopped
+
+function run() {
+    rm -rf $WORK_DIR && mkdir -p $WORK_DIR
+    start_tidb_cluster --workdir $WORK_DIR
+    cd $WORK_DIR
+
+    pd_addr="http://$UP_PD_HOST_1:$UP_PD_PORT_1"
+    TOPIC_NAME="ticdc-processor-err-chan-$RANDOM"
+    case $SINK_TYPE in
+        kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4";;
+        *) SINK_URI="mysql://root@127.0.0.1:3306/?max-txn-row=1";;
+    esac
+    if [ "$SINK_TYPE" == "kafka" ]; then
+        run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4"
+    fi
+
+    run_sql "CREATE DATABASE processor_err_chan;" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+    run_sql "CREATE DATABASE processor_err_chan;" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
+    for i in $(seq 1 10); do
+        run_sql "CREATE table processor_err_chan.t$i (id int primary key auto_increment)" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+        run_sql "CREATE table processor_err_chan.t$i (id int primary key auto_increment)" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
+    done
+
+    run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --addr "127.0.0.1:8300" --pd $pd_addr
+    changefeed_id=$(cdc cli changefeed create --pd=$pd_addr --sink-uri="$SINK_URI" --sort-engine=abc-engine 2>&1|tail -n2|head -n1|awk '{print $2}')
+
+    retry_time=10
+    ensure $retry_time check_changefeed_mark_stopped $pd_addr $changefeed_id "[CDC:ErrUnknownSortEngine]unknown sort engine abc-engine"
+
+    cdc cli changefeed create --pd=$pd_addr --sink-uri="$SINK_URI"
+    for i in $(seq 1 10); do
+        run_sql "INSERT INTO processor_err_chan.t$i values (),(),(),(),(),(),()" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+    done
+    check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml
+
+    cleanup_process $CDC_BINARY
+}
+
+trap stop_tidb_cluster EXIT
+run $*
+echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"
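The new integration test forces an error onto the processor's error channel by creating a changefeed with an unknown sort engine (abc-engine), which trips the ErrUnknownSortEngine branch above; the changefeed must then be marked stopped with that exact message instead of hanging. A second, valid changefeed verifies that replication still completes end to end.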
