Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(net/gudp): improve implements #3491

Merged
merged 2 commits into from
Sep 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 14 additions & 121 deletions net/gudp/gudp_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,85 +14,33 @@ import (
"github.com/gogf/gf/v2/errors/gerror"
)

// Conn handles the UDP connection.
type Conn struct {
*net.UDPConn // Underlying UDP connection.
remoteAddr *net.UDPAddr // Remote address.
deadlineRecv time.Time // Timeout point for reading data.
deadlineSend time.Time // Timeout point for writing data.
bufferWaitRecv time.Duration // Interval duration for reading buffer.
// localConn provides common operations for udp connection.
type localConn struct {
*net.UDPConn // Underlying UDP connection.
deadlineRecv time.Time // Timeout point for reading data.
deadlineSend time.Time // Timeout point for writing data.
}

const (
defaultRetryInterval = 100 * time.Millisecond // Retry interval.
defaultReadBufferSize = 1024 // (Byte)Buffer size.
receiveAllWaitTimeout = time.Millisecond // Default interval for reading buffer.
)

// Retry holds the retry options.
// TODO replace with standalone retry package.
type Retry struct {
Count int // Max retry count.
Interval time.Duration // Retry interval.
}

// NewConn creates UDP connection to `remoteAddress`.
// The optional parameter `localAddress` specifies the local address for connection.
func NewConn(remoteAddress string, localAddress ...string) (*Conn, error) {
if conn, err := NewNetConn(remoteAddress, localAddress...); err == nil {
return NewConnByNetConn(conn), nil
} else {
return nil, err
}
}

// NewConnByNetConn creates an UDP connection object with given *net.UDPConn object.
func NewConnByNetConn(udp *net.UDPConn) *Conn {
return &Conn{
UDPConn: udp,
deadlineRecv: time.Time{},
deadlineSend: time.Time{},
bufferWaitRecv: receiveAllWaitTimeout,
}
}

// Send writes data to remote address.
func (c *Conn) Send(data []byte, retry ...Retry) (err error) {
for {
if c.remoteAddr != nil {
_, err = c.WriteToUDP(data, c.remoteAddr)
} else {
_, err = c.Write(data)
}
if err != nil {
// Connection closed.
if err == io.EOF {
return err
}
// Still failed even after retrying.
if len(retry) == 0 || retry[0].Count == 0 {
err = gerror.Wrap(err, `Write data failed`)
return err
}
if len(retry) > 0 {
retry[0].Count--
if retry[0].Interval == 0 {
retry[0].Interval = defaultRetryInterval
}
time.Sleep(retry[0].Interval)
}
} else {
return nil
}
}
}

// Recv receives and returns data from remote address.
// The parameter `buffer` is used for customizing the receiving buffer size. If `buffer` <= 0,
// it uses the default buffer size, which is 1024 byte.
// The parameter `buffer` is used for customizing the receiving buffer size.
// If `buffer` <= 0, it uses the default buffer size, which is 1024 byte.
//
// There's package border in UDP protocol, we can receive a complete package if specified
// buffer size is big enough. VERY NOTE that we should receive the complete package in once
// or else the leftover package data would be dropped.
func (c *Conn) Recv(buffer int, retry ...Retry) ([]byte, error) {
func (c *localConn) Recv(buffer int, retry ...Retry) ([]byte, *net.UDPAddr, error) {
var (
err error // Reading error
size int // Reading size
Expand All @@ -106,9 +54,6 @@ func (c *Conn) Recv(buffer int, retry ...Retry) ([]byte, error) {
}
for {
size, remoteAddr, err = c.ReadFromUDP(data)
if err == nil {
c.remoteAddr = remoteAddr
}
if err != nil {
// Connection closed.
if err == io.EOF {
Expand All @@ -131,51 +76,11 @@ func (c *Conn) Recv(buffer int, retry ...Retry) ([]byte, error) {
}
break
}
return data[:size], err
}

// SendRecv writes data to connection and blocks reading response.
func (c *Conn) SendRecv(data []byte, receive int, retry ...Retry) ([]byte, error) {
if err := c.Send(data, retry...); err != nil {
return nil, err
}
return c.Recv(receive, retry...)
}

// RecvWithTimeout reads data from remote address with timeout.
func (c *Conn) RecvWithTimeout(length int, timeout time.Duration, retry ...Retry) (data []byte, err error) {
if err = c.SetDeadlineRecv(time.Now().Add(timeout)); err != nil {
return nil, err
}
defer func() {
_ = c.SetDeadlineRecv(time.Time{})
}()
data, err = c.Recv(length, retry...)
return
}

// SendWithTimeout writes data to connection with timeout.
func (c *Conn) SendWithTimeout(data []byte, timeout time.Duration, retry ...Retry) (err error) {
if err = c.SetDeadlineSend(time.Now().Add(timeout)); err != nil {
return err
}
defer func() {
_ = c.SetDeadlineSend(time.Time{})
}()
err = c.Send(data, retry...)
return
}

// SendRecvWithTimeout writes data to connection and reads response with timeout.
func (c *Conn) SendRecvWithTimeout(data []byte, receive int, timeout time.Duration, retry ...Retry) ([]byte, error) {
if err := c.Send(data, retry...); err != nil {
return nil, err
}
return c.RecvWithTimeout(receive, timeout, retry...)
return data[:size], remoteAddr, err
}

// SetDeadline sets the read and write deadlines associated with the connection.
func (c *Conn) SetDeadline(t time.Time) (err error) {
func (c *localConn) SetDeadline(t time.Time) (err error) {
if err = c.UDPConn.SetDeadline(t); err == nil {
c.deadlineRecv = t
c.deadlineSend = t
Expand All @@ -186,7 +91,7 @@ func (c *Conn) SetDeadline(t time.Time) (err error) {
}

// SetDeadlineRecv sets the read deadline associated with the connection.
func (c *Conn) SetDeadlineRecv(t time.Time) (err error) {
func (c *localConn) SetDeadlineRecv(t time.Time) (err error) {
if err = c.SetReadDeadline(t); err == nil {
c.deadlineRecv = t
} else {
Expand All @@ -196,23 +101,11 @@ func (c *Conn) SetDeadlineRecv(t time.Time) (err error) {
}

// SetDeadlineSend sets the deadline of sending for current connection.
func (c *Conn) SetDeadlineSend(t time.Time) (err error) {
func (c *localConn) SetDeadlineSend(t time.Time) (err error) {
if err = c.SetWriteDeadline(t); err == nil {
c.deadlineSend = t
} else {
err = gerror.Wrapf(err, `SetDeadlineSend for connection failed with "%s"`, t)
}
return err
}

// SetBufferWaitRecv sets the buffer waiting timeout when reading all data from connection.
// The waiting duration cannot be too long which might delay receiving data from remote address.
func (c *Conn) SetBufferWaitRecv(d time.Duration) {
c.bufferWaitRecv = d
}

// RemoteAddr returns the remote address of current UDP connection.
// Note that it cannot use c.conn.RemoteAddr() as it is nil.
func (c *Conn) RemoteAddr() net.Addr {
return c.remoteAddr
}
69 changes: 69 additions & 0 deletions net/gudp/gudp_conn_client_conn.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Copyright GoFrame Author(https://goframe.org). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://github.com/gogf/gf.

package gudp

import (
"io"
"time"

"github.com/gogf/gf/v2/errors/gerror"
)

// ClientConn holds the client side connection.
type ClientConn struct {
*localConn
}

// NewClientConn creates UDP connection to `remoteAddress`.
// The optional parameter `localAddress` specifies the local address for connection.
func NewClientConn(remoteAddress string, localAddress ...string) (*ClientConn, error) {
udpConn, err := NewNetConn(remoteAddress, localAddress...)
if err != nil {
return nil, err
}
return &ClientConn{
localConn: &localConn{
UDPConn: udpConn,
},
}, nil
}

// Send writes data to remote address.
func (c *ClientConn) Send(data []byte, retry ...Retry) (err error) {
for {
_, err = c.Write(data)
if err == nil {
return nil
}
// Connection closed.
if err == io.EOF {
return err
}
// Still failed even after retrying.
gqcn marked this conversation as resolved.
Show resolved Hide resolved
if len(retry) == 0 || retry[0].Count == 0 {
return gerror.Wrap(err, `Write data failed`)
}
if len(retry) > 0 {
retry[0].Count--
if retry[0].Interval == 0 {
retry[0].Interval = defaultRetryInterval
}
time.Sleep(retry[0].Interval)
continue
}
return err
}
}

// SendRecv writes data to connection and blocks reading response.
func (c *ClientConn) SendRecv(data []byte, receive int, retry ...Retry) ([]byte, error) {
if err := c.Send(data, retry...); err != nil {
return nil, err
}
result, _, err := c.Recv(receive, retry...)
return result, err
}
56 changes: 56 additions & 0 deletions net/gudp/gudp_conn_server_conn.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright GoFrame Author(https://goframe.org). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://github.com/gogf/gf.

package gudp

import (
"io"
"net"
"time"

"github.com/gogf/gf/v2/errors/gerror"
)

// ServerConn holds the server side connection.
type ServerConn struct {
*localConn
}

// NewServerConn creates an udp connection that listens to `localAddress`.
func NewServerConn(listenedConn *net.UDPConn) *ServerConn {
return &ServerConn{
localConn: &localConn{
UDPConn: listenedConn,
},
}
}

// Send writes data to remote address.
func (c *ServerConn) Send(data []byte, remoteAddr *net.UDPAddr, retry ...Retry) (err error) {
for {
_, err = c.WriteToUDP(data, remoteAddr)
if err == nil {
return nil
}
// Connection closed.
if err == io.EOF {
return err
}
// Still failed even after retrying.
if len(retry) == 0 || retry[0].Count == 0 {
gqcn marked this conversation as resolved.
Show resolved Hide resolved
return gerror.Wrap(err, `Write data failed`)
}
if len(retry) > 0 {
retry[0].Count--
if retry[0].Interval == 0 {
retry[0].Interval = defaultRetryInterval
}
time.Sleep(retry[0].Interval)
continue
}
return err
}
}
10 changes: 8 additions & 2 deletions net/gudp/gudp_func.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func NewNetConn(remoteAddress string, localAddress ...string) (*net.UDPConn, err
// Send writes data to `address` using UDP connection and then closes the connection.
// Note that it is used for short connection usage.
func Send(address string, data []byte, retry ...Retry) error {
conn, err := NewConn(address)
conn, err := NewClientConn(address)
if err != nil {
return err
}
Expand All @@ -63,7 +63,7 @@ func Send(address string, data []byte, retry ...Retry) error {
// SendRecv writes data to `address` using UDP connection, reads response and then closes the connection.
// Note that it is used for short connection usage.
func SendRecv(address string, data []byte, receive int, retry ...Retry) ([]byte, error) {
conn, err := NewConn(address)
conn, err := NewClientConn(address)
if err != nil {
return nil, err
}
Expand All @@ -72,6 +72,8 @@ func SendRecv(address string, data []byte, receive int, retry ...Retry) ([]byte,
}

// MustGetFreePort performs as GetFreePort, but it panics if any error occurs.
// Deprecated: the port might be used soon after they were returned, please use `:0` as the listening
// address which asks system to assign a free port instead.
func MustGetFreePort() (port int) {
port, err := GetFreePort()
if err != nil {
Expand All @@ -81,6 +83,8 @@ func MustGetFreePort() (port int) {
}

// GetFreePort retrieves and returns a port that is free.
// Deprecated: the port might be used soon after they were returned, please use `:0` as the listening
// address which asks system to assign a free port instead.
func GetFreePort() (port int, err error) {
var (
network = `udp`
Expand Down Expand Up @@ -108,6 +112,8 @@ func GetFreePort() (port int, err error) {
}

// GetFreePorts retrieves and returns specified number of ports that are free.
// Deprecated: the ports might be used soon after they were returned, please use `:0` as the listening
// address which asks system to assign a free port instead.
func GetFreePorts(count int) (ports []int, err error) {
var (
network = `udp`
Expand Down
Loading
Loading