Skip to content

Commit

Permalink
PortStack.SendEth->PutOutboundEth refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
soypat committed Jul 20, 2024
1 parent 42d2e4b commit 74e7a6b
Show file tree
Hide file tree
Showing 12 changed files with 80 additions and 61 deletions.
6 changes: 4 additions & 2 deletions stacks/arp.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,8 @@ func (c *arpClient) pendingOutReqARPv4() bool {
return c.result.Operation == 1 // User asked for a ARP request.
}

func (c *arpClient) handle(dst []byte) (n int) {
// putOutboundEth implements [iudphandler] interface.
func (c *arpClient) putOutboundEth(dst []byte) (n int) {
pendingOutReq := c.pendingOutReqARPv4()
switch {
case pendingOutReq:
Expand Down Expand Up @@ -131,7 +132,8 @@ func (c *arpClient) handle(dst []byte) (n int) {
return n
}

func (c *arpClient) recv(ahdr *eth.ARPv4Header) error {
// recvEth implements [iudphandler] interface.
func (c *arpClient) recvEth(ahdr *eth.ARPv4Header) error {
if ahdr.HardwareLength != 6 || ahdr.ProtoLength != 4 || ahdr.HardwareType != 1 || ahdr.AssertEtherType() != eth.EtherTypeIPv4 {
return errARPUnsupported // Ignore ARP unsupported requests.
}
Expand Down
6 changes: 4 additions & 2 deletions stacks/dhcp_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,8 @@ func getDefaultParams() []byte {
return unsafe.Slice((*byte)(unsafe.Pointer(ptr)), len(dhcpDefaultParamReqList))
}

func (d *DHCPClient) send(dst []byte) (n int, err error) {
// putOutboundEth implements [iudphandler] interface.
func (d *DHCPClient) putOutboundEth(dst []byte) (n int, err error) {
if d.isAborted() {
return 0, io.EOF
} else if !d.isPendingHandling() {
Expand Down Expand Up @@ -340,7 +341,8 @@ func (d *DHCPClient) send(dst []byte) (n int, err error) {
return ptr, nil
}

func (d *DHCPClient) recv(pkt *UDPPacket) (err error) {
// recvEth implements [iudphandler] interface.
func (d *DHCPClient) recvEth(pkt *UDPPacket) (err error) {
if d.isAborted() {
return io.EOF
}
Expand Down
6 changes: 4 additions & 2 deletions stacks/dhcp_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ func (d *DHCPServer) Start() error {
return d.stack.OpenUDP(d.port, d)
}

func (d *DHCPServer) recv(pkt *UDPPacket) (err error) {
// recvEth implements [iudphandler] interface.
func (d *DHCPServer) recvEth(pkt *UDPPacket) (err error) {
if d.isAborted() {
return io.EOF // Signal to close socket.
}
Expand All @@ -57,7 +58,8 @@ func (d *DHCPServer) recv(pkt *UDPPacket) (err error) {
return nil
}

func (d *DHCPServer) send(dst []byte) (int, error) {
// putOutboundEth implements [iudphandler] interface.
func (d *DHCPServer) putOutboundEth(dst []byte) (int, error) {
if d.isAborted() {
return 0, io.EOF // Signal to close socket.
}
Expand Down
6 changes: 4 additions & 2 deletions stacks/dns_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ func (dnsc *DNSClient) StartResolve(cfg DNSResolveConfig) error {
return nil
}

func (dnsc *DNSClient) send(dst []byte) (n int, err error) {
// putOutboundEth implements [iudphandler] interface.
func (dnsc *DNSClient) putOutboundEth(dst []byte) (n int, err error) {
if dnsc.state == dnsAborted {
return 0, io.EOF
} else if dnsc.state != dnsSendQuery {
Expand Down Expand Up @@ -101,7 +102,8 @@ func (dnsc *DNSClient) send(dst []byte) (n int, err error) {
return payloadOffset + int(msgLen), nil
}

func (dnsc *DNSClient) recv(pkt *UDPPacket) error {
// recvEth implements the [iudphandler] interface.
func (dnsc *DNSClient) recvEth(pkt *UDPPacket) error {

Check warning on line 106 in stacks/dns_client.go

View check run for this annotation

Codecov / codecov/patch

stacks/dns_client.go#L106

Added line #L106 was not covered by tests
if dnsc.state == dnsAborted {
return io.EOF
} else if dnsc.state != dnsAwaitResponse {
Expand Down
6 changes: 4 additions & 2 deletions stacks/ntp_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ func (nc *NTPClient) BeginDefaultRequest(hwaddr [6]byte, raddr netip.Addr) error
return nil
}

func (nc *NTPClient) send(dst []byte) (n int, err error) {
// putOutboundEth implements [iudphandler] interface.
func (nc *NTPClient) putOutboundEth(dst []byte) (n int, err error) {

Check warning on line 68 in stacks/ntp_client.go

View check run for this annotation

Codecov / codecov/patch

stacks/ntp_client.go#L68

Added line #L68 was not covered by tests
const (
payloadoffset = eth.SizeEthernetHeader + eth.SizeIPv4Header + eth.SizeUDPHeader
ToS = 192
Expand Down Expand Up @@ -101,7 +102,8 @@ func (nc *NTPClient) send(dst []byte) (n int, err error) {
return payloadoffset + ntp.SizeHeader, nil
}

func (nc *NTPClient) recv(pkt *UDPPacket) (err error) {
// recvEth implements the [iudphandler] interface.
func (nc *NTPClient) recvEth(pkt *UDPPacket) (err error) {

Check warning on line 106 in stacks/ntp_client.go

View check run for this annotation

Codecov / codecov/patch

stacks/ntp_client.go#L106

Added line #L106 was not covered by tests
if nc.isAborted() || nc.IsDone() {
return io.EOF
}
Expand Down
22 changes: 13 additions & 9 deletions stacks/port_tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,19 @@ import (
)

// itcphandler represents a user provided function for handling incoming TCP packets on a port.
// Incoming data is passed in a 'pkt' to the recv function which is invoked whenever data arrives (by RecvEth)
// Incoming data is passed in a 'pkt' to the recv function which is invoked whenever data arrives (by [PortStack.RecvEth])
// Outgoing data is written into the `dst` byte slice (from the tx ring buffer). The function must return the number of
// bytes written to `response` and an error.
// TCPConn provides an implemntation of this interface - note .send is ONLY called by HandleEth
// TCPConn provides an implemntation of this interface - note .send is ONLY called by [PortStack.PutOutboundEth]
// See [PortStack] for information on how to use this function and other port handlers.
// note TCPConn is our implementation of this interface
// Note [TCPConn] is our implementation of this interface
type itcphandler interface {
send(dst []byte) (n int, err error)
recv(pkt *TCPPacket) error
// putOutboundEth is called by the underlying stack [PortStack.PutOutboundEth] method and populates
// response from the TX ring buffer, with data to be sent as a packet and returns n bytes written.
// See [PortStack] for more information.
putOutboundEth(response []byte) (n int, err error)
// recvEth called by the [PortStack.RecvEth] method when a packet is received on the network interface, pkt is (a pointer to) the arrived packet.
recvEth(pkt *TCPPacket) error
// needsHandling() bool
isPendingHandling() bool
abort()
Expand All @@ -37,14 +41,14 @@ func (port *tcpPort) IsPendingHandling() bool {
return port.port != 0 && port.handler.isPendingHandling()
}

// HandleEth writes the socket's response into dst to be sent over an ethernet interface.
// HandleEth can return 0 bytes written and a nil error to indicate no action must be taken.
func (port *tcpPort) HandleEth(dst []byte) (n int, err error) {
// PutOutboundEth writes the socket's response into dst to be sent over an ethernet interface.
// PutOutboundEth can return 0 bytes written and a nil error to indicate no action must be taken.
func (port *tcpPort) PutOutboundEth(dst []byte) (n int, err error) {
if port.handler == nil {
panic("nil tcp handler on port " + strconv.Itoa(int(port.port)))
}

n, err = port.handler.send(dst)
n, err = port.handler.putOutboundEth(dst)
port.p = false
if err == ErrFlagPending {
port.p = true
Expand Down
15 changes: 9 additions & 6 deletions stacks/port_udp.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,11 @@ import (
)

type iudphandler interface {
send(dst []byte) (n int, err error)
recv(pkt *UDPPacket) error
// putOutboundEth is called by the underlying stack [PortStack.PutOutboundEth] method and populates
// response from the TX ring buffer, with data to be sent as a packet and returns n bytes written.
// See [PortStack] for more information.
putOutboundEth(response []byte) (n int, err error)
recvEth(pkt *UDPPacket) error
// needsHandling() bool
isPendingHandling() bool
abort()
Expand All @@ -28,15 +31,15 @@ func (port *udpPort) IsPendingHandling() bool {
return port.port != 0 && port.handler.isPendingHandling()
}

// HandleEth writes the socket's response into dst to be sent over an ethernet interface.
// HandleEth can return 0 bytes written and a nil error to indicate no action must be taken.
func (port *udpPort) HandleEth(dst []byte) (int, error) {
// PutOutboundEth writes the socket's response into dst to be sent over an ethernet interface.
// PutOutboundEth can return 0 bytes written and a nil error to indicate no action must be taken.
func (port *udpPort) PutOutboundEth(dst []byte) (int, error) {

if port.handler == nil {
panic("nil udp handler on port " + strconv.Itoa(int(port.port)))
}

return port.handler.send(dst)
return port.handler.putOutboundEth(dst)
}

// Open sets the UDP handler and opens the port.
Expand Down
40 changes: 20 additions & 20 deletions stacks/portstack.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ const (
type socket interface {
Close()
IsPendingHandling() bool
HandleEth(dst []byte) (int, error)
PutOutboundEth(dst []byte) (int, error)
}

var modernAge = time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
Expand All @@ -32,7 +32,7 @@ type PortStackConfig struct {
MaxOpenPortsUDP int
MaxOpenPortsTCP int
// GlobalHandler processes all incoming ethernet frames before they reach the port handlers.
// If GlobalHandler returns an error the frame is discarded and PortStack.HandleEth returns the error.
// If GlobalHandler returns an error the frame is discarded and PortStack.PutOutboundEth returns the error.
// GlobalHandler ethernethandler
Logger *slog.Logger
MAC [6]byte
Expand Down Expand Up @@ -70,7 +70,7 @@ var ErrFlagPending = errors.New("seqs: pending data")
//
// # Notes on PortStack handlers
//
// - While PortStack.HandleEth has yet to find a outgoing packet it will look for
// - While PortStack.PutOutboundEth has yet to find a outgoing packet it will look for
// a port that has a pending packet or has been flagged as pending and call its handler.
//
// - A call to a handler may or may not have an incoming packet ready to process.
Expand All @@ -88,7 +88,7 @@ var ErrFlagPending = errors.New("seqs: pending data")
//
// - ErrFlagPending: When returned by the handler then the port is flagged as
// pending and the written data is handled normally if there is any. If no data is written
// the call to HandleEth proceeds looking for another port to handle.
// the call to PutOutboundEth proceeds looking for another port to handle.
//
// - ErrFlagPending: When returned by the handler then for UDP/TCP implementations the
// incoming packet argument `pkt` is flagged as not present in future calls to the handler in pkt.HasPacket calls.
Expand Down Expand Up @@ -162,9 +162,9 @@ func (ps *PortStack) MTU() uint16 { return ps.mtu }
func (ps *PortStack) HardwareAddr6() [6]byte { return ps.mac }

// RecvEth validates an ethernet+ipv4 frame in payload. If it is OK then it
// defers response handling of the packets during a call to [PortStack.HandleEth].
// defers response handling of the packets during a call to [PortStack.PutOutboundEth].
//
// If [PortStack.HandleEth] is not called often enough prevent packet queue from
// If [PortStack.PutOutboundEth] is not called often enough prevent packet queue from
// filling up on a socket RecvEth will start to return [ErrDroppedPacket].
func (ps *PortStack) RecvEth(ethernetFrame []byte) (err error) {
// defer ps.trace("RecvEth:end")
Expand Down Expand Up @@ -198,7 +198,7 @@ func (ps *PortStack) RecvEth(ethernetFrame []byte) (err error) {
return errPacketSmol
}
ps.auxARP = eth.DecodeARPv4Header(payload[eth.SizeEthernetHeader:])
return ps.arpClient.recv(&ps.auxARP)
return ps.arpClient.recvEth(&ps.auxARP)
}
// IP parsing block.
var ipOffset uint8
Expand Down Expand Up @@ -272,8 +272,8 @@ func (ps *PortStack) RecvEth(ethernetFrame []byte) (err error) {
pkt.Eth = *ehdr
pkt.IP = ihdr // TODO(soypat): Don't ignore IP options.
pkt.UDP = uhdr
copy(pkt.payload[:], payload) //copies the payload from the EtherNet frame into the UDP packet
err = port.handler.recv(pkt) //<-- where the magic happens - invoking recv(), passes the arrived packet so in can be placed in the RX ring buffer
copy(pkt.payload[:], payload) //copies the payload from the EtherNet frame into the UDP packet
err = port.handler.recvEth(pkt) //<-- where the magic happens - invoking recvEth(), passes the arrived packet so in can be placed in the RX ring buffer
if err == io.EOF {
// Special case; EOF is flag to close port
err = nil
Expand Down Expand Up @@ -341,7 +341,7 @@ func (ps *PortStack) RecvEth(ethernetFrame []byte) (err error) {
n := copy(pkt.data[:], ipOptions)
n += copy(pkt.data[n:], tcpOptions)
copy(pkt.data[n:], payload)
err = port.handler.recv(pkt)
err = port.handler.recvEth(pkt)
if err == io.EOF {
// Special case; EOF is flag to close port
err = nil
Expand All @@ -359,36 +359,36 @@ func (ps *PortStack) RecvEth(ethernetFrame []byte) (err error) {
return err
}

// HandleEth searches for a socket with a pending packet and writes the response
// PutOutboundEth searches for a socket with a pending packet and writes the response
// into the dst argument. The length written to dst is returned.
// [ErrFlagPending] can be returned by value by a handler to indicate the packet was
// not processed and that a future call to HandleEth is required to complete.
// not processed and that a future call to PutOutboundEth is required to complete.
//
// If a handler returns any other error the port is closed.
func (ps *PortStack) HandleEth(dst []byte) (n int, err error) {
func (ps *PortStack) PutOutboundEth(dst []byte) (n int, err error) {
isTrace := ps.isLogEnabled(internal.LevelTrace)
n, err = ps.handleEth(dst)
n, err = ps.putOutboundEth(dst)
if n > 0 && err == nil {
if isTrace {
ps.trace("Stack:HandleEth", slog.Int("plen", n))
ps.trace("Stack:PutOutboundEth", slog.Int("plen", n))

Check warning on line 373 in stacks/portstack.go

View check run for this annotation

Codecov / codecov/patch

stacks/portstack.go#L373

Added line #L373 was not covered by tests
}
ps.lastTx = ps.now()
ps.processedPackets++
} else if err != nil && ps.isLogEnabled(slog.LevelError) {
ps.error("Stack:HandleEth", slog.String("err", err.Error()))
ps.error("Stack:PutOutboundEth", slog.String("err", err.Error()))

Check warning on line 378 in stacks/portstack.go

View check run for this annotation

Codecov / codecov/patch

stacks/portstack.go#L378

Added line #L378 was not covered by tests
}
return n, err
}

func (ps *PortStack) handleEth(dst []byte) (n int, err error) {
func (ps *PortStack) putOutboundEth(dst []byte) (n int, err error) {
switch {
case len(dst) < int(ps.mtu):
return 0, io.ErrShortBuffer

case !ps.IsPendingHandling():
return 0, nil // No remaining packets to handle.
}
n = ps.arpClient.handle(dst)
n = ps.arpClient.putOutboundEth(dst)
if n != 0 {
return n, nil
}
Expand Down Expand Up @@ -447,7 +447,7 @@ func handleSocket(dst []byte, sock socket) (int, bool, error) {
return 0, false, nil // Nothing to handle, just skip.
}
// Socket has an unhandled packet.
n, err := sock.HandleEth(dst)
n, err := sock.PutOutboundEth(dst)
if err == ErrFlagPending {
// Special case: Socket may have written data but needs future handling, flagged with the ErrFlagPending error.
return n, true, nil
Expand All @@ -464,7 +464,7 @@ func handleSocket(dst []byte, sock socket) (int, bool, error) {
return n, sock.IsPendingHandling(), err
}

// IsPendingHandling checks if a call to HandleEth could possibly result in a packet being generated by the PortStack.
// IsPendingHandling checks if a call to PutOutboundEth could possibly result in a packet being generated by the PortStack.
func (ps *PortStack) IsPendingHandling() bool {
return ps.pendingUDPv4 > 0 || ps.pendingTCPv4 > 0 || ps.arpClient.isPending()
}
Expand Down
4 changes: 2 additions & 2 deletions stacks/stacks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -828,7 +828,7 @@ func (egr *Exchanger) HandleTx(t *testing.T) (pkts, bytesSent int) {
var err error
for istack := 0; istack < len(egr.Stacks); istack++ {
// This first for loop generates packets "in-flight" contained in `pipes` data structure.
egr.pipesN[istack], err = egr.Stacks[istack].HandleEth(egr.pipes[istack][:])
egr.pipesN[istack], err = egr.Stacks[istack].PutOutboundEth(egr.pipes[istack][:])
egr.handleErr(t, err, "send", istack)
bytesSent += egr.pipesN[istack]
if egr.pipesN[istack] > 0 {
Expand Down Expand Up @@ -1025,7 +1025,7 @@ func checkNoMoreDataSent(t *testing.T, msg string, egr *Exchanger) {
handleTx := func() (newTxs int) {
txOld := txs
for istack := 0; istack < len(egr.Stacks); istack++ {
n, _ := egr.Stacks[istack].HandleEth(buf)
n, _ := egr.Stacks[istack].PutOutboundEth(buf)
if n > 0 {
txs++
data += n
Expand Down
8 changes: 4 additions & 4 deletions stacks/tcpconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,8 +332,8 @@ func (sock *TCPConn) checkPipeOpen() error {
return nil
}

// recv is called by the PortStack.RecvEth when a packet is received on the network interface, pkt is (a pointer to) the arrived packet.
func (sock *TCPConn) recv(pkt *TCPPacket) (err error) {
// recvEth implements the [itcphandler] interface.
func (sock *TCPConn) recvEth(pkt *TCPPacket) (err error) {
sock.trace("TCPConn.recv:start")
prevState := sock.scb.State()
if prevState.IsClosed() {
Expand Down Expand Up @@ -382,8 +382,8 @@ func (sock *TCPConn) recv(pkt *TCPPacket) (err error) {
return err
}

// Send this handler is called by the underlying stack and populates response[] from the TX ring buffer, with data to be sent as a packet
func (sock *TCPConn) send(response []byte) (n int, err error) {
// Send this handler is called by the underlying stack [PortStack.PutOutboundEth] method and populates response[] from the TX ring buffer, with data to be sent as a packet
func (sock *TCPConn) putOutboundEth(response []byte) (n int, err error) {
defer sock.trace("TCPConn.send:start")
if !sock.remote.IsValid() {
return 0, nil // No remote address yet, yield.
Expand Down
Loading

0 comments on commit 74e7a6b

Please sign in to comment.