Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions .github/workflows/pr-check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,15 @@ jobs:
compatibility-test:
strategy:
matrix:
go: [ 1.18, 1.23 ]
os: [ X64, ARM64 ]
go: [ 1.18, 1.24 ]
os: [ ubuntu-latest, ubuntu-24.04-arm, macos-latest ]
runs-on: ${{ matrix.os }}
steps:
- uses: actions/checkout@v4
- name: Set up Go
uses: actions/setup-go@v5
with:
go-version: ${{ matrix.go }}
cache: false
- name: Unit Test
run: go test -timeout=2m -race ./...
- name: Benchmark
Expand Down Expand Up @@ -46,7 +45,7 @@ jobs:
uses: crate-ci/typos@v1.13.14

golangci-lint:
runs-on: [ Linux, X64 ]
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Set up Go
Expand Down
65 changes: 56 additions & 9 deletions connection_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,12 @@ type connection struct {
locker
operator *FDOperator
readTimeout time.Duration
readDeadline int64 // UnixNano(). it overwrites readTimeout. 0 if not set.
readTimer *time.Timer
readTrigger chan error
waitReadSize int64
writeTimeout time.Duration
writeDeadline int64 // UnixNano(). it overwrites writeTimeout. 0 if not set.
writeTimer *time.Timer
writeTrigger chan error
inputBuffer *LinkBuffer
Expand Down Expand Up @@ -87,6 +89,7 @@ func (c *connection) SetReadTimeout(timeout time.Duration) error {
if timeout >= 0 {
c.readTimeout = timeout
}
c.readDeadline = 0
return nil
}

Expand All @@ -95,6 +98,38 @@ func (c *connection) SetWriteTimeout(timeout time.Duration) error {
if timeout >= 0 {
c.writeTimeout = timeout
}
c.writeDeadline = 0
return nil
}

// SetDeadline implements net.Conn.SetDeadline
func (c *connection) SetDeadline(t time.Time) error {
v := int64(0)
if !t.IsZero() {
v = t.UnixNano()
}
c.readDeadline = v
c.writeDeadline = v
return nil
}

// SetReadDeadline implements net.Conn.SetReadDeadline
func (c *connection) SetReadDeadline(t time.Time) error {
if t.IsZero() {
c.readDeadline = 0
} else {
c.readDeadline = t.UnixNano()
}
return nil
}

// SetWriteDeadline implements net.Conn.SetWriteDeadline
func (c *connection) SetWriteDeadline(t time.Time) error {
if t.IsZero() {
c.writeDeadline = 0
} else {
c.writeDeadline = t.UnixNano()
}
return nil
}

Expand Down Expand Up @@ -408,8 +443,14 @@ func (c *connection) waitRead(n int) (err error) {
}
atomic.StoreInt64(&c.waitReadSize, int64(n))
defer atomic.StoreInt64(&c.waitReadSize, 0)
if c.readTimeout > 0 {
return c.waitReadWithTimeout(n)
if dl := c.readDeadline; dl > 0 {
timeout := time.Duration(dl - time.Now().UnixNano())
if timeout <= 0 {
return Exception(ErrReadTimeout, c.remoteAddr.String())
}
return c.waitReadWithTimeout(n, timeout)
} else if c.readTimeout > 0 {
return c.waitReadWithTimeout(n, c.readTimeout)
}
// wait full n
for c.inputBuffer.Len() < n {
Expand All @@ -429,12 +470,11 @@ func (c *connection) waitRead(n int) (err error) {
}

// waitReadWithTimeout will wait full n bytes or until timeout.
func (c *connection) waitReadWithTimeout(n int) (err error) {
// set read timeout
func (c *connection) waitReadWithTimeout(n int, timeout time.Duration) (err error) {
if c.readTimer == nil {
c.readTimer = time.NewTimer(c.readTimeout)
c.readTimer = time.NewTimer(timeout)
} else {
c.readTimer.Reset(c.readTimeout)
c.readTimer.Reset(timeout)
}

for c.inputBuffer.Len() < n {
Expand Down Expand Up @@ -501,15 +541,22 @@ func (c *connection) flush() error {
}

func (c *connection) waitFlush() (err error) {
if c.writeTimeout == 0 {
timeout := c.writeTimeout
if dl := c.writeDeadline; dl > 0 {
timeout = time.Duration(dl - time.Now().UnixNano())
if timeout <= 0 {
return Exception(ErrWriteTimeout, c.remoteAddr.String())
}
}
if timeout == 0 {
return <-c.writeTrigger
}

// set write timeout
if c.writeTimer == nil {
c.writeTimer = time.NewTimer(c.writeTimeout)
c.writeTimer = time.NewTimer(timeout)
} else {
c.writeTimer.Reset(c.writeTimeout)
c.writeTimer.Reset(timeout)
}

select {
Expand Down
Loading
Loading