Skip to content
This repository has been archived by the owner on May 26, 2022. It is now read-only.

Commit

Permalink
simplify reuse gc
Browse files Browse the repository at this point in the history
  • Loading branch information
marten-seemann committed Jul 17, 2021
1 parent dba9ae3 commit 1dffa32
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 73 deletions.
40 changes: 12 additions & 28 deletions reuse.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,26 +51,27 @@ func (c *reuseConn) ShouldGarbageCollect(now time.Time) bool {
type reuse struct {
mutex sync.Mutex

garbageCollectorRunning bool

closeChan chan struct{}
garbageCollectorStopChan chan struct{}
closeChan chan struct{}
gcStopChan chan struct{}

unicast map[string] /* IP.String() */ map[int] /* port */ *reuseConn
// global contains connections that are listening on 0.0.0.0 / ::
global map[int]*reuseConn
}

func newReuse() *reuse {
return &reuse{
unicast: make(map[string]map[int]*reuseConn),
global: make(map[int]*reuseConn),
closeChan: make(chan struct{}),
r := &reuse{
unicast: make(map[string]map[int]*reuseConn),
global: make(map[int]*reuseConn),
closeChan: make(chan struct{}),
gcStopChan: make(chan struct{}),
}
go r.gc()
return r
}

func (r *reuse) runGarbageCollector() {
defer close(r.garbageCollectorStopChan)
func (r *reuse) gc() {
defer close(r.gcStopChan)
ticker := time.NewTicker(garbageCollectInterval)
defer ticker.Stop()

Expand Down Expand Up @@ -101,7 +102,6 @@ func (r *reuse) runGarbageCollector() {

// stop the garbage collector if we're not tracking any connections
if len(r.global) == 0 && len(r.unicast) == 0 {
r.garbageCollectorRunning = false
shouldExit = true
}
r.mutex.Unlock()
Expand All @@ -113,14 +113,6 @@ func (r *reuse) runGarbageCollector() {
}
}

// must be called while holding the mutex
func (r *reuse) maybeStartGarbageCollector() {
if !r.garbageCollectorRunning {
r.garbageCollectorRunning = true
r.garbageCollectorStopChan = make(chan struct{})
go r.runGarbageCollector()
}
}
func (r *reuse) Dial(network string, raddr *net.UDPAddr) (*reuseConn, error) {
var ip *net.IP
if router, err := netroute.New(); err == nil {
Expand All @@ -138,7 +130,6 @@ func (r *reuse) Dial(network string, raddr *net.UDPAddr) (*reuseConn, error) {
return nil, err
}
conn.IncreaseCount()
r.maybeStartGarbageCollector()
return conn, nil
}

Expand Down Expand Up @@ -190,8 +181,6 @@ func (r *reuse) Listen(network string, laddr *net.UDPAddr) (*reuseConn, error) {
r.mutex.Lock()
defer r.mutex.Unlock()

r.maybeStartGarbageCollector()

// Deal with listen on a global address
if localAddr.IP.IsUnspecified() {
// The kernel already checked that the laddr is not already listen
Expand All @@ -212,12 +201,7 @@ func (r *reuse) Listen(network string, laddr *net.UDPAddr) (*reuseConn, error) {
}

func (r *reuse) Close() error {
r.mutex.Lock()
defer r.mutex.Unlock()
close(r.closeChan)
if r.garbageCollectorRunning {
<-r.garbageCollectorStopChan
r.garbageCollectorRunning = false
}
<-r.gcStopChan
return nil
}
69 changes: 24 additions & 45 deletions reuse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,16 @@ var _ = Describe("Reuse", func() {
})

AfterEach(func() {
isGarbageCollectorRunning := func() bool {
var b bytes.Buffer
pprof.Lookup("goroutine").WriteTo(&b, 1)
return strings.Contains(b.String(), "go-libp2p-quic-transport.(*reuse).gc")
}

Expect(reuse.Close()).To(Succeed())
Expect(isGarbageCollectorRunning()).To(BeFalse())
})

isGarbageCollectorRunning := func() bool {
var b bytes.Buffer
pprof.Lookup("goroutine").WriteTo(&b, 1)
return strings.Contains(b.String(), "go-libp2p-quic-transport.(*reuse).runGarbageCollector")
}

Context("creating and reusing connections", func() {
AfterEach(func() { closeAllConns(reuse) })

Expand Down Expand Up @@ -126,54 +127,32 @@ var _ = Describe("Reuse", func() {
})
})

Context("garbage-collecting connections", func() {
It("garbage collects connections once they're not used any more for a certain time", func() {
numGlobals := func() int {
reuse.mutex.Lock()
defer reuse.mutex.Unlock()
return len(reuse.global)
}

BeforeEach(func() {
maxUnusedDuration = 100 * time.Millisecond
})
maxUnusedDuration = 100 * time.Millisecond

It("garbage collects connections once they're not used any more for a certain time", func() {
addr, err := net.ResolveUDPAddr("udp4", "0.0.0.0:0")
Expect(err).ToNot(HaveOccurred())
lconn, err := reuse.Listen("udp4", addr)
Expect(err).ToNot(HaveOccurred())
Expect(lconn.GetCount()).To(Equal(1))
addr, err := net.ResolveUDPAddr("udp4", "0.0.0.0:0")
Expect(err).ToNot(HaveOccurred())
lconn, err := reuse.Listen("udp4", addr)
Expect(err).ToNot(HaveOccurred())
Expect(lconn.GetCount()).To(Equal(1))

closeTime := time.Now()
lconn.DecreaseCount()
closeTime := time.Now()
lconn.DecreaseCount()

for {
num := numGlobals()
if closeTime.Add(maxUnusedDuration).Before(time.Now()) {
break
}
Expect(num).To(Equal(1))
time.Sleep(2 * time.Millisecond)
for {
num := numGlobals()
if closeTime.Add(maxUnusedDuration).Before(time.Now()) {
break
}
Eventually(numGlobals).Should(BeZero())
})

It("only stops the garbage collector when there are no more connections", func() {
addr1, err := net.ResolveUDPAddr("udp4", "0.0.0.0:0")
Expect(err).ToNot(HaveOccurred())
conn1, err := reuse.Listen("udp4", addr1)
Expect(err).ToNot(HaveOccurred())

addr2, err := net.ResolveUDPAddr("udp4", "0.0.0.0:0")
Expect(err).ToNot(HaveOccurred())
conn2, err := reuse.Listen("udp4", addr2)
Expect(err).ToNot(HaveOccurred())

Eventually(isGarbageCollectorRunning).Should(BeTrue())
conn1.DecreaseCount()
Consistently(isGarbageCollectorRunning, 2*maxUnusedDuration).Should(BeTrue())
conn2.DecreaseCount()
Eventually(isGarbageCollectorRunning, 2*maxUnusedDuration).Should(BeFalse())
})
Expect(num).To(Equal(1))
time.Sleep(2 * time.Millisecond)
}
Eventually(numGlobals).Should(BeZero())
})
})

0 comments on commit 1dffa32

Please sign in to comment.