Skip to content

Commit

Permalink
refactor: reset ctx when session allocate one
Browse files Browse the repository at this point in the history
  • Loading branch information
DarthPestilane committed Jan 5, 2022
1 parent 4768e76 commit 52a570f
Show file tree
Hide file tree
Showing 6 changed files with 38 additions and 49 deletions.
20 changes: 2 additions & 18 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,53 +1,39 @@
ldflags=-ldflags="-s"
os=`uname`

export CGO_ENABLED=0

.PHONY: default
default: build

.PHONY: build
ldflags=-ldflags="-s"
build:
go build ${ldflags} -v

.PHONY: build-all
build-all:
go build ${ldflags} -v ./...

.PHONY: lint
lint:
golangci-lint run --concurrency=2

.PHONY: lint-fix
lint-fix:
golangci-lint run --concurrency=2 --fix

.PHONY: test
test:
CGO_ENABLED=1 go test -count=1 -race -covermode=atomic -coverprofile=.testCoverage.txt -timeout=2m . ./message

.PHONY: test-v
test-v:
CGO_ENABLED=1 go test -count=1 -race -covermode=atomic -coverprofile=.testCoverage.txt -timeout=2m -v . ./message

.PHONY: cover-view
cover-view:
go tool cover -func .testCoverage.txt
go tool cover -html .testCoverage.txt

.PHONY: spec
spec: test lint
go tool cover -func .testCoverage.txt

.PHONY: bench
bench:
go test -bench=. -run=none -benchmem -benchtime=250000x

.PHONY: tidy
tidy:
go mod tidy -v

.PHONY: gen
os=`uname`
gen:
ifeq (${os}, $(filter ${os}, Windows Windows_NT)) # If on windows, there might be something unexpected.
rm -rf ./**/gomock_reflect_*
Expand All @@ -57,10 +43,8 @@ else
go generate -v
endif

.PHONY: release-local
release-local:
goreleaser release --rm-dist --skip-announce --skip-publish --snapshot

.PHONY: clean
clean:
go clean -r -x -cache -i
9 changes: 5 additions & 4 deletions router.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,17 +50,18 @@ var nilHandler HandlerFunc = func(ctx Context) {}

