Skip to content

Commit

Permalink
add more status (finished\failed) in the cvs task (pingcap#144)
Browse files Browse the repository at this point in the history
* add demotask server and client

* remove the temp data files

* fix the make check error

* fix the some errors case

* add the cvs task configure infomation

* add the cvs task configure information

* fix the make check error

* add  the sync progress logic

* refactor the data reader/writer server

* replace the fmt.print with the log and add implement the failover of the cvs jobmaster

* fix  a bug about scan string in kvdatamake.go

* fix the bug in the kvdatamake.go

* remove the blank line in the kvdatamake.go

* refine the cvs task and jobmaster

* remove the cvs jobmaster to the cvs job folder

* fix some bug in the cvs task and cvs jobmaster

* remove useless file

* revert some fixs

* cancel the send method if reach the end of the file

* resolved some review comment

* resolved some review comment

* format the code style

* remove the temp file

* add some logs and fix bugs in the cvs jobmaster

* add the finished case when reading file

* add the pb.go file

Co-authored-by: Zixiong Liu <liuzixiong@pingcap.com>
Co-authored-by: amyangfei <amyangfei@gmail.com>
  • Loading branch information
3 people authored Feb 9, 2022
1 parent bdd71c1 commit 8c87852
Show file tree
Hide file tree
Showing 5 changed files with 98 additions and 34 deletions.
6 changes: 4 additions & 2 deletions cmd/demoserver/kvdatamake.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,8 +217,10 @@ func (s *DataRWServer) ReadLines(req *pb.ReadLinesRequest, stream pb.DataRWServi
default:
reply, err := reader.ReadString('\n')
if err == io.EOF {
fmt.Printf("reach the end of the file ")
err = stream.Send(&pb.ReadLinesResponse{Linestr: "", IsEof: true})
log.L().Info("reach the end of the file ")
break
return err
}
if i < int(req.LineNo) {
continue
Expand All @@ -227,7 +229,7 @@ func (s *DataRWServer) ReadLines(req *pb.ReadLinesRequest, stream pb.DataRWServi
if reply == "" {
continue
}
err = stream.Send(&pb.ReadLinesResponse{Linestr: reply})
err = stream.Send(&pb.ReadLinesResponse{Linestr: reply, IsEof: false})
i++
if err != nil {
return err
Expand Down
33 changes: 25 additions & 8 deletions executor/cvsTask/cvstask.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package cvstask

import (
"context"
"io"
"strings"
"time"

Expand Down Expand Up @@ -44,6 +43,7 @@ type cvsTask struct {
status lib.WorkerStatusCode
cancelFn func()
buffer chan strPair
isEOF bool
}

func init() {
Expand Down Expand Up @@ -106,7 +106,7 @@ func (task *cvsTask) Tick(ctx context.Context) error {

// Status returns a short worker status to be periodically sent to the master.
func (task *cvsTask) Status() lib.WorkerStatus {
return lib.WorkerStatus{Code: lib.WorkerStatusNormal, ErrorMessage: "", Ext: task.counter}
return lib.WorkerStatus{Code: task.status, ErrorMessage: "", Ext: task.counter}
}

// Workload returns the current workload of the worker.
Expand Down Expand Up @@ -139,16 +139,20 @@ func (task *cvsTask) Receive(ctx context.Context) error {
return err
}
for {
linestr, err := reader.Recv()
reply, err := reader.Recv()
if err != nil {
if err == io.EOF {
log.L().Info("read data failed", zap.Any("error:", err.Error()))
if !task.isEOF {
task.cancelFn()
break
}
log.L().Info("read data failed", zap.Any("error:", err.Error()))
continue
return err
}
if reply.IsEof {
log.L().Info("Reach the end of the file ", zap.Any("fileName:", task.srcDir))
task.isEOF = true
break
}
strs := strings.Split(linestr.Linestr, ",")
strs := strings.Split(reply.Linestr, ",")
if len(strs) < 2 {
continue
}
Expand All @@ -173,6 +177,8 @@ func (task *cvsTask) Send(ctx context.Context) error {
writer, err := client.WriteLines(ctx)
if err != nil {
log.L().Info("call write data rpc failed ")
task.status = lib.WorkerStatusError
task.cancelFn()
return err
}
for {
Expand All @@ -182,11 +188,22 @@ func (task *cvsTask) Send(ctx context.Context) error {
task.counter++
if err != nil {
log.L().Info("call write data rpc failed ")
task.status = lib.WorkerStatusError
task.cancelFn()
return err
}
case <-ctx.Done():
task.status = lib.WorkerStatusError
return nil
default:
if task.isEOF {
log.L().Info("Reach the end of the file ")
task.status = lib.WorkerStatusFinished
err = writer.CloseSend()
return err
}
time.Sleep(time.Second)

}
}
}
4 changes: 3 additions & 1 deletion jobmaster/cvsJob/cvsJobMaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,10 @@ func (jm *JobMaster) Tick(ctx context.Context) error {
jm.counter += int64(num)
// todo : store the sync progress into the meta store for each file
}
} else {
} else if status.Code == lib.WorkerStatusFinished {
// todo : handle error case here
log.L().Info("sync file finished ", zap.Any("message", worker.file))
} else if status.Code == lib.WorkerStatusError {
log.L().Info("sync file failed ", zap.Any("message", worker.file))
}
}
Expand Down
88 changes: 65 additions & 23 deletions pb/datarw.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions proto/datarw.proto
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ message ReadLinesRequest {

message ReadLinesResponse {
string linestr = 1 ;
bool isEof = 2;
}

message WriteLinesRequest {
Expand Down

0 comments on commit 8c87852

Please sign in to comment.