Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: reset ctx when session allocate one #31

Merged
merged 1 commit into from
Jan 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 2 additions & 18 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,53 +1,39 @@
ldflags=-ldflags="-s"
os=`uname`

export CGO_ENABLED=0

.PHONY: default
default: build

.PHONY: build
ldflags=-ldflags="-s"
build:
go build ${ldflags} -v

.PHONY: build-all
build-all:
go build ${ldflags} -v ./...

.PHONY: lint
lint:
golangci-lint run --concurrency=2

.PHONY: lint-fix
lint-fix:
golangci-lint run --concurrency=2 --fix

.PHONY: test
test:
CGO_ENABLED=1 go test -count=1 -race -covermode=atomic -coverprofile=.testCoverage.txt -timeout=2m . ./message

.PHONY: test-v
test-v:
CGO_ENABLED=1 go test -count=1 -race -covermode=atomic -coverprofile=.testCoverage.txt -timeout=2m -v . ./message

.PHONY: cover-view
cover-view:
go tool cover -func .testCoverage.txt
go tool cover -html .testCoverage.txt

.PHONY: spec
spec: test lint
go tool cover -func .testCoverage.txt

.PHONY: bench
bench:
go test -bench=. -run=none -benchmem -benchtime=250000x

.PHONY: tidy
tidy:
go mod tidy -v

.PHONY: gen
os=`uname`
gen:
ifeq (${os}, $(filter ${os}, Windows Windows_NT)) # If on windows, there might be something unexpected.
rm -rf ./**/gomock_reflect_*
Expand All @@ -57,10 +43,8 @@ else
go generate -v
endif

.PHONY: release-local
release-local:
goreleaser release --rm-dist --skip-announce --skip-publish --snapshot

.PHONY: clean
clean:
go clean -r -x -cache -i
9 changes: 5 additions & 4 deletions router.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,17 +50,18 @@ var nilHandler HandlerFunc = func(ctx Context) {}

