From 1dffa3292a3bf44472afc54124900b5194fc3ee3 Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Sat, 17 Jul 2021 11:12:42 +0200 Subject: [PATCH] simplify reuse gc --- reuse.go | 40 +++++++++-------------------- reuse_test.go | 69 ++++++++++++++++++--------------------------------- 2 files changed, 36 insertions(+), 73 deletions(-) diff --git a/reuse.go b/reuse.go index f8633a0..a408ff4 100644 --- a/reuse.go +++ b/reuse.go @@ -51,10 +51,8 @@ func (c *reuseConn) ShouldGarbageCollect(now time.Time) bool { type reuse struct { mutex sync.Mutex - garbageCollectorRunning bool - - closeChan chan struct{} - garbageCollectorStopChan chan struct{} + closeChan chan struct{} + gcStopChan chan struct{} unicast map[string] /* IP.String() */ map[int] /* port */ *reuseConn // global contains connections that are listening on 0.0.0.0 / :: @@ -62,15 +60,18 @@ type reuse struct { } func newReuse() *reuse { - return &reuse{ - unicast: make(map[string]map[int]*reuseConn), - global: make(map[int]*reuseConn), - closeChan: make(chan struct{}), + r := &reuse{ + unicast: make(map[string]map[int]*reuseConn), + global: make(map[int]*reuseConn), + closeChan: make(chan struct{}), + gcStopChan: make(chan struct{}), } + go r.gc() + return r } -func (r *reuse) runGarbageCollector() { - defer close(r.garbageCollectorStopChan) +func (r *reuse) gc() { + defer close(r.gcStopChan) ticker := time.NewTicker(garbageCollectInterval) defer ticker.Stop() @@ -101,7 +102,6 @@ func (r *reuse) runGarbageCollector() { // stop the garbage collector if we're not tracking any connections if len(r.global) == 0 && len(r.unicast) == 0 { - r.garbageCollectorRunning = false shouldExit = true } r.mutex.Unlock() @@ -113,14 +113,6 @@ func (r *reuse) runGarbageCollector() { } } -// must be called while holding the mutex -func (r *reuse) maybeStartGarbageCollector() { - if !r.garbageCollectorRunning { - r.garbageCollectorRunning = true - r.garbageCollectorStopChan = make(chan struct{}) - go r.runGarbageCollector() - } -} func (r *reuse) Dial(network string, raddr *net.UDPAddr) (*reuseConn, error) { var ip *net.IP if router, err := netroute.New(); err == nil { @@ -138,7 +130,6 @@ func (r *reuse) Dial(network string, raddr *net.UDPAddr) (*reuseConn, error) { return nil, err } conn.IncreaseCount() - r.maybeStartGarbageCollector() return conn, nil } @@ -190,8 +181,6 @@ func (r *reuse) Listen(network string, laddr *net.UDPAddr) (*reuseConn, error) { r.mutex.Lock() defer r.mutex.Unlock() - r.maybeStartGarbageCollector() - // Deal with listen on a global address if localAddr.IP.IsUnspecified() { // The kernel already checked that the laddr is not already listen @@ -212,12 +201,7 @@ func (r *reuse) Listen(network string, laddr *net.UDPAddr) (*reuseConn, error) { } func (r *reuse) Close() error { - r.mutex.Lock() - defer r.mutex.Unlock() close(r.closeChan) - if r.garbageCollectorRunning { - <-r.garbageCollectorStopChan - r.garbageCollectorRunning = false - } + <-r.gcStopChan return nil } diff --git a/reuse_test.go b/reuse_test.go index e7436a0..b43d23d 100644 --- a/reuse_test.go +++ b/reuse_test.go @@ -51,15 +51,16 @@ var _ = Describe("Reuse", func() { }) AfterEach(func() { + isGarbageCollectorRunning := func() bool { + var b bytes.Buffer + pprof.Lookup("goroutine").WriteTo(&b, 1) + return strings.Contains(b.String(), "go-libp2p-quic-transport.(*reuse).gc") + } + Expect(reuse.Close()).To(Succeed()) + Expect(isGarbageCollectorRunning()).To(BeFalse()) }) - isGarbageCollectorRunning := func() bool { - var b bytes.Buffer - pprof.Lookup("goroutine").WriteTo(&b, 1) - return strings.Contains(b.String(), "go-libp2p-quic-transport.(*reuse).runGarbageCollector") - } - Context("creating and reusing connections", func() { AfterEach(func() { closeAllConns(reuse) }) @@ -126,54 +127,32 @@ var _ = Describe("Reuse", func() { }) }) - Context("garbage-collecting connections", func() { + It("garbage collects connections once they're not used any more for a certain time", func() { numGlobals := func() int { reuse.mutex.Lock() defer reuse.mutex.Unlock() return len(reuse.global) } - BeforeEach(func() { - maxUnusedDuration = 100 * time.Millisecond - }) + maxUnusedDuration = 100 * time.Millisecond - It("garbage collects connections once they're not used any more for a certain time", func() { - addr, err := net.ResolveUDPAddr("udp4", "0.0.0.0:0") - Expect(err).ToNot(HaveOccurred()) - lconn, err := reuse.Listen("udp4", addr) - Expect(err).ToNot(HaveOccurred()) - Expect(lconn.GetCount()).To(Equal(1)) + addr, err := net.ResolveUDPAddr("udp4", "0.0.0.0:0") + Expect(err).ToNot(HaveOccurred()) + lconn, err := reuse.Listen("udp4", addr) + Expect(err).ToNot(HaveOccurred()) + Expect(lconn.GetCount()).To(Equal(1)) - closeTime := time.Now() - lconn.DecreaseCount() + closeTime := time.Now() + lconn.DecreaseCount() - for { - num := numGlobals() - if closeTime.Add(maxUnusedDuration).Before(time.Now()) { - break - } - Expect(num).To(Equal(1)) - time.Sleep(2 * time.Millisecond) + for { + num := numGlobals() + if closeTime.Add(maxUnusedDuration).Before(time.Now()) { + break } - Eventually(numGlobals).Should(BeZero()) - }) - - It("only stops the garbage collector when there are no more connections", func() { - addr1, err := net.ResolveUDPAddr("udp4", "0.0.0.0:0") - Expect(err).ToNot(HaveOccurred()) - conn1, err := reuse.Listen("udp4", addr1) - Expect(err).ToNot(HaveOccurred()) - - addr2, err := net.ResolveUDPAddr("udp4", "0.0.0.0:0") - Expect(err).ToNot(HaveOccurred()) - conn2, err := reuse.Listen("udp4", addr2) - Expect(err).ToNot(HaveOccurred()) - - Eventually(isGarbageCollectorRunning).Should(BeTrue()) - conn1.DecreaseCount() - Consistently(isGarbageCollectorRunning, 2*maxUnusedDuration).Should(BeTrue()) - conn2.DecreaseCount() - Eventually(isGarbageCollectorRunning, 2*maxUnusedDuration).Should(BeFalse()) - }) + Expect(num).To(Equal(1)) + time.Sleep(2 * time.Millisecond) + } + Eventually(numGlobals).Should(BeZero()) }) })