Skip to content

Commit

Permalink
mux: add SmuxMaxAliveDuration
Browse files Browse the repository at this point in the history
  • Loading branch information
Ehco1996 committed Sep 16, 2022
1 parent 6965394 commit 30159c4
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 15 deletions.
5 changes: 3 additions & 2 deletions internal/constant/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,9 @@ var (
const (
DialTimeOut = 3 * time.Second

SmuxGCDuration = 30 * time.Second
MaxMuxStreamCnt = 50
SmuxGCDuration = 30 * time.Second
SmuxMaxAliveDuration = 1 * time.Hour
SmuxMaxStreamCnt = 50

Listen_RAW = "raw"
Listen_WS = "ws"
Expand Down
41 changes: 28 additions & 13 deletions internal/transporter/smux.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,30 @@ type smuxTransporter struct {

gcTicker *time.Ticker
L *zap.SugaredLogger
sessionM map[string][]*smux.Session

// remote addr -> SessionWithMetrics
sessionM map[string][]*SessionWithMetrics

initSessionF func(ctx context.Context, addr string) (*smux.Session, error)
}

type SessionWithMetrics struct {
session *smux.Session
createdTime time.Time
}

func (sm *SessionWithMetrics) CanNotServe() bool {
return sm.session.IsClosed() ||
sm.session.NumStreams() >= constant.SmuxMaxStreamCnt ||
time.Now().Sub(sm.createdTime) > constant.SmuxMaxAliveDuration
}

func NewSmuxTransporter(l *zap.SugaredLogger,
initSessionF func(ctx context.Context, addr string) (*smux.Session, error)) *smuxTransporter {
tr := &smuxTransporter{
L: l,
initSessionF: initSessionF,
sessionM: make(map[string][]*smux.Session),
sessionM: make(map[string][]*SessionWithMetrics),
gcTicker: time.NewTicker(constant.SmuxGCDuration),
}
// start gc thread for close idle sessions
Expand All @@ -40,15 +53,16 @@ func (tr *smuxTransporter) gc() {
for addr, sl := range tr.sessionM {
tr.L.Debugf("==== start doing gc for remote addr: %s total session count %d ====", addr, len(sl))
for idx := range sl {
tr.L.Debugf("check session: %s current stream count %d", sl[idx].LocalAddr().String(), sl[idx].NumStreams())
if sl[idx].NumStreams() == 0 {
sl[idx].Close()
tr.L.Debugf("close idle session:%s", sl[idx].LocalAddr().String())
sm := sl[idx]
tr.L.Debugf("check session: %s current stream count %d", sm.session.LocalAddr().String(), sm.session.NumStreams())
if sm.session.NumStreams() == 0 {
sm.session.Close()
tr.L.Debugf("close idle session:%s", sm.session.LocalAddr().String())
}
}
newList := []*smux.Session{}
newList := []*SessionWithMetrics{}
for _, s := range sl {
if !s.IsClosed() {
if !s.session.IsClosed() {
newList = append(newList, s)
}
}
Expand All @@ -63,13 +77,14 @@ func (tr *smuxTransporter) Dial(ctx context.Context, addr string) (conn net.Conn
tr.sessionMutex.Lock()
defer tr.sessionMutex.Unlock()
var session *smux.Session

sessionList := tr.sessionM[addr]
for _, s := range sessionList {
if s.IsClosed() || s.NumStreams() >= constant.MaxMuxStreamCnt {
for _, sm := range sessionList {
if sm.CanNotServe() {
continue
} else {
tr.L.Debugf("use session: %s total stream count: %d", s.RemoteAddr().String(), s.NumStreams())
session = s
tr.L.Debugf("use session: %s total stream count: %d", sm.session.RemoteAddr().String(), sm.session.NumStreams())
session = sm.session
break
}
}
Expand All @@ -80,7 +95,7 @@ func (tr *smuxTransporter) Dial(ctx context.Context, addr string) (conn net.Conn
if err != nil {
return nil, err
}
sessionList = append(sessionList, session)
sessionList = append(sessionList, &SessionWithMetrics{session: session, createdTime: time.Now()})
tr.sessionM[addr] = sessionList
}

Expand Down

0 comments on commit 30159c4

Please sign in to comment.