// handleRequest walks ctx through middlewares and handler,
// and returns response message entry.
func (r *Router) handleRequest(ctx *routeContext) {
if ctx.reqEntry == nil {
func (r *Router) handleRequest(ctx Context) {
reqEntry := ctx.Request()
if reqEntry == nil {
return
}
var handler HandlerFunc
if v, has := r.handlerMapper[ctx.reqEntry.ID]; has {
if v, has := r.handlerMapper[reqEntry.ID]; has {
handler = v
}

var mws = r.globalMiddlewares
if v, has := r.middlewaresMapper[ctx.reqEntry.ID]; has {
if v, has := r.middlewaresMapper[reqEntry.ID]; has {
mws = append(mws, v...) // append to global ones
}

Expand Down
6 changes: 3 additions & 3 deletions router_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,10 +255,10 @@ func (c *routeContext) Copy() Context {
}
}

func (c *routeContext) reset(sess *session, reqEntry *message.Entry) {
func (c *routeContext) reset() {
c.rawCtx = context.Background()
c.session = sess
c.reqEntry = reqEntry
c.session = nil
c.reqEntry = nil
c.respEntry = nil
c.storage = nil
}
12 changes: 7 additions & 5 deletions router_context_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package easytcp

import (
"context"
"fmt"
"github.com/DarthPestilane/easytcp/internal/mock"
"github.com/DarthPestilane/easytcp/message"
Expand Down Expand Up @@ -179,17 +180,18 @@ func Test_routeContext_SendTo(t *testing.T) {
}

func Test_routeContext_reset(t *testing.T) {
ctx := newContext(nil, nil)
sess := newSession(nil, &sessionOption{})
entry := &message.Entry{
ID: 1,
Data: []byte("test"),
}
ctx.reset(sess, entry)
assert.Equal(t, ctx.session, sess)
assert.Equal(t, ctx.reqEntry, entry)
assert.Nil(t, ctx.storage)
ctx := newContext(sess, entry)
ctx.reset()
assert.Equal(t, ctx.rawCtx, context.Background())
assert.Nil(t, ctx.session)
assert.Nil(t, ctx.reqEntry)
assert.Nil(t, ctx.respEntry)
assert.Empty(t, ctx.storage)
}

func Test_routeContext_Copy(t *testing.T) {
Expand Down
18 changes: 10 additions & 8 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ type Session interface {
// Codec returns the codec, can be nil.
Codec() Codec

// Close closes session.
// Close closes current session.
Close()

// NewContext creates a Context.
NewContext() Context
// AllocateContext gets a Context ships with current session.
AllocateContext() Context
}

type session struct {
Expand Down Expand Up @@ -88,9 +88,12 @@ func (s *session) Close() {
s.closeOne.Do(func() { close(s.closed) })
}

// NewContext creates a Context from pool, and sets context session with s.
func (s *session) NewContext() Context {
return s.ctxPool.Get().(*routeContext).SetSession(s)
// AllocateContext gets a Context from pool and reset all but session.
func (s *session) AllocateContext() Context {
c := s.ctxPool.Get().(*routeContext)
c.reset()
c.SetSession(s)
return c
}

// readInbound reads message packet from connection in a loop.
Expand Down Expand Up @@ -120,8 +123,7 @@ func (s *session) readInbound(router *Router, timeout time.Duration) {

// don't block the loop.
go func() {
ctx := s.NewContext().(*routeContext)
ctx.reset(s, reqEntry)
ctx := s.AllocateContext().SetRequestMessage(reqEntry)
router.handleRequest(ctx)
s.Send(ctx)
}()
Expand Down
22 changes: 11 additions & 11 deletions session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,13 +161,13 @@ func TestTCPSession_Send(t *testing.T) {
}
sess := newSession(nil, &sessionOption{})
sess.Close() // close session
assert.False(t, sess.NewContext().SetRequestMessage(entry).Send())
assert.False(t, sess.AllocateContext().SetRequestMessage(entry).Send())
})
t.Run("when ctx is done", func(t *testing.T) {
sess := newSession(nil, &sessionOption{})
ctx, cancel := context.WithCancel(context.Background())

c := sess.NewContext().WithContext(ctx)
c := sess.AllocateContext().WithContext(ctx)
done := make(chan struct{})
go func() {
assert.False(t, c.Send())
Expand All @@ -187,7 +187,7 @@ func TestTCPSession_Send(t *testing.T) {
sess.respQueue = make(chan Context) // no buffer
go func() { <-sess.respQueue }()

assert.True(t, sess.NewContext().SetRequestMessage(entry).Send())
assert.True(t, sess.AllocateContext().SetRequestMessage(entry).Send())
sess.Close()
})
}
Expand Down Expand Up @@ -218,7 +218,7 @@ func TestTCPSession_writeOutbound(t *testing.T) {
packer.EXPECT().Pack(gomock.Any()).AnyTimes().Return(nil, nil)

sess := newSession(nil, &sessionOption{Packer: packer, respQueueSize: 1024})
sess.respQueue <- sess.NewContext()
sess.respQueue <- sess.AllocateContext()
doneLoop := make(chan struct{})
go func() {
sess.writeOutbound(0, 10) // should stop looping and return
Expand All @@ -242,7 +242,7 @@ func TestTCPSession_writeOutbound(t *testing.T) {
sess := newSession(nil, &sessionOption{Packer: packer})
done := make(chan struct{})
go func() {
sess.respQueue <- sess.NewContext().SetResponseMessage(entry)
sess.respQueue <- sess.AllocateContext().SetResponseMessage(entry)
close(done)
}()
time.Sleep(time.Microsecond * 15)
Expand All @@ -263,7 +263,7 @@ func TestTCPSession_writeOutbound(t *testing.T) {
packer.EXPECT().Pack(gomock.Any()).Return(nil, nil)

sess := newSession(nil, &sessionOption{Packer: packer, respQueueSize: 100})
sess.respQueue <- sess.NewContext().SetResponseMessage(entry) // push to queue
sess.respQueue <- sess.AllocateContext().SetResponseMessage(entry) // push to queue
doneLoop := make(chan struct{})
go func() {
sess.writeOutbound(0, 10)
Expand All @@ -286,7 +286,7 @@ func TestTCPSession_writeOutbound(t *testing.T) {
p1, _ := net.Pipe()
_ = p1.Close()
sess := newSession(p1, &sessionOption{Packer: packer})
go func() { sess.respQueue <- sess.NewContext().SetResponseMessage(entry) }()
go func() { sess.respQueue <- sess.AllocateContext().SetResponseMessage(entry) }()
go sess.writeOutbound(time.Millisecond*10, 10)
_, ok := <-sess.closed
assert.False(t, ok)
Expand All @@ -304,7 +304,7 @@ func TestTCPSession_writeOutbound(t *testing.T) {

p1, _ := net.Pipe()
sess := newSession(p1, &sessionOption{Packer: packer})
go func() { sess.respQueue <- sess.NewContext().SetResponseMessage(entry) }()
go func() { sess.respQueue <- sess.AllocateContext().SetResponseMessage(entry) }()
go sess.writeOutbound(time.Millisecond*10, 10)
_, ok := <-sess.closed
assert.False(t, ok)
Expand All @@ -324,7 +324,7 @@ func TestTCPSession_writeOutbound(t *testing.T) {
p1, _ := net.Pipe()
assert.NoError(t, p1.Close())
sess := newSession(p1, &sessionOption{Packer: packer})
go func() { sess.respQueue <- sess.NewContext().SetResponseMessage(entry) }()
go func() { sess.respQueue <- sess.AllocateContext().SetResponseMessage(entry) }()
sess.writeOutbound(0, 10) // should stop looping and return
_, ok := <-sess.closed
assert.False(t, ok)
Expand Down Expand Up @@ -356,7 +356,7 @@ func TestTCPSession_writeOutbound(t *testing.T) {
})

sess := newSession(conn, &sessionOption{Packer: packer})
go func() { sess.respQueue <- sess.NewContext().SetResponseMessage(entry) }()
go func() { sess.respQueue <- sess.AllocateContext().SetResponseMessage(entry) }()
sess.writeOutbound(0, 10) // should stop looping and return
_, ok := <-sess.closed
assert.False(t, ok)
Expand All @@ -374,7 +374,7 @@ func TestTCPSession_writeOutbound(t *testing.T) {

p1, p2 := net.Pipe()
sess := newSession(p1, &sessionOption{Packer: packer})
go func() { sess.NewContext().SetResponseMessage(entry).Send() }()
go func() { sess.AllocateContext().SetResponseMessage(entry).Send() }()
done := make(chan struct{})
go func() {
sess.writeOutbound(0, 10)
Expand Down