-
Notifications
You must be signed in to change notification settings - Fork 669
Revamp GRPC Port forwarding tunnels to use existing proxy #2985
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,20 +2,18 @@ package portfwd | |
|
||
import ( | ||
"context" | ||
"errors" | ||
"fmt" | ||
"io" | ||
"net" | ||
"time" | ||
|
||
"github.com/containers/gvisor-tap-vsock/pkg/services/forwarder" | ||
"github.com/lima-vm/lima/pkg/bicopy" | ||
"github.com/lima-vm/lima/pkg/guestagent/api" | ||
guestagentclient "github.com/lima-vm/lima/pkg/guestagent/api/client" | ||
"github.com/sirupsen/logrus" | ||
"golang.org/x/sync/errgroup" | ||
) | ||
|
||
func HandleTCPConnection(ctx context.Context, client *guestagentclient.GuestAgentClient, conn net.Conn, guestAddr string) { | ||
defer conn.Close() | ||
|
||
id := fmt.Sprintf("tcp-%s-%s", conn.LocalAddr().String(), conn.RemoteAddr().String()) | ||
|
||
stream, err := client.Tunnel(ctx) | ||
|
@@ -24,26 +22,17 @@ func HandleTCPConnection(ctx context.Context, client *guestagentclient.GuestAgen | |
return | ||
} | ||
|
||
g, _ := errgroup.WithContext(ctx) | ||
|
||
rw := &GrpcClientRW{stream: stream, id: id, addr: guestAddr} | ||
g.Go(func() error { | ||
_, err := io.Copy(rw, conn) | ||
return err | ||
}) | ||
g.Go(func() error { | ||
_, err := io.Copy(conn, rw) | ||
return err | ||
}) | ||
|
||
if err := g.Wait(); err != nil { | ||
logrus.Debugf("error in tcp tunnel for id: %s error:%v", id, err) | ||
// Handshake message to start tunnel | ||
if err := stream.Send(&api.TunnelMessage{Id: id, Protocol: "tcp", GuestAddr: guestAddr}); err != nil { | ||
logrus.Errorf("could not start tcp tunnel for id: %s error:%v", id, err) | ||
return | ||
} | ||
|
||
rw := &GrpcClientRW{stream: stream, id: id, addr: guestAddr, protocol: "tcp"} | ||
bicopy.Bicopy(rw, conn, nil) | ||
} | ||
|
||
func HandleUDPConnection(ctx context.Context, client *guestagentclient.GuestAgentClient, conn net.PacketConn, guestAddr string) { | ||
defer conn.Close() | ||
|
||
id := fmt.Sprintf("udp-%s", conn.LocalAddr().String()) | ||
|
||
stream, err := client.Tunnel(ctx) | ||
|
@@ -52,98 +41,82 @@ func HandleUDPConnection(ctx context.Context, client *guestagentclient.GuestAgen | |
return | ||
} | ||
|
||
g, _ := errgroup.WithContext(ctx) | ||
|
||
g.Go(func() error { | ||
buf := make([]byte, 65507) | ||
for { | ||
n, addr, err := conn.ReadFrom(buf) | ||
// We must handle n > 0 bytes before considering the error. | ||
// https://pkg.go.dev/net#PacketConn | ||
if n > 0 { | ||
msg := &api.TunnelMessage{ | ||
Id: id + "-" + addr.String(), | ||
Protocol: "udp", | ||
GuestAddr: guestAddr, | ||
Data: buf[:n], | ||
UdpTargetAddr: addr.String(), | ||
} | ||
if err := stream.Send(msg); err != nil { | ||
return err | ||
} | ||
} | ||
if err != nil { | ||
// https://pkg.go.dev/net#PacketConn does not mention io.EOF semantics. | ||
if errors.Is(err, io.EOF) { | ||
return nil | ||
} | ||
return err | ||
} | ||
balajiv113 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
}) | ||
// Handshake message to start tunnel | ||
if err := stream.Send(&api.TunnelMessage{Id: id, Protocol: "udp", GuestAddr: guestAddr}); err != nil { | ||
logrus.Errorf("could not start udp tunnel for id: %s error:%v", id, err) | ||
return | ||
} | ||
|
||
g.Go(func() error { | ||
for { | ||
// Not documented: when err != nil, in is always nil. | ||
in, err := stream.Recv() | ||
if err != nil { | ||
if errors.Is(err, io.EOF) { | ||
return nil | ||
} | ||
return err | ||
} | ||
addr, err := net.ResolveUDPAddr("udp", in.UdpTargetAddr) | ||
if err != nil { | ||
return err | ||
} | ||
_, err = conn.WriteTo(in.Data, addr) | ||
if err != nil { | ||
return err | ||
} | ||
} | ||
proxy, err := forwarder.NewUDPProxy(conn, func() (net.Conn, error) { | ||
rw := &GrpcClientRW{stream: stream, id: id, addr: guestAddr, protocol: "udp"} | ||
return rw, nil | ||
}) | ||
|
||
if err := g.Wait(); err != nil { | ||
logrus.Debugf("error in udp tunnel for id: %s error:%v", id, err) | ||
if err != nil { | ||
logrus.Errorf("error in udp tunnel proxy for id: %s error:%v", id, err) | ||
return | ||
} | ||
|
||
defer func() { | ||
err := proxy.Close() | ||
if err != nil { | ||
logrus.Errorf("error in closing udp tunnel proxy for id: %s error:%v", id, err) | ||
} | ||
}() | ||
proxy.Run() | ||
balajiv113 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
|
||
type GrpcClientRW struct { | ||
id string | ||
addr string | ||
stream api.GuestService_TunnelClient | ||
id string | ||
addr string | ||
|
||
protocol string | ||
stream api.GuestService_TunnelClient | ||
} | ||
|
||
var _ io.ReadWriter = (*GrpcClientRW)(nil) | ||
var _ net.Conn = (*GrpcClientRW)(nil) | ||
|
||
func (g GrpcClientRW) Write(p []byte) (n int, err error) { | ||
if len(p) == 0 { | ||
return 0, nil | ||
} | ||
func (g *GrpcClientRW) Write(p []byte) (n int, err error) { | ||
err = g.stream.Send(&api.TunnelMessage{ | ||
Id: g.id, | ||
GuestAddr: g.addr, | ||
Data: p, | ||
Protocol: "tcp", | ||
Protocol: g.protocol, | ||
}) | ||
if err != nil { | ||
return 0, err | ||
} | ||
return len(p), nil | ||
} | ||
|
||
func (g GrpcClientRW) Read(p []byte) (n int, err error) { | ||
// Not documented: when err != nil, in is always nil. | ||
func (g *GrpcClientRW) Read(p []byte) (n int, err error) { | ||
in, err := g.stream.Recv() | ||
if err != nil { | ||
if errors.Is(err, io.EOF) { | ||
return 0, nil | ||
} | ||
balajiv113 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
return 0, err | ||
balajiv113 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
if len(in.Data) == 0 { | ||
return 0, nil | ||
} | ||
balajiv113 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
copy(p, in.Data) | ||
return len(in.Data), nil | ||
} | ||
|
||
func (g *GrpcClientRW) Close() error { | ||
return g.stream.CloseSend() | ||
} | ||
|
||
func (g *GrpcClientRW) LocalAddr() net.Addr { | ||
return &net.UnixAddr{Name: "grpc", Net: "unixpacket"} | ||
} | ||
|
||
func (g *GrpcClientRW) RemoteAddr() net.Addr { | ||
return &net.UnixAddr{Name: "grpc", Net: "unixpacket"} | ||
} | ||
|
||
func (g *GrpcClientRW) SetDeadline(_ time.Time) error { | ||
return nil | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why we don't implement it? should we log a warning to make the issue visible? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Too much logging will become a noise since these are connections. It will be called multiple times and its doesn't help much. SetDeadline is not even usecul in our case as ours is a grpc stream There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Or just panic. If these functions don't do what they're supposed to (and are never called) a panic would be proof of that. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We cannot panic on all cases. These are left as NO-OP wantedly. These methods may / will be called but its of no value to us as its a grpc tunnel wrapped conn There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. no-op on SetDeadline is not a reasonable thing to do. It should return an error. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @tamird Please understand the full flow before commenting. All left like that for a reason. UDP Proxy internally will call SetReadDeadline so we cannot throw error. It will be no-op only. That's why all SetDeadline like methods are returning null. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
I think you are talking past each other. |
||
} | ||
|
||
func (g *GrpcClientRW) SetReadDeadline(_ time.Time) error { | ||
return nil | ||
} | ||
|
||
func (g *GrpcClientRW) SetWriteDeadline(_ time.Time) error { | ||
return nil | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,66 +4,79 @@ import ( | |
"errors" | ||
"io" | ||
"net" | ||
"time" | ||
|
||
"github.com/lima-vm/lima/pkg/bicopy" | ||
"github.com/lima-vm/lima/pkg/guestagent/api" | ||
) | ||
|
||
type TunnelServer struct { | ||
Conns map[string]net.Conn | ||
} | ||
type TunnelServer struct{} | ||
|
||
func NewTunnelServer() *TunnelServer { | ||
return &TunnelServer{ | ||
Conns: make(map[string]net.Conn), | ||
} | ||
return &TunnelServer{} | ||
} | ||
|
||
func (s *TunnelServer) Start(stream api.GuestService_TunnelServer) error { | ||
balajiv113 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
for { | ||
in, err := stream.Recv() | ||
// Receive the handshake message to start tunnel | ||
in, err := stream.Recv() | ||
if err != nil { | ||
if errors.Is(err, io.EOF) { | ||
return nil | ||
} | ||
if err != nil { | ||
return err | ||
} | ||
if len(in.Data) == 0 { | ||
continue | ||
} | ||
return err | ||
} | ||
balajiv113 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
conn, ok := s.Conns[in.Id] | ||
if !ok { | ||
conn, err = net.Dial(in.Protocol, in.GuestAddr) | ||
if err != nil { | ||
return err | ||
} | ||
s.Conns[in.Id] = conn | ||
|
||
writer := &GRPCServerWriter{id: in.Id, udpAddr: in.UdpTargetAddr, stream: stream} | ||
go func() { | ||
_, _ = io.Copy(writer, conn) | ||
delete(s.Conns, writer.id) | ||
}() | ||
} | ||
_, err = conn.Write(in.Data) | ||
if err != nil { | ||
return err | ||
} | ||
// We simply forward data form GRPC stream to net.Conn for both tcp and udp. So simple proxy is sufficient | ||
conn, err := net.Dial(in.Protocol, in.GuestAddr) | ||
if err != nil { | ||
return err | ||
} | ||
rw := &GRPCServerRW{stream: stream, id: in.Id} | ||
bicopy.Bicopy(rw, conn, nil) | ||
return nil | ||
} | ||
|
||
type GRPCServerWriter struct { | ||
id string | ||
udpAddr string | ||
stream api.GuestService_TunnelServer | ||
type GRPCServerRW struct { | ||
id string | ||
stream api.GuestService_TunnelServer | ||
} | ||
|
||
var _ io.Writer = (*GRPCServerWriter)(nil) | ||
var _ net.Conn = (*GRPCServerRW)(nil) | ||
|
||
func (g GRPCServerWriter) Write(p []byte) (n int, err error) { | ||
if len(p) == 0 { | ||
return 0, nil | ||
} | ||
err = g.stream.Send(&api.TunnelMessage{Id: g.id, Data: p, UdpTargetAddr: g.udpAddr}) | ||
func (g *GRPCServerRW) Write(p []byte) (n int, err error) { | ||
err = g.stream.Send(&api.TunnelMessage{Id: g.id, Data: p}) | ||
return len(p), err | ||
} | ||
|
||
func (g *GRPCServerRW) Read(p []byte) (n int, err error) { | ||
in, err := g.stream.Recv() | ||
if err != nil { | ||
return 0, err | ||
} | ||
copy(p, in.Data) | ||
return len(in.Data), nil | ||
} | ||
|
||
func (g *GRPCServerRW) Close() error { | ||
return nil | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why this does nothing? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Server doesn't have any stream closing methods. So it doesn't do anything There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So this is implemented only to satisfy the net.Conn interface? Should we log a debug message when about ignoring Close? This code can be integrated in other code expecting that Close() does something. If doing nothing breaks something, the debug log will help to diagnose the issue. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. debug logs are useful only in dev side, also grpc server doesn't provide close, its purely handled on client itself. I don't think adding debug is anywhere useful. I don't want to spam it in our guestagent logs which doesn't give much value. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Returning nil on |
||
} | ||
|
||
func (g *GRPCServerRW) LocalAddr() net.Addr { | ||
return &net.UnixAddr{Name: "grpc", Net: "unixpacket"} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This seems like the right way to support good logging, better than the NamedReadWriter added in #2944. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is a pointless comment. The purpose of This implementation here is a lie, it doesn't identify the connection and the address it returns cannot be used in any of the normal ways that |
||
} | ||
|
||
func (g *GRPCServerRW) RemoteAddr() net.Addr { | ||
return &net.UnixAddr{Name: "grpc", Net: "unixpacket"} | ||
} | ||
|
||
func (g *GRPCServerRW) SetDeadline(_ time.Time) error { | ||
return nil | ||
} | ||
|
||
func (g *GRPCServerRW) SetReadDeadline(_ time.Time) error { | ||
return nil | ||
} | ||
|
||
func (g *GRPCServerRW) SetWriteDeadline(_ time.Time) error { | ||
return nil | ||
} |
Uh oh!
There was an error while loading. Please reload this page.