Skip to content

Commit

Permalink
refactor: remove retry attempt
Browse files Browse the repository at this point in the history
  • Loading branch information
DarthPestilane committed Jan 30, 2024
1 parent af8cc9e commit b0acfa3
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 64 deletions.
24 changes: 3 additions & 21 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ type Server struct {
printRoutes bool
accepting chan struct{}
stopped chan struct{}
writeAttemptTimes int
asyncRouter bool
}

Expand All @@ -51,10 +50,6 @@ type ServerOption struct {
RespQueueSize int // sets the response channel size of session, DefaultRespQueueSize will be used if < 0.
DoNotPrintRoutes bool // whether to print registered route handlers to the console.

// WriteAttemptTimes sets the max attempt times for packet writing in each session.
// The DefaultWriteAttemptTimes will be used if <= 0.
WriteAttemptTimes int

// AsyncRouter represents whether to execute a route HandlerFunc of each session in a goroutine.
// true means execute in a goroutine.
AsyncRouter bool
Expand All @@ -63,11 +58,7 @@ type ServerOption struct {
// ErrServerStopped is returned when server stopped.
var ErrServerStopped = fmt.Errorf("server stopped")

const (
DefaultRespQueueSize = 1024
DefaultWriteAttemptTimes = 1
tempErrDelay = time.Millisecond * 5
)
const DefaultRespQueueSize = 1024

// NewServer creates a Server according to opt.
func NewServer(opt *ServerOption) *Server {
Expand All @@ -77,9 +68,6 @@ func NewServer(opt *ServerOption) *Server {
if opt.RespQueueSize < 0 {
opt.RespQueueSize = DefaultRespQueueSize
}
if opt.WriteAttemptTimes <= 0 {
opt.WriteAttemptTimes = DefaultWriteAttemptTimes
}
return &Server{
socketReadBufferSize: opt.SocketReadBufferSize,
socketWriteBufferSize: opt.SocketWriteBufferSize,
Expand All @@ -93,7 +81,6 @@ func NewServer(opt *ServerOption) *Server {
router: newRouter(),
accepting: make(chan struct{}),
stopped: make(chan struct{}),
writeAttemptTimes: opt.WriteAttemptTimes,
asyncRouter: opt.AsyncRouter,
}
}
Expand Down Expand Up @@ -142,11 +129,6 @@ func (s *Server) acceptLoop() error {
Log.Tracef("server accept loop stopped")
return ErrServerStopped
}
if ne, ok := err.(net.Error); ok && !ne.Timeout() {
Log.Errorf("accept err: %s; retrying in %s", err, tempErrDelay)
time.Sleep(tempErrDelay)
continue
}
return fmt.Errorf("accept err: %s", err)
}
if s.socketReadBufferSize > 0 {
Expand Down Expand Up @@ -191,8 +173,8 @@ func (s *Server) handleConn(conn net.Conn) {
}
close(sess.afterCreateHook)

go sess.readInbound(s.router, s.readTimeout) // start reading message packet from connection.
go sess.writeOutbound(s.writeTimeout, s.writeAttemptTimes) // start writing message packet to connection.
go sess.readInbound(s.router, s.readTimeout) // start reading message packet from connection.
go sess.writeOutbound(s.writeTimeout) // start writing message packet to connection.

select {
case <-sess.closed: // wait for session finished.
Expand Down
27 changes: 2 additions & 25 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ func (s *session) handleReq(router *Router, reqMsg *Message) {
// writeOutbound fetches message from respQueue channel and writes to TCP connection in a loop.
// Parameter writeTimeout specified the connection writing timeout.
// The loop breaks if errors occurred, or the session is closed.
func (s *session) writeOutbound(writeTimeout time.Duration, attemptTimes int) {
func (s *session) writeOutbound(writeTimeout time.Duration) {
for {
var ctx Context
select {
Expand All @@ -212,7 +212,7 @@ func (s *session) writeOutbound(writeTimeout time.Duration, attemptTimes int) {
}
}

if err := s.attemptConnWrite(outboundBytes, attemptTimes); err != nil {
if _, err := s.conn.Write(outboundBytes); err != nil {
Log.Errorf("session %s conn write err: %s", s.id, err)
break
}
Expand All @@ -221,29 +221,6 @@ func (s *session) writeOutbound(writeTimeout time.Duration, attemptTimes int) {
Log.Tracef("session %s writeOutbound exit because of error", s.id)
}

func (s *session) attemptConnWrite(outboundMsg []byte, attemptTimes int) (err error) {
for i := 0; i < attemptTimes; i++ {
time.Sleep(tempErrDelay * time.Duration(i))
_, err = s.conn.Write(outboundMsg)

// breaks if err is not nil, or it's the last attempt.
if err == nil || i == attemptTimes-1 {
break
}

// check if err is `net.Error`
ne, ok := err.(net.Error)
if !ok {
break
}
if ne.Timeout() {
break
}
Log.Errorf("session %s conn write err: %s; retrying in %s", s.id, err, tempErrDelay*time.Duration(i+1))
}
return
}

func (s *session) packResponse(ctx Context) ([]byte, error) {
defer s.ctxPool.Put(ctx)
if ctx.Response() == nil {
Expand Down
26 changes: 8 additions & 18 deletions session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ func TestTCPSession_writeOutbound(t *testing.T) {
doneLoop := make(chan struct{})
sess.Close()
go func() {
sess.writeOutbound(0, 10) // should stop looping and return
sess.writeOutbound(0) // should stop looping and return
close(doneLoop)
}()
time.Sleep(time.Millisecond * 5)
Expand All @@ -211,7 +211,7 @@ func TestTCPSession_writeOutbound(t *testing.T) {
sess.respQueue <- sess.AllocateContext()
doneLoop := make(chan struct{})
go func() {
sess.writeOutbound(0, 10) // should stop looping and return
sess.writeOutbound(0) // should stop looping and return
close(doneLoop)
}()
time.Sleep(time.Millisecond * 5)
Expand All @@ -232,7 +232,7 @@ func TestTCPSession_writeOutbound(t *testing.T) {
close(done)
}()
time.Sleep(time.Microsecond * 15)
go sess.writeOutbound(0, 10)
go sess.writeOutbound(0)
time.Sleep(time.Millisecond * 15)
<-done
sess.Close() // should break the write loop
Expand All @@ -248,7 +248,7 @@ func TestTCPSession_writeOutbound(t *testing.T) {
sess.respQueue <- sess.AllocateContext().SetResponseMessage(NewMessage(1, []byte("test"))) // push to queue
doneLoop := make(chan struct{})
go func() {
sess.writeOutbound(0, 10)
sess.writeOutbound(0)
close(doneLoop)
}()
time.Sleep(time.Millisecond * 5)
Expand All @@ -265,7 +265,7 @@ func TestTCPSession_writeOutbound(t *testing.T) {
_ = p1.Close()
sess := newSession(p1, &sessionOption{Packer: packer})
go func() { sess.respQueue <- sess.AllocateContext().SetResponseMessage(NewMessage(1, []byte("test"))) }()
go sess.writeOutbound(time.Millisecond*10, 10)
go sess.writeOutbound(time.Millisecond * 10)
_, ok := <-sess.closed
assert.False(t, ok)
})
Expand All @@ -279,7 +279,7 @@ func TestTCPSession_writeOutbound(t *testing.T) {
p1, _ := net.Pipe()
sess := newSession(p1, &sessionOption{Packer: packer})
go func() { sess.respQueue <- sess.AllocateContext().SetResponseMessage(NewMessage(1, []byte("test"))) }()
go sess.writeOutbound(time.Millisecond*10, 10)
go sess.writeOutbound(time.Millisecond * 10)
_, ok := <-sess.closed
assert.False(t, ok)
_ = p1.Close()
Expand All @@ -295,7 +295,7 @@ func TestTCPSession_writeOutbound(t *testing.T) {
assert.NoError(t, p1.Close())
sess := newSession(p1, &sessionOption{Packer: packer})
go func() { sess.respQueue <- sess.AllocateContext().SetResponseMessage(NewMessage(1, []byte("test"))) }()
sess.writeOutbound(0, 10) // should stop looping and return
sess.writeOutbound(0) // should stop looping and return
_, ok := <-sess.closed
assert.False(t, ok)
})
Expand All @@ -311,7 +311,7 @@ func TestTCPSession_writeOutbound(t *testing.T) {
go func() { sess.AllocateContext().SetResponseMessage(NewMessage(1, []byte("test"))).Send() }()
done := make(chan struct{})
go func() {
sess.writeOutbound(0, 10)
sess.writeOutbound(0)
close(done)
}()
time.Sleep(time.Millisecond * 5)
Expand All @@ -321,16 +321,6 @@ func TestTCPSession_writeOutbound(t *testing.T) {
})
}

func TestSession_attemptConnWrite_when_reach_last_try(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
conn := mock.NewMockConn(ctrl)
conn.EXPECT().Write(gomock.Any()).Return(0, fmt.Errorf("some err"))

s := newSession(conn, &sessionOption{})
assert.Error(t, s.attemptConnWrite([]byte("whatever"), 1))
}

func Test_session_SetID(t *testing.T) {
sess := newSession(nil, &sessionOption{})
_, ok := sess.ID().(string)
Expand Down

0 comments on commit b0acfa3

Please sign in to comment.