diff --git a/server.go b/server.go index a217b58..5538101 100644 --- a/server.go +++ b/server.go @@ -35,7 +35,6 @@ type Server struct { printRoutes bool accepting chan struct{} stopped chan struct{} - writeAttemptTimes int asyncRouter bool } @@ -51,10 +50,6 @@ type ServerOption struct { RespQueueSize int // sets the response channel size of session, DefaultRespQueueSize will be used if < 0. DoNotPrintRoutes bool // whether to print registered route handlers to the console. - // WriteAttemptTimes sets the max attempt times for packet writing in each session. - // The DefaultWriteAttemptTimes will be used if <= 0. - WriteAttemptTimes int - // AsyncRouter represents whether to execute a route HandlerFunc of each session in a goroutine. // true means execute in a goroutine. AsyncRouter bool @@ -63,11 +58,7 @@ type ServerOption struct { // ErrServerStopped is returned when server stopped. var ErrServerStopped = fmt.Errorf("server stopped") -const ( - DefaultRespQueueSize = 1024 - DefaultWriteAttemptTimes = 1 - tempErrDelay = time.Millisecond * 5 -) +const DefaultRespQueueSize = 1024 // NewServer creates a Server according to opt. func NewServer(opt *ServerOption) *Server { @@ -77,9 +68,6 @@ func NewServer(opt *ServerOption) *Server { if opt.RespQueueSize < 0 { opt.RespQueueSize = DefaultRespQueueSize } - if opt.WriteAttemptTimes <= 0 { - opt.WriteAttemptTimes = DefaultWriteAttemptTimes - } return &Server{ socketReadBufferSize: opt.SocketReadBufferSize, socketWriteBufferSize: opt.SocketWriteBufferSize, @@ -93,7 +81,6 @@ func NewServer(opt *ServerOption) *Server { router: newRouter(), accepting: make(chan struct{}), stopped: make(chan struct{}), - writeAttemptTimes: opt.WriteAttemptTimes, asyncRouter: opt.AsyncRouter, } } @@ -142,11 +129,6 @@ func (s *Server) acceptLoop() error { Log.Tracef("server accept loop stopped") return ErrServerStopped } - if ne, ok := err.(net.Error); ok && !ne.Timeout() { - Log.Errorf("accept err: %s; retrying in %s", err, tempErrDelay) - time.Sleep(tempErrDelay) - continue - } return fmt.Errorf("accept err: %s", err) } if s.socketReadBufferSize > 0 { @@ -191,8 +173,8 @@ func (s *Server) handleConn(conn net.Conn) { } close(sess.afterCreateHook) - go sess.readInbound(s.router, s.readTimeout) // start reading message packet from connection. - go sess.writeOutbound(s.writeTimeout, s.writeAttemptTimes) // start writing message packet to connection. + go sess.readInbound(s.router, s.readTimeout) // start reading message packet from connection. + go sess.writeOutbound(s.writeTimeout) // start writing message packet to connection. select { case <-sess.closed: // wait for session finished. diff --git a/session.go b/session.go index a079de5..00d8065 100644 --- a/session.go +++ b/session.go @@ -187,7 +187,7 @@ func (s *session) handleReq(router *Router, reqMsg *Message) { // writeOutbound fetches message from respQueue channel and writes to TCP connection in a loop. // Parameter writeTimeout specified the connection writing timeout. // The loop breaks if errors occurred, or the session is closed. -func (s *session) writeOutbound(writeTimeout time.Duration, attemptTimes int) { +func (s *session) writeOutbound(writeTimeout time.Duration) { for { var ctx Context select { @@ -212,7 +212,7 @@ func (s *session) writeOutbound(writeTimeout time.Duration, attemptTimes int) { } } - if err := s.attemptConnWrite(outboundBytes, attemptTimes); err != nil { + if _, err := s.conn.Write(outboundBytes); err != nil { Log.Errorf("session %s conn write err: %s", s.id, err) break } @@ -221,29 +221,6 @@ func (s *session) writeOutbound(writeTimeout time.Duration, attemptTimes int) { Log.Tracef("session %s writeOutbound exit because of error", s.id) } -func (s *session) attemptConnWrite(outboundMsg []byte, attemptTimes int) (err error) { - for i := 0; i < attemptTimes; i++ { - time.Sleep(tempErrDelay * time.Duration(i)) - _, err = s.conn.Write(outboundMsg) - - // breaks if err is not nil, or it's the last attempt. - if err == nil || i == attemptTimes-1 { - break - } - - // check if err is `net.Error` - ne, ok := err.(net.Error) - if !ok { - break - } - if ne.Timeout() { - break - } - Log.Errorf("session %s conn write err: %s; retrying in %s", s.id, err, tempErrDelay*time.Duration(i+1)) - } - return -} - func (s *session) packResponse(ctx Context) ([]byte, error) { defer s.ctxPool.Put(ctx) if ctx.Response() == nil { diff --git a/session_test.go b/session_test.go index 655fe83..dc5d1d2 100644 --- a/session_test.go +++ b/session_test.go @@ -194,7 +194,7 @@ func TestTCPSession_writeOutbound(t *testing.T) { doneLoop := make(chan struct{}) sess.Close() go func() { - sess.writeOutbound(0, 10) // should stop looping and return + sess.writeOutbound(0) // should stop looping and return close(doneLoop) }() time.Sleep(time.Millisecond * 5) @@ -211,7 +211,7 @@ func TestTCPSession_writeOutbound(t *testing.T) { sess.respQueue <- sess.AllocateContext() doneLoop := make(chan struct{}) go func() { - sess.writeOutbound(0, 10) // should stop looping and return + sess.writeOutbound(0) // should stop looping and return close(doneLoop) }() time.Sleep(time.Millisecond * 5) @@ -232,7 +232,7 @@ func TestTCPSession_writeOutbound(t *testing.T) { close(done) }() time.Sleep(time.Microsecond * 15) - go sess.writeOutbound(0, 10) + go sess.writeOutbound(0) time.Sleep(time.Millisecond * 15) <-done sess.Close() // should break the write loop @@ -248,7 +248,7 @@ func TestTCPSession_writeOutbound(t *testing.T) { sess.respQueue <- sess.AllocateContext().SetResponseMessage(NewMessage(1, []byte("test"))) // push to queue doneLoop := make(chan struct{}) go func() { - sess.writeOutbound(0, 10) + sess.writeOutbound(0) close(doneLoop) }() time.Sleep(time.Millisecond * 5) @@ -265,7 +265,7 @@ func TestTCPSession_writeOutbound(t *testing.T) { _ = p1.Close() sess := newSession(p1, &sessionOption{Packer: packer}) go func() { sess.respQueue <- sess.AllocateContext().SetResponseMessage(NewMessage(1, []byte("test"))) }() - go sess.writeOutbound(time.Millisecond*10, 10) + go sess.writeOutbound(time.Millisecond * 10) _, ok := <-sess.closed assert.False(t, ok) }) @@ -279,7 +279,7 @@ func TestTCPSession_writeOutbound(t *testing.T) { p1, _ := net.Pipe() sess := newSession(p1, &sessionOption{Packer: packer}) go func() { sess.respQueue <- sess.AllocateContext().SetResponseMessage(NewMessage(1, []byte("test"))) }() - go sess.writeOutbound(time.Millisecond*10, 10) + go sess.writeOutbound(time.Millisecond * 10) _, ok := <-sess.closed assert.False(t, ok) _ = p1.Close() @@ -295,7 +295,7 @@ func TestTCPSession_writeOutbound(t *testing.T) { assert.NoError(t, p1.Close()) sess := newSession(p1, &sessionOption{Packer: packer}) go func() { sess.respQueue <- sess.AllocateContext().SetResponseMessage(NewMessage(1, []byte("test"))) }() - sess.writeOutbound(0, 10) // should stop looping and return + sess.writeOutbound(0) // should stop looping and return _, ok := <-sess.closed assert.False(t, ok) }) @@ -311,7 +311,7 @@ func TestTCPSession_writeOutbound(t *testing.T) { go func() { sess.AllocateContext().SetResponseMessage(NewMessage(1, []byte("test"))).Send() }() done := make(chan struct{}) go func() { - sess.writeOutbound(0, 10) + sess.writeOutbound(0) close(done) }() time.Sleep(time.Millisecond * 5) @@ -321,16 +321,6 @@ func TestTCPSession_writeOutbound(t *testing.T) { }) } -func TestSession_attemptConnWrite_when_reach_last_try(t *testing.T) { - ctrl := gomock.NewController(t) - defer ctrl.Finish() - conn := mock.NewMockConn(ctrl) - conn.EXPECT().Write(gomock.Any()).Return(0, fmt.Errorf("some err")) - - s := newSession(conn, &sessionOption{}) - assert.Error(t, s.attemptConnWrite([]byte("whatever"), 1)) -} - func Test_session_SetID(t *testing.T) { sess := newSession(nil, &sessionOption{}) _, ok := sess.ID().(string)