// handleRequest walks ctx through middlewares and handler,
// and returns response message entry.
func (r *Router) handleRequest(ctx *routeContext) {
if ctx.reqEntry == nil {
func (r *Router) handleRequest(ctx Context) {
reqEntry := ctx.Request()
if reqEntry == nil {
return
}
var handler HandlerFunc
if v, has := r.handlerMapper[ctx.reqEntry.ID]; has {
if v, has := r.handlerMapper[reqEntry.ID]; has {
handler = v
}

var mws = r.globalMiddlewares
if v, has := r.middlewaresMapper[ctx.reqEntry.ID]; has {
if v, has := r.middlewaresMapper[reqEntry.ID]; has {
mws = append(mws, v...) // append to global ones
}

Expand Down
6 changes: 3 additions & 3 deletions router_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,10 +255,10 @@ func (c *routeContext) Copy() Context {
}
}

func (c *routeContext) reset(sess *session, reqEntry *message.Entry) {
func (c *routeContext) reset() {
c.rawCtx = context.Background()
c.session = sess
c.reqEntry = reqEntry
c.session = nil
c.reqEntry = nil
c.respEntry = nil
c.storage = nil
}
12 changes: 7 additions & 5 deletions router_context_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package easytcp

import (
"context"
"fmt"
"github.com/DarthPestilane/easytcp/internal/mock"
"github.com/DarthPestilane/easytcp/message"
Expand Down Expand Up @@ -179,17 +180,18 @@ func Test_routeContext_SendTo(t *testing.T) {
}

func Test_routeContext_reset(t *testing.T) {
ctx := newContext(nil, nil)
sess := newSession(nil, &sessionOption{})
entry := &message.Entry{
ID: 1,
Data: []byte("test"),
}
ctx.reset(sess, entry)
assert.Equal(t, ctx.session, sess)
assert.Equal(t, ctx.reqEntry, entry)
assert.Nil(t, ctx.storage)
ctx := newContext(sess, entry)
ctx.reset()
assert.Equal(t, ctx.rawCtx, context.Background())
assert.Nil(t, ctx.session)
assert.Nil(t, ctx.reqEntry)
assert.Nil(t, ctx.respEntry)
assert.Empty(t, ctx.storage)
}

func Test_routeContext_Copy(t *testing.T) {
Expand Down
18 changes: 10 additions & 8 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ type Session interface {
// Codec returns the codec, can be nil.
Codec() Codec

// Close closes session.
// Close closes current session.
Close()

// NewContext creates a Context.
NewContext() Context
// AllocateContext gets a Context ships with current session.
AllocateContext() Context
}

type session struct {
Expand Down Expand Up @@ -88,9 +88,12 @@ func (s *session) Close() {
s.closeOne.Do(func() { close(s.closed) })
}

// NewContext creates a Context from pool, and sets context session with s.
func (s *session) NewContext() Context {
return s.ctxPool.Get().(*routeContext).SetSession(s)
// AllocateContext gets a Context from pool and reset all but session.
func (s *session) AllocateContext() Context {
c := s.ctxPool.Get().(*routeContext)
c.reset()
c.SetSession(s)
return c
}

// readInbound reads message packet from connection in a loop.
Expand Down Expand Up @@ -120,8 +123,7 @@ func (s *session) readInbound(router *Router, timeout time.Duration) {

// don't block the loop.
go func() {
ctx := s.NewContext().(*routeContext)
ctx.reset(s, reqEntry)
ctx := s.AllocateContext().SetRequestMessage(reqEntry)
router.handleRequest(ctx)
s.Send(ctx)
}()
Expand Down
22 changes: 11 additions & 11 deletions session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,13 +161,13 @@ func TestTCPSession_Send(t *testing.T) {
}
sess := newSession(nil, &sessionOption{})
sess.Close() // close session
assert.False(t, sess.NewContext().SetRequestMessage(entry).Send())
assert.False(t, sess.AllocateContext().SetRequestMessage(entry).Send())
})
t.Run("when ctx is done", func(t *testing.T) {
sess := newSession(nil, &sessionOption{})
ctx, cancel := context.WithCancel(context.Background())

c := sess.NewContext().WithContext(ctx)
c := sess.AllocateContext().WithContext(ctx)
done := make(chan struct{})
go func() {
assert.False(t, c.Send())
Expand All @@ -187,7 +187,7 @@ func TestTCPSession_Send(t *testing.T) {
sess.respQueue = make(chan Context) // no buffer
go func() { <-sess.respQueue }()

assert.True(t, sess.NewContext().SetRequestMessage(entry).Send())
assert.True(t, sess.AllocateContext().SetRequestMessage(entry).Send())
sess.Close()
})
}
Expand Down Expand Up @@ -218,7 +218,7 @@ func TestTCPSession_writeOutbound(t *testing.T) {
packer.EXPECT().Pack(gomock.Any()).AnyTimes().Return(nil, nil)

sess := newSession(nil, &sessionOption{Packer: packer, respQueueSize: 1024})
sess.respQueue <- sess.NewContext()
sess.respQueue <- sess.AllocateContext()
doneLoop := make(chan struct{})
go func() {
sess.writeOutbound(0, 10) // should stop looping and return
Expand All @@ -242,7 +242,7 @@ func TestTCPSession_writeOutbound(t *testing.T) {
sess := newSession(nil, &sessionOption{Packer: packer})
done := make(chan struct{})
go func() {
sess.respQueue <- sess.NewContext().SetResponseMessage(entry)
sess.respQueue <- sess.AllocateContext().SetResponseMessage(entry)
close(done)
}()
time.Sleep(time.Microsecond * 15)
Expand All @@ -263,7 +263,7 @@ func TestTCPSession_writeOutbound(t *testing.T) {
packer.EXPECT().Pack(gomock.Any()).Return(nil, nil)

sess := newSession(nil, &sessionOption{Packer: packer, respQueueSize: 100})
sess.respQueue <- sess.NewContext().SetResponseMessage(entry) // push to queue
sess.respQueue <- sess.AllocateContext().SetResponseMessage(entry) // push to queue
doneLoop := make(chan struct{})
go func() {
sess.writeOutbound(0, 10)
Expand All @@ -286,7 +286,7 @@ func TestTCPSession_writeOutbound(t *testing.T) {
p1, _ := net.Pipe()
_ = p1.Close()
sess := newSession(p1, &sessionOption{Packer: packer})
go func() { sess.respQueue <- sess.NewContext().SetResponseMessage(entry) }()
go func() { sess.respQueue <- sess.AllocateContext().SetResponseMessage(entry) }()
go sess.writeOutbound(time.Millisecond*10, 10)
_, ok := <-sess.closed
assert.False(t, ok)
Expand All @@ -304,7 +304,7 @@ func TestTCPSession_writeOutbound(t *testing.T) {

p1, _ := net.Pipe()
sess := newSession(p1, &sessionOption{Packer: packer})
go func() { sess.respQueue <- sess.NewContext().SetResponseMessage(entry) }()
go func() { sess.respQueue <- sess.AllocateContext().SetResponseMessage(entry) }()
go sess.writeOutbound(time.Millisecond*10, 10)
_, ok := <-sess.closed
assert.False(t, ok)
Expand All @@ -324,7 +324,7 @@ func TestTCPSession_writeOutbound(t *testing.T) {
p1, _ := net.Pipe()
assert.NoError(t, p1.Close())
sess := newSession(p1, &sessionOption{Packer: packer})
go func() { sess.respQueue <- sess.NewContext().SetResponseMessage(entry) }()
go func() { sess.respQueue <- sess.AllocateContext().SetResponseMessage(entry) }()
sess.writeOutbound(0, 10) // should stop looping and return
_, ok := <-sess.closed
assert.False(t, ok)
Expand Down Expand Up @@ -356,7 +356,7 @@ func TestTCPSession_writeOutbound(t *testing.T) {
})

sess := newSession(conn, &sessionOption{Packer: packer})
go func() { sess.respQueue <- sess.NewContext().SetResponseMessage(entry) }()
go func() { sess.respQueue <- sess.AllocateContext().SetResponseMessage(entry) }()
sess.writeOutbound(0, 10) // should stop looping and return
_, ok := <-sess.closed
assert.False(t, ok)
Expand All @@ -374,7 +374,7 @@ func TestTCPSession_writeOutbound(t *testing.T) {

p1, p2 := net.Pipe()
sess := newSession(p1, &sessionOption{Packer: packer})
go func() { sess.NewContext().SetResponseMessage(entry).Send() }()
go func() { sess.AllocateContext().SetResponseMessage(entry).Send() }()
done := make(chan struct{})
go func() {
sess.writeOutbound(0, 10)
Expand Down

0 comments on commit 52a570f

Please sign in to comment.