Skip to content
This repository was archived by the owner on Feb 17, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ require (
github.com/google/uuid v1.3.0
github.com/klauspost/compress v1.16.6
github.com/planetscale/vtprotobuf v0.4.0
github.com/stealthrocket/net v0.1.11
github.com/stealthrocket/wasi-go v0.6.19
github.com/stealthrocket/net v0.2.1
github.com/stealthrocket/wasi-go v0.6.20
github.com/stealthrocket/wazergo v0.19.1
github.com/stealthrocket/wzprof v0.1.5
github.com/tetratelabs/wazero v1.2.1
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ github.com/klauspost/compress v1.16.6/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQs
github.com/planetscale/vtprotobuf v0.4.0 h1:NEI+g4woRaAZgeZ3sAvbtyvMBRjIv5kE7EWYQ8m4JwY=
github.com/planetscale/vtprotobuf v0.4.0/go.mod h1:wm1N3qk9G/4+VM1WhpkLbvY/d8+0PbwYYpP5P5VhTks=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stealthrocket/net v0.1.11 h1:JPItm/qbkwtXwsc86k7Vb73yReMp+5yjAg84VG/BqoQ=
github.com/stealthrocket/net v0.1.11/go.mod h1:VvoFod9pYC9mo+bEg2NQB/D+KVOjxfhZjZ5zyvozq7M=
github.com/stealthrocket/wasi-go v0.6.19 h1:LrDbWyINP5faLicMTqglD2SniHtCmdajHpLgk4KeLDw=
github.com/stealthrocket/wasi-go v0.6.19/go.mod h1:PJ5oVs2E1ciOJnsTnav4nvTtEcJ4D1jUZAewS9pzuZg=
github.com/stealthrocket/net v0.2.1 h1:PehPGAAjuV46zaeHGlNgakFV7QDGUAREMcEQsZQ8NLo=
github.com/stealthrocket/net v0.2.1/go.mod h1:VvoFod9pYC9mo+bEg2NQB/D+KVOjxfhZjZ5zyvozq7M=
github.com/stealthrocket/wasi-go v0.6.20 h1:15j4vhbB3kmuTs14e91cGSivpmh+s6PHZkLQSx3Namc=
github.com/stealthrocket/wasi-go v0.6.20/go.mod h1:PJ5oVs2E1ciOJnsTnav4nvTtEcJ4D1jUZAewS9pzuZg=
github.com/stealthrocket/wazergo v0.19.1 h1:BPrITETPgSFwiytwmToO0MbUC/+RGC39JScz1JmmG6c=
github.com/stealthrocket/wazergo v0.19.1/go.mod h1:riI0hxw4ndZA5e6z7PesHg2BtTftcZaMxRcoiGGipTs=
github.com/stealthrocket/wzprof v0.1.5 h1:abEwQF9KtqV7UQ0hWk7431vul9/FxOg1eRCqwEKo9/4=
Expand Down
88 changes: 70 additions & 18 deletions internal/sandbox/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -579,10 +579,10 @@ func closeOnCancel(ctx context.Context, conn io.Closer) {
conn.Close() //nolint:errcheck
}

type packetConnTunnel[T sockaddr] struct {
type packetTunnel[T sockaddr] struct {
refc int32
sock *socket[T]
conn net.PacketConn
conn io.Closer
errs chan<- wasi.Errno
}

Expand All @@ -597,71 +597,123 @@ func (s *socket[T]) startPacketTunnel(ctx context.Context, conn net.PacketConn)
wbufsize := s.wbuf.size()

buffer := make([]byte, rbufsize+wbufsize)
tunnel := &packetConnTunnel[T]{
tunnel := &packetTunnel[T]{
refc: 2,
sock: s,
conn: conn,
errs: errs,
}

go tunnel.readFromPacketConn(buffer[:rbufsize])
go tunnel.writeToPacketConn(buffer[rbufsize:])
go tunnel.readFromPacketConn(conn, buffer[:rbufsize])
go tunnel.writeToPacketConn(conn, buffer[rbufsize:])
go closeOnCancel(ctx, conn)
}

func (p *packetConnTunnel[T]) unref() {
func (s *socket[T]) startPacketTunnelTo(ctx context.Context, conn net.Conn) {
ctx, s.cancel = context.WithCancel(ctx)

errs := make(chan wasi.Errno, 2)
s.errs = errs
s.allocateBuffersIfNil()

rbufsize := s.rbuf.size()
wbufsize := s.wbuf.size()

buffer := make([]byte, rbufsize+wbufsize)
tunnel := &packetTunnel[T]{
refc: 2,
sock: s,
conn: conn,
errs: errs,
}

go tunnel.readFromConn(conn, buffer[:rbufsize])
go tunnel.writeToConn(conn, buffer[rbufsize:])
go closeOnCancel(ctx, conn)
}

func (p *packetTunnel[T]) unref() {
if atomic.AddInt32(&p.refc, -1) == 0 {
p.conn.Close()
close(p.errs)
}
}

func (p *packetConnTunnel[T]) readFromPacketConn(buf []byte) {
func (p *packetTunnel[T]) readFromPacketConn(conn net.PacketConn, buf []byte) {
network := p.sock.net
p.readFrom(buf, func(b []byte) (int, T, error) {
var zero T
n, addr, err := conn.ReadFrom(b)
if err != nil {
return n, zero, err
}
peer, errno := network.sockAddr(addr)
if errno != wasi.ESUCCESS {
return n, zero, errno
}
return n, peer, nil
})
}

func (p *packetTunnel[T]) readFromConn(conn net.Conn, buf []byte) {
addr := p.sock.raddr
p.readFrom(buf, func(b []byte) (int, T, error) {
n, err := conn.Read(b)
return n, addr, err
})
}

func (p *packetTunnel[T]) readFrom(buf []byte, read func([]byte) (int, T, error)) {
defer p.unref()
defer p.sock.rbuf.close()

for {
size, addr, err := p.conn.ReadFrom(buf)
size, addr, err := read(buf)
if err != nil {
p.errs <- wasi.MakeErrno(err)
return
}
// TODO:
// - capture metric about packets that were dropped
// - log details about the reason why a packet was dropped
peer, errno := p.sock.net.sockAddr(addr)
if errno != wasi.ESUCCESS {
continue
}
_, errno = p.sock.rbuf.sendmsg([]wasi.IOVec{buf[:size]}, peer)
_, errno := p.sock.rbuf.sendmsg([]wasi.IOVec{buf[:size]}, addr)
if errno != wasi.ESUCCESS {
continue
}
}
}

func (p *packetConnTunnel[T]) writeToPacketConn(buf []byte) {
func (p *packetTunnel[T]) writeToPacketConn(conn net.PacketConn, buf []byte) {
proto := p.sock.proto
p.writeTo(buf, func(b []byte, a T) (int, error) { return conn.WriteTo(b, a.netAddr(proto)) })
}

func (p *packetTunnel[T]) writeToConn(conn net.Conn, buf []byte) {
p.writeTo(buf, func(b []byte, _ T) (int, error) { return conn.Write(b) })
}

func (p *packetTunnel[T]) writeTo(buf []byte, write func([]byte, T) (int, error)) {
defer p.unref()
defer p.sock.wbuf.close()

sig := make(chan struct{}, 1)
signal := make(chan struct{}, 1)
for {
size, _, addr, errno := p.sock.wbuf.recvmsg([]wasi.IOVec{buf}, 0)
switch errno {
case wasi.ESUCCESS:
if size == 0 {
return
}
_, err := p.conn.WriteTo(buf[:size], addr.netAddr(p.sock.proto))
_, err := write(buf[:size], addr)
if err != nil {
p.errs <- wasi.MakeErrno(err)
return
}
case wasi.EAGAIN:
var ready bool
p.sock.wev.synchronize(func() { ready = p.sock.wev.poll(sig) })
p.sock.wev.synchronize(func() { ready = p.sock.wev.poll(signal) })
if !ready {
<-sig
<-signal
}
default:
// TODO:
Expand Down
21 changes: 10 additions & 11 deletions internal/sandbox/socket.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"context"
"crypto/tls"
"fmt"
"net"
"sync"
"time"

Expand Down Expand Up @@ -704,10 +703,7 @@ func (s *socket[T]) SockConnect(ctx context.Context, addr wasi.SocketAddress) wa
if errno != wasi.ESUCCESS {
return errno
}
// The panic if the type conversion fails is the desired behavior
// here, the dial function should return a type which implements
// net.PacketConn when dialing for a datagram protocol such as udp.
s.startPacketTunnel(ctx, c.(net.PacketConn))
s.startPacketTunnelTo(ctx, c)
}
s.wev.trigger()
return wasi.ESUCCESS
Expand Down Expand Up @@ -852,6 +848,9 @@ func (s *socket[T]) sockRecvFrom(ctx context.Context, iovs []wasi.IOVec, flags w
}

func (s *socket[T]) SockSend(ctx context.Context, iovs []wasi.IOVec, flags wasi.SIFlags) (wasi.Size, wasi.Errno) {
if s.flags.has(sockClosed) {
return ^wasi.Size(0), wasi.ECONNRESET
}
if !s.flags.has(sockConn) {
return ^wasi.Size(0), wasi.ENOTCONN
}
Expand All @@ -863,25 +862,25 @@ func (s *socket[T]) SockSendTo(ctx context.Context, iovs []wasi.IOVec, flags was
if errno != wasi.ESUCCESS {
return ^wasi.Size(0), errno
}
if s.flags.has(sockClosed) {
return ^wasi.Size(0), wasi.ECONNRESET
}
if s.flags.has(sockConn) {
return 0, wasi.EISCONN
}
return s.sockSendTo(ctx, iovs, flags, dstAddr)
}

func (s *socket[T]) sockSendTo(ctx context.Context, iovs []wasi.IOVec, flags wasi.SIFlags, addr T) (wasi.Size, wasi.Errno) {
s.mutex.Lock()
defer s.mutex.Unlock()

if s.flags.has(sockClosed) {
return ^wasi.Size(0), wasi.ECONNRESET
}
if s.flags.has(sockListen) {
return ^wasi.Size(0), wasi.ENOTSUP
}
if s.typ == stream && !s.flags.has(sockConn) {
return ^wasi.Size(0), wasi.ENOTCONN
}
if s.flags.has(sockConn) && addr != s.raddr {
return ^wasi.Size(0), wasi.EISCONN
}
if errno := s.getErrno(); errno != wasi.ESUCCESS {
return ^wasi.Size(0), errno
}
Expand Down