Skip to content

Commit

Permalink
Add any retry logic (not all)
Browse files Browse the repository at this point in the history
  • Loading branch information
asapozhkov committed Sep 5, 2023
1 parent d64b17a commit 60480d3
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 18 deletions.
2 changes: 1 addition & 1 deletion rpcx/file_service/file_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func (s *Service) processRequests() {
slog.String("send_start_time", time.Now().Format(time.StampMilli)),
slog.String("filename", filename),
)
err := s.xClient.SendFile(context.Background(), filename, 0, nil) // TODO: context timeout?
err := s.xClient.SendFile(context.Background(), filename, 0, nil) // TODO: context timeout? // TODO: resend
if err != nil {
slog.Error(err.Error())
continue
Expand Down
35 changes: 22 additions & 13 deletions rpcx/incident_service/incident_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,26 +21,29 @@ type LinkRequest struct {
}

type IncidentRequest struct {
ClientAddr string
IncidentID string
ClientAddr string
IncidentID string
handlingCount uint
}

type Response struct {
}

type Service struct {
incidentsQueue chan IncidentRequest
fileChunkSize uint
mx sync.RWMutex
wg sync.WaitGroup
clients map[string]client.XClient
incidentsQueue chan IncidentRequest
fileChunkSize uint
retriesOnFailure uint
mx sync.RWMutex
wg sync.WaitGroup
clients map[string]client.XClient
}

func NewService(reqLimit uint, chunkSize uint) (*Service, server.FileTransferHandler) {
func NewService(reqLimit, chunkSize, retriesOnFailure uint) (*Service, server.FileTransferHandler) {
s := &Service{
incidentsQueue: make(chan IncidentRequest, reqLimit),
fileChunkSize: chunkSize,
clients: make(map[string]client.XClient),
incidentsQueue: make(chan IncidentRequest, reqLimit),
fileChunkSize: chunkSize,
retriesOnFailure: retriesOnFailure,
clients: make(map[string]client.XClient),
}

go func() {
Expand Down Expand Up @@ -88,6 +91,10 @@ func (s *Service) Link(_ context.Context, arg LinkRequest, _ *Response) error {
}

func (s *Service) SendIncident(_ context.Context, arg IncidentRequest, _ *Response) error {
if arg.handlingCount > s.retriesOnFailure {
slog.Warn("incident retries limit has been reached", slog.Any("incident", arg))
return nil // TODO: ?????
}
slog.Debug("incident in pending", slog.Any("incident", arg))
s.incidentsQueue <- arg
return nil
Expand All @@ -109,7 +116,9 @@ func (s *Service) processIncidents() {
}
err := cl.Call(context.Background(), "RequestFile", args, &file_service.Response{}) // TODO: goroutine for waiting to recall?
if err != nil {
slog.Error(fmt.Sprintf("failed to call %s: %v", incident.ClientAddr, err)) // TODO: resend
slog.Error(fmt.Sprintf("failed to call %s: %v", incident.ClientAddr, err))
incident.handlingCount++
_ = s.SendIncident(context.Background(), incident, &Response{})
}
}
}
Expand Down Expand Up @@ -145,7 +154,7 @@ func (s *Service) saveFileHandler(conn net.Conn, args *share.FileTransferArgs) {
_, err = io.CopyBuffer(file, conn, make([]byte, s.fileChunkSize))
if err != nil {
slog.Error(err.Error())
return
return // TODO: resend
}
slog.Info("receive ok", slog.String("receive_end_time", time.Now().Format(time.StampMilli)))
}
9 changes: 5 additions & 4 deletions rpcx/smc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@ import (
)

const (
reqLimit = 1000
chunkSize = 1024
reqLimit = 1000
chunkSize = 1024
retriesOnFailure = 5
)

var (
Expand All @@ -28,8 +29,8 @@ func main() {
slog.SetDefault(slog.New(slog.NewTextHandler(os.Stdout, opts)).WithGroup("SMC"))

s := server.NewServer()
incidentService, saveFileHandler := incident_service.NewService(reqLimit, chunkSize)
p := server.NewFileTransfer(*addrFileTransfer, saveFileHandler, nil, 1000)
incidentService, saveFileHandler := incident_service.NewService(reqLimit, chunkSize, retriesOnFailure)
p := server.NewFileTransfer(*addrFileTransfer, saveFileHandler, nil, reqLimit)
s.EnableFileTransfer(share.SendFileServiceName, p)
err := s.RegisterName("IncidentService", incidentService, "")
if err != nil {
Expand Down

0 comments on commit 60480d3

Please sign in to comment.