Skip to content

Commit

Permalink
Mod: don't use read queue
Browse files Browse the repository at this point in the history
  • Loading branch information
fangyincheng committed Jul 21, 2019
1 parent cd6ab9c commit 058d652
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 39 deletions.
1 change: 0 additions & 1 deletion getty.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,6 @@ type Session interface {
SetReader(Reader)
SetWriter(Writer)
SetCronPeriod(int)
SetRQLen(int)
SetWQLen(int)
SetWaitTime(time.Duration)
SetTaskPool(*TaskPool)
Expand Down
46 changes: 8 additions & 38 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,7 @@ type session struct {
reader Reader // @reader should be nil when @conn is a gettyWSConn object.
writer Writer

// read & write
rQ chan interface{}
// write
wQ chan interface{}

// handle logic
Expand Down Expand Up @@ -274,18 +273,6 @@ func (s *session) SetCronPeriod(period int) {
s.period = time.Duration(period) * time.Millisecond
}

// set @session's read queue size
func (s *session) SetRQLen(readQLen int) {
if readQLen < 1 {
panic("@readQLen < 1")
}

s.lock.Lock()
defer s.lock.Unlock()
s.rQ = make(chan interface{}, readQLen)
log.Debug("%s, [session.SetRQLen] rQ{len:%d, cap:%d}", s.Stat(), len(s.rQ), cap(s.rQ))
}

// set @session's Write queue size
func (s *session) SetWQLen(writeQLen int) {
if writeQLen < 1 {
Expand Down Expand Up @@ -465,10 +452,6 @@ func (s *session) run() {
s.wQ = make(chan interface{}, defaultQLen)
}

if s.rQ == nil && s.tPool == nil {
s.rQ = make(chan interface{}, defaultQLen)
}

// call session opened
s.UpdateActive()
if err := s.listener.OnOpen(s); err != nil {
Expand All @@ -491,7 +474,6 @@ func (s *session) handleLoop() {
wsConn *gettyWSConn
// start time.Time
counter gstime.CountWatch
inPkg interface{}
outPkg interface{}
)

Expand Down Expand Up @@ -521,7 +503,7 @@ LOOP:
case <-s.done:
// this case branch assure the (session)handleLoop gr will exit before (session)handlePackage gr.
if atomic.LoadInt32(&(s.grNum)) == 1 { // make sure @(session)handlePackage goroutine has been closed.
if len(s.rQ) == 0 && len(s.wQ) == 0 {
if len(s.wQ) == 0 {
log.Infof("%s, [session.handleLoop] got done signal. Both rQ and wQ are nil.", s.Stat())
break LOOP
}
Expand All @@ -533,18 +515,6 @@ LOOP:
}
}

case inPkg = <-s.rQ:
// read the s.rQ and assure (session)handlePackage gr will not block by (session)rQ.
if flag {
log.Debugf("%#v <-s.rQ", inPkg)
pkg := inPkg
// go s.listener.OnMessage(s, pkg)
s.listener.OnMessage(s, pkg)
s.incReadPkgNum()
} else {
log.Infof("[session.handleLoop] drop readin package{%#v}", inPkg)
}

case outPkg = <-s.wQ:
if flag {
if err = s.writer.Write(s, outPkg); err != nil {
Expand Down Expand Up @@ -572,11 +542,15 @@ LOOP:
}

func (s *session) addTask(pkg interface{}) {
f := func() {
s.listener.OnMessage(s, pkg)
s.incReadPkgNum()
}
if s.tPool != nil {
s.tPool.AddTask(task{session: s, pkg: pkg})
} else {
s.rQ <- pkg
return
}
f()
}

func (s *session) handlePackage() {
Expand Down Expand Up @@ -849,10 +823,6 @@ func (s *session) gc() {
close(s.wQ)
s.wQ = nil
}
if s.rQ != nil {
close(s.rQ)
s.rQ = nil
}
s.Connection.close((int)((int64)(s.wait)))
}
s.lock.Unlock()
Expand Down

0 comments on commit 058d652

Please sign in to comment.