Skip to content

Commit 34ec468

Browse files
committed
Revamp GRPC Port forwarding tunnels to use existing proxy
Signed-off-by: Balaji Vijayakumar <kuttibalaji.v6@gmail.com>
1 parent 5640561 commit 34ec468

File tree

2 files changed

+117
-119
lines changed

2 files changed

+117
-119
lines changed

pkg/portfwd/client.go

Lines changed: 61 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -2,20 +2,18 @@ package portfwd
22

33
import (
44
"context"
5-
"errors"
65
"fmt"
7-
"io"
86
"net"
7+
"time"
98

9+
"github.com/containers/gvisor-tap-vsock/pkg/services/forwarder"
10+
"github.com/containers/gvisor-tap-vsock/pkg/tcpproxy"
1011
"github.com/lima-vm/lima/pkg/guestagent/api"
1112
guestagentclient "github.com/lima-vm/lima/pkg/guestagent/api/client"
1213
"github.com/sirupsen/logrus"
13-
"golang.org/x/sync/errgroup"
1414
)
1515

1616
func HandleTCPConnection(ctx context.Context, client *guestagentclient.GuestAgentClient, conn net.Conn, guestAddr string) {
17-
defer conn.Close()
18-
1917
id := fmt.Sprintf("tcp-%s-%s", conn.LocalAddr().String(), conn.RemoteAddr().String())
2018

2119
stream, err := client.Tunnel(ctx)
@@ -24,26 +22,23 @@ func HandleTCPConnection(ctx context.Context, client *guestagentclient.GuestAgen
2422
return
2523
}
2624

27-
g, _ := errgroup.WithContext(ctx)
28-
29-
rw := &GrpcClientRW{stream: stream, id: id, addr: guestAddr}
30-
g.Go(func() error {
31-
_, err := io.Copy(rw, conn)
32-
return err
33-
})
34-
g.Go(func() error {
35-
_, err := io.Copy(conn, rw)
36-
return err
37-
})
25+
// Handshake message to start tunnel
26+
err = stream.Send(&api.TunnelMessage{Id: id, Protocol: "tcp", GuestAddr: guestAddr})
27+
if err != nil {
28+
logrus.Errorf("could not start tcp tunnel for id: %s error:%v", id, err)
29+
return
30+
}
3831

39-
if err := g.Wait(); err != nil {
40-
logrus.Debugf("error in tcp tunnel for id: %s error:%v", id, err)
32+
remote := tcpproxy.DialProxy{
33+
DialContext: func(_ context.Context, _, _ string) (net.Conn, error) {
34+
rw := &GrpcClientRW{stream: stream, id: id, addr: guestAddr, protocol: "tcp"}
35+
return rw, nil
36+
},
4137
}
38+
remote.HandleConn(conn)
4239
}
4340

4441
func HandleUDPConnection(ctx context.Context, client *guestagentclient.GuestAgentClient, conn net.PacketConn, guestAddr string) {
45-
defer conn.Close()
46-
4742
id := fmt.Sprintf("udp-%s", conn.LocalAddr().String())
4843

4944
stream, err := client.Tunnel(ctx)
@@ -52,74 +47,43 @@ func HandleUDPConnection(ctx context.Context, client *guestagentclient.GuestAgen
5247
return
5348
}
5449

55-
g, _ := errgroup.WithContext(ctx)
56-
57-
g.Go(func() error {
58-
buf := make([]byte, 65507)
59-
for {
60-
n, addr, err := conn.ReadFrom(buf)
61-
if err != nil {
62-
// https://pkg.go.dev/net#PacketConn does not mention io.EOF semantics.
63-
if errors.Is(err, io.EOF) {
64-
return nil
65-
}
66-
return err
67-
}
68-
msg := &api.TunnelMessage{
69-
Id: id + "-" + addr.String(),
70-
Protocol: "udp",
71-
GuestAddr: guestAddr,
72-
Data: buf[:n],
73-
UdpTargetAddr: addr.String(),
74-
}
75-
if err := stream.Send(msg); err != nil {
76-
return err
77-
}
78-
}
79-
})
50+
//Handshake message to start tunnel
51+
err = stream.Send(&api.TunnelMessage{Id: id, Protocol: "udp", GuestAddr: guestAddr})
52+
if err != nil {
53+
logrus.Errorf("could not start udp tunnel for id: %s error:%v", id, err)
54+
return
55+
}
8056

81-
g.Go(func() error {
82-
for {
83-
in, err := stream.Recv()
84-
if err != nil {
85-
if errors.Is(err, io.EOF) {
86-
return nil
87-
}
88-
return err
89-
}
90-
addr, err := net.ResolveUDPAddr("udp", in.UdpTargetAddr)
91-
if err != nil {
92-
return err
93-
}
94-
_, err = conn.WriteTo(in.Data, addr)
95-
if err != nil {
96-
return err
97-
}
98-
}
57+
proxy, err := forwarder.NewUDPProxy(conn, func() (net.Conn, error) {
58+
rw := &GrpcClientRW{stream: stream, id: id, addr: guestAddr, protocol: "udp"}
59+
return rw, nil
9960
})
10061

101-
if err := g.Wait(); err != nil {
102-
logrus.Debugf("error in udp tunnel for id: %s error:%v", id, err)
62+
if err != nil {
63+
logrus.Errorf("error in udp tunnel proxy for id: %s error:%v", id, err)
64+
return
10365
}
66+
67+
defer proxy.Close()
68+
proxy.Run()
10469
}
10570

10671
type GrpcClientRW struct {
107-
id string
108-
addr string
109-
stream api.GuestService_TunnelClient
72+
id string
73+
addr string
74+
75+
protocol string
76+
stream api.GuestService_TunnelClient
11077
}
11178

112-
var _ io.ReadWriter = (*GrpcClientRW)(nil)
79+
var _ net.Conn = (*GrpcClientRW)(nil)
11380

11481
func (g GrpcClientRW) Write(p []byte) (n int, err error) {
115-
if len(p) == 0 {
116-
return 0, nil
117-
}
11882
err = g.stream.Send(&api.TunnelMessage{
11983
Id: g.id,
12084
GuestAddr: g.addr,
12185
Data: p,
122-
Protocol: "tcp",
86+
Protocol: g.protocol,
12387
})
12488
if err != nil {
12589
return 0, err
@@ -130,14 +94,32 @@ func (g GrpcClientRW) Write(p []byte) (n int, err error) {
13094
func (g GrpcClientRW) Read(p []byte) (n int, err error) {
13195
in, err := g.stream.Recv()
13296
if err != nil {
133-
if errors.Is(err, io.EOF) {
134-
return 0, nil
135-
}
13697
return 0, err
13798
}
138-
if len(in.Data) == 0 {
139-
return 0, nil
140-
}
14199
copy(p, in.Data)
142100
return len(in.Data), nil
143101
}
102+
103+
func (g GrpcClientRW) Close() error {
104+
return g.stream.CloseSend()
105+
}
106+
107+
func (g GrpcClientRW) LocalAddr() net.Addr {
108+
return &net.UnixAddr{Name: "grpc", Net: "unixpacket"}
109+
}
110+
111+
func (g GrpcClientRW) RemoteAddr() net.Addr {
112+
return &net.UnixAddr{Name: "grpc", Net: "unixpacket"}
113+
}
114+
115+
func (g GrpcClientRW) SetDeadline(_ time.Time) error {
116+
return nil
117+
}
118+
119+
func (g GrpcClientRW) SetReadDeadline(_ time.Time) error {
120+
return nil
121+
}
122+
123+
func (g GrpcClientRW) SetWriteDeadline(_ time.Time) error {
124+
return nil
125+
}

pkg/portfwdserver/server.go

Lines changed: 56 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,69 +1,85 @@
11
package portfwdserver
22

33
import (
4+
"context"
45
"errors"
56
"io"
67
"net"
8+
"time"
79

10+
"github.com/containers/gvisor-tap-vsock/pkg/tcpproxy"
811
"github.com/lima-vm/lima/pkg/guestagent/api"
912
)
1013

1114
type TunnelServer struct {
12-
Conns map[string]net.Conn
1315
}
1416

1517
func NewTunnelServer() *TunnelServer {
16-
return &TunnelServer{
17-
Conns: make(map[string]net.Conn),
18-
}
18+
return &TunnelServer{}
1919
}
2020

2121
func (s *TunnelServer) Start(stream api.GuestService_TunnelServer) error {
22-
for {
23-
in, err := stream.Recv()
22+
// Receive the handshake message to start tunnel
23+
in, err := stream.Recv()
24+
if err != nil {
2425
if errors.Is(err, io.EOF) {
2526
return nil
2627
}
27-
if err != nil {
28-
return err
29-
}
30-
if len(in.Data) == 0 {
31-
continue
32-
}
28+
return err
29+
}
3330

34-
conn, ok := s.Conns[in.Id]
35-
if !ok {
36-
conn, err = net.Dial(in.Protocol, in.GuestAddr)
37-
if err != nil {
38-
return err
39-
}
40-
s.Conns[in.Id] = conn
41-
42-
writer := &GRPCServerWriter{id: in.Id, udpAddr: in.UdpTargetAddr, stream: stream}
43-
go func() {
44-
_, _ = io.Copy(writer, conn)
45-
delete(s.Conns, writer.id)
46-
}()
47-
}
48-
_, err = conn.Write(in.Data)
49-
if err != nil {
50-
return err
51-
}
31+
// We simply forward data form GRPC stream to net.Conn for both tcp and udp. So simple tcpproxy is sufficient
32+
rw := &GRPCServerRW{stream: stream, id: in.Id}
33+
remote := tcpproxy.DialProxy{
34+
DialContext: func(_ context.Context, _, _ string) (net.Conn, error) {
35+
return net.Dial(in.Protocol, in.GuestAddr)
36+
},
5237
}
38+
remote.HandleConn(rw)
39+
return nil
5340
}
5441

55-
type GRPCServerWriter struct {
56-
id string
57-
udpAddr string
58-
stream api.GuestService_TunnelServer
42+
type GRPCServerRW struct {
43+
id string
44+
stream api.GuestService_TunnelServer
5945
}
6046

61-
var _ io.Writer = (*GRPCServerWriter)(nil)
47+
var _ net.Conn = (*GRPCServerRW)(nil)
6248

63-
func (g GRPCServerWriter) Write(p []byte) (n int, err error) {
64-
if len(p) == 0 {
65-
return 0, nil
66-
}
67-
err = g.stream.Send(&api.TunnelMessage{Id: g.id, Data: p, UdpTargetAddr: g.udpAddr})
49+
func (g GRPCServerRW) Write(p []byte) (n int, err error) {
50+
err = g.stream.Send(&api.TunnelMessage{Id: g.id, Data: p})
6851
return len(p), err
6952
}
53+
54+
func (g GRPCServerRW) Read(p []byte) (n int, err error) {
55+
in, err := g.stream.Recv()
56+
if err != nil {
57+
return 0, err
58+
}
59+
copy(p, in.Data)
60+
return len(in.Data), nil
61+
}
62+
63+
func (g GRPCServerRW) Close() error {
64+
return nil
65+
}
66+
67+
func (g GRPCServerRW) LocalAddr() net.Addr {
68+
return &net.UnixAddr{Name: "grpc", Net: "unixpacket"}
69+
}
70+
71+
func (g GRPCServerRW) RemoteAddr() net.Addr {
72+
return &net.UnixAddr{Name: "grpc", Net: "unixpacket"}
73+
}
74+
75+
func (g GRPCServerRW) SetDeadline(_ time.Time) error {
76+
return nil
77+
}
78+
79+
func (g GRPCServerRW) SetReadDeadline(_ time.Time) error {
80+
return nil
81+
}
82+
83+
func (g GRPCServerRW) SetWriteDeadline(_ time.Time) error {
84+
return nil
85+
}

0 commit comments

Comments
 (0)