cleanup: refactor and use bicopy everywhere #2944

Closed
wants to merge 1 commit into from
82 changes: 16 additions & 66 deletions pkg/bicopy/bicopy.go
@@ -1,22 +1,3 @@
-// From https://raw.githubusercontent.com/norouter/norouter/v0.6.5/pkg/agent/bicopy/bicopy.go
-/*
-Copyright (C) NoRouter authors.
-
-Copyright (C) libnetwork authors.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
-http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
 package bicopy

 import (
@@ -26,56 +7,25 @@ import (
 	"github.com/sirupsen/logrus"
 )

-// Bicopy is from https://github.com/rootless-containers/rootlesskit/blob/v0.10.1/pkg/port/builtin/parent/tcp/tcp.go#L73-L104
-// (originally from libnetwork, Apache License 2.0).
-func Bicopy(x, y io.ReadWriter, quit <-chan struct{}) {
-	type closeReader interface {
-		CloseRead() error
-	}
-	type closeWriter interface {
-		CloseWrite() error
-	}
-	var wg sync.WaitGroup
-	broker := func(to, from io.ReadWriter) {
-		if _, err := io.Copy(to, from); err != nil {
-			logrus.WithError(err).Debug("failed to call io.Copy")
-		}
-		if fromCR, ok := from.(closeReader); ok {
-			if err := fromCR.CloseRead(); err != nil {
-				logrus.WithError(err).Debug("failed to call CloseRead")
-			}
-		}
-		if toCW, ok := to.(closeWriter); ok {
-			if err := toCW.CloseWrite(); err != nil {
-				logrus.WithError(err).Debug("failed to call CloseWrite")
-			}
-		}
-		wg.Done()
-	}
+type NamedReadWriter struct {
+	ReadWriter io.ReadWriter
+	Name       string
+}

+func Bicopy(context string, x, y NamedReadWriter) {
+	var wg sync.WaitGroup
 	wg.Add(2)
-	go broker(x, y)
-	go broker(y, x)
-	finish := make(chan struct{})
 	go func() {
-		wg.Wait()
-		close(finish)
-	}()
-
-	select {
-	case <-quit:
-	case <-finish:
-	}
-	if xCloser, ok := x.(io.Closer); ok {
-		if err := xCloser.Close(); err != nil {
-			logrus.WithError(err).Debug("failed to call xCloser.Close")
+		defer wg.Done()
+		if _, err := io.Copy(x.ReadWriter, y.ReadWriter); err != nil {
+			logrus.WithError(err).Errorf("%s: io.Copy(%s, %s)", context, x.Name, y.Name)
 		}
Member

This logs an error like:

"forwarding packets: io.Copy(VMNET, VZ)" ... {"error": "actual error from io.Copy"}

For code already calling bicopy, this is an improvement, since before we logged a DEBUG message with no information about which direction failed or what the purpose of the copy was:

"failed to call io.Copy" ... ... {"error": "actual error from io.Copy"}

But for the call from network_darwin this is worse. Before we had this clear error:

"Failed to forward packets from VZ to VMNET: actual error from io.Copy"

I would like to keep the previous error message or something very close. I don't see the value of logging the error separately using logrus.WithError().

It can be done using:

logrus.Errorf("Failed to %s from %s to %s: %s", context, y.Name, x.Name, err)
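
A minimal sketch of the single-line format suggested above. It assumes a hypothetical helper, copyErrorMessage, and uses fmt.Sprintf in place of logrus.Errorf so that it runs with the standard library alone. Only the format string is taken from the comment; the helper name, its signature, and errExample are illustrative.

```go
package main

import (
	"errors"
	"fmt"
)

// errExample stands in for whatever io.Copy returned (illustrative only).
var errExample = errors.New("actual error from io.Copy")

// copyErrorMessage is a hypothetical helper: it renders the purpose of the
// copy, its direction, and the underlying error as one line.
func copyErrorMessage(context, from, to string, err error) string {
	return fmt.Sprintf("Failed to %s from %s to %s: %s", context, from, to, err)
}

func main() {
	// Prints: Failed to forward packets from VZ to VMNET: actual error from io.Copy
	fmt.Println(copyErrorMessage("forward packets", "VZ", "VMNET", errExample))
}
```

With this shape the context reads as a verb phrase ("forward packets"), so call sites would need to pass it in that form for the sentence to read naturally.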

-	}
-	if yCloser, ok := y.(io.Closer); ok {
-		if err := yCloser.Close(); err != nil {
-			logrus.WithError(err).Debug("failed to call yCloser.Close")
+	}()
+	go func() {
+		defer wg.Done()
+		if _, err := io.Copy(y.ReadWriter, x.ReadWriter); err != nil {
+			logrus.WithError(err).Errorf("%s: io.Copy(%s, %s)", context, y.Name, x.Name)
 		}
-	}
-	<-finish
-	// TODO: return copied bytes
+	}()
+	wg.Wait()
 }
Member

Do we really need Bicopy to handle the error instead of returning it to the caller?

I would prefer that the caller handle it, which gives us better control. Handling the error ourselves looks generic, but all we do is log it, and in some cases the caller may need to act on the error.

Contributor Author

There has been a lot of back and forth on this already. I'm inclined to leave it as is. Another option is to pass a handler callback, but nobody needs this today.

Member

"but nobody needs this today": if errors are returned, callers have more control over logging. Some cases may need more detailed logging than the generic message "context: io.Copy(y, x)", which is not self-explanatory.

If you have a valid reason for not returning the error, please let me know.

Also, I think func Bicopy(context string, x, y NamedReadWriter) error {} would work as well. We don't need to complicate this with handlers or callbacks.
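
To make the proposed signature concrete, here is a sketch of a variant that returns errors instead of logging them. BicopyErr, stubRW, failingReader, and demoReturnedError are hypothetical names, and combining both directions with errors.Join (Go 1.20 or later) is an assumption, not something the PR specifies. Like the version in the diff, it returns only after both io.Copy calls have returned.

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"strings"
	"sync"
)

type NamedReadWriter struct {
	ReadWriter io.ReadWriter
	Name       string
}

// BicopyErr is a hypothetical variant of Bicopy that reports failures to the
// caller. It blocks until both directions have finished.
func BicopyErr(context string, x, y NamedReadWriter) error {
	var wg sync.WaitGroup
	errs := make([]error, 2) // one slot per direction, so no locking is needed
	copyOne := func(i int, dst, src NamedReadWriter) {
		defer wg.Done()
		if _, err := io.Copy(dst.ReadWriter, src.ReadWriter); err != nil {
			errs[i] = fmt.Errorf("failed to %s from %s to %s: %w", context, src.Name, dst.Name, err)
		}
	}
	wg.Add(2)
	go copyOne(0, x, y)
	go copyOne(1, y, x)
	wg.Wait()
	return errors.Join(errs...) // nil when both directions succeeded
}

// stubRW and failingReader exist only to drive the demo.
type stubRW struct {
	io.Reader
	io.Writer
}

type failingReader struct{ err error }

func (r failingReader) Read([]byte) (int, error) { return 0, r.err }

var errSimulated = errors.New("simulated read failure")

func demoReturnedError() error {
	healthy := NamedReadWriter{stubRW{strings.NewReader("ping"), io.Discard}, "healthy"}
	broken := NamedReadWriter{stubRW{failingReader{errSimulated}, io.Discard}, "broken"}
	return BicopyErr("copy bytes", healthy, broken)
}

func main() {
	err := demoReturnedError()
	fmt.Println(err)
	fmt.Println(errors.Is(err, errSimulated))
}
```

The caller can then decide how to log or react, for example with errors.Is. The sketch does nothing about the termination concern raised later in this thread: if one direction never returns, neither does BicopyErr.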

Contributor Author

@nirs expressed concern that if io.Copy in one direction returns, the io.Copy in the other direction might not, and an error cannot be returned until both calls complete. That's why the latest version logs directly instead of returning an error.

Member

The other direction will eventually fail because one end of the stream is already closed.

Since we have wg.Done at the end, returning the error after both io.Copy calls have returned looks correct to me.

Member

@balajiv113 Dec 10, 2024

With errgroup, as soon as one function returns a non-nil error, it's over. It will stop the other goroutine as well.

If we are using a plain reader/writer, we will not even have Close, so it just stops the loop.

Member

The goroutine is canceled magically? How does it stop?

Member

I don't see anything in the docs about cancelling a goroutine blocked on a syscall.
https://pkg.go.dev/golang.org/x/sync/errgroup#WithContext

Member

Yes, you are right. My mistake.

Only the context is canceled. So we need to check for context cancellation and close the connection.

For a pure reader/writer, we have no other option until that syscall returns. But most of our readers/writers have a Close method, so io.Closer should be fine.
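
The conclusion of this exchange is that cancelling a context does not by itself unblock a goroutine stuck in io.Copy, so the connections have to be closed. A sketch of that idea follows. BicopyContext and demoCancel are hypothetical names, the io.ReadWriteCloser parameters are an assumption (the PR's Bicopy takes plain io.ReadWriter values), and cancelling as soon as either direction finishes is one possible policy rather than the behavior of any version in this PR.

```go
package main

import (
	"context"
	"fmt"
	"io"
	"net"
	"sync"
	"time"
)

// BicopyContext is a hypothetical variant: it copies in both directions and
// closes both ends when ctx is cancelled or when either direction finishes,
// which unblocks an io.Copy that is still waiting in Read.
func BicopyContext(ctx context.Context, x, y io.ReadWriteCloser) {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	var wg sync.WaitGroup
	wg.Add(3)
	go func() {
		// Watcher: cancellation alone stops nothing, closing the ends does.
		defer wg.Done()
		<-ctx.Done()
		_ = x.Close()
		_ = y.Close()
	}()
	copyOne := func(dst io.Writer, src io.Reader) {
		defer wg.Done()
		defer cancel() // wake the watcher once this direction is done
		_, _ = io.Copy(dst, src)
	}
	go copyOne(x, y)
	go copyOne(y, x)
	wg.Wait()
}

// demoCancel reports whether BicopyContext returns promptly after cancel
// while both directions are blocked on idle in-memory connections.
func demoCancel() bool {
	a, aPeer := net.Pipe()
	b, bPeer := net.Pipe()
	defer aPeer.Close()
	defer bPeer.Close()

	ctx, cancel := context.WithCancel(context.Background())
	returned := make(chan struct{})
	go func() {
		BicopyContext(ctx, a, b)
		close(returned)
	}()
	cancel()
	select {
	case <-returned:
		return true
	case <-time.After(2 * time.Second):
		return false
	}
}

func main() {
	fmt.Println("returned after cancel:", demoCancel())
}
```

Closing both ends as soon as one direction ends can cut off data still in flight the other way, so a real implementation might prefer half-close (CloseRead/CloseWrite) where the connection type supports it, as the removed code did.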

Member

I think this should also be added to tcpproxy upstream - ensure termination and return an error. With both we can use it instead of Bicopy.

21 changes: 12 additions & 9 deletions pkg/hostagent/port_darwin.go
@@ -134,23 +134,26 @@ func (plf *pseudoLoopbackForwarder) Serve() error {
 			ac.Close()
 			continue
 		}
-		go func(ac *net.TCPConn) {
-			if fErr := plf.forward(ac); fErr != nil {
-				logrus.Error(fErr)
-			}
-		}(ac)
+		go plf.forward(ac)
 	}
 }

-func (plf *pseudoLoopbackForwarder) forward(ac *net.TCPConn) error {
+func (plf *pseudoLoopbackForwarder) forward(ac *net.TCPConn) {
 	defer ac.Close()
 	unixConn, err := net.DialUnix("unix", nil, plf.unixAddr)
 	if err != nil {
-		return err
+		logrus.WithError(err).Errorf("pseudoloopback forwarder: failed to dial %q", plf.unixAddr)
Member

This error adds context (good) but logs the error separately (bad). Before we logged only the error from net.DialUnix (line 139).

+		return
 	}
 	defer unixConn.Close()
-	bicopy.Bicopy(ac, unixConn, nil)
-	return nil
+
+	bicopy.Bicopy("pseudoloopback forwarder", bicopy.NamedReadWriter{
+		ReadWriter: ac,
+		Name:       "tcp",
+	}, bicopy.NamedReadWriter{
+		ReadWriter: unixConn,
+		Name:       "unix",
+	})
Member

The context does not match the other context ("pseudoloopback forwarder" vs "forward packets"), and the names of the ReadWriters are a little too generic. I'm not sure what the context of this code is; maybe others can suggest better values.

 }

 func (plf *pseudoLoopbackForwarder) Close() error {
9 changes: 8 additions & 1 deletion pkg/portfwd/client.go
@@ -29,7 +29,14 @@ func HandleTCPConnection(ctx context.Context, client *guestagentclient.GuestAgen
 	}

 	rw := &GrpcClientRW{stream: stream, id: id, addr: guestAddr, protocol: "tcp"}
-	bicopy.Bicopy(rw, conn, nil)
+
+	bicopy.Bicopy("tcp tunnel", bicopy.NamedReadWriter{
Member

This is the tunnel for tcp port forwarding, so maybe a better context would be "forward tcp packets for port N". We probably can get the port from conn.RemoteAddr(), since we forward port N to port N on the host.
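
A rough sketch of how such a context string could be derived, assuming a hypothetical helper forwardContext that takes a net.Addr. Whether conn.RemoteAddr() or conn.LocalAddr() actually carries the forwarded port depends on how the connection was accepted, which this page does not show, so the helper leaves that choice to the caller. exampleAddr is illustrative only.

```go
package main

import (
	"fmt"
	"net"
)

// exampleAddr is an illustrative address, not one taken from the PR.
var exampleAddr net.Addr = &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 8080}

// forwardContext is a hypothetical helper that builds a log context naming
// the port of addr. It falls back to a generic context when addr has no
// host:port form (for example, a unix socket path).
func forwardContext(addr net.Addr) string {
	_, port, err := net.SplitHostPort(addr.String())
	if err != nil {
		return "forward tcp packets"
	}
	return fmt.Sprintf("forward tcp packets for port %s", port)
}

func main() {
	// Prints: forward tcp packets for port 8080
	fmt.Println(forwardContext(exampleAddr))
}
```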

+		ReadWriter: rw,
+		Name:       guestAddr,
+	}, bicopy.NamedReadWriter{
+		ReadWriter: conn,
+		Name:       id,
Member

id is a description of the entire port forwarding action: "tcp-%s-%s" (see line 17), so it is not a good name for the conn.

+	})
 }

 func HandleUDPConnection(ctx context.Context, client *guestagentclient.GuestAgentClient, conn net.PacketConn, guestAddr string) {
9 changes: 8 additions & 1 deletion pkg/portfwdserver/server.go
@@ -32,7 +32,14 @@ func (s *TunnelServer) Start(stream api.GuestService_TunnelServer) error {
 		return err
 	}
 	rw := &GRPCServerRW{stream: stream, id: in.Id}
-	bicopy.Bicopy(rw, conn, nil)
+
+	bicopy.Bicopy("tcp tunnel", bicopy.NamedReadWriter{
Member

We can probably use the same context used in the client with the guest port.

+		ReadWriter: rw,
+		Name:       in.Id,
Member

Not sure what Id is. Can you share the output we get at runtime?

+	}, bicopy.NamedReadWriter{
+		ReadWriter: conn,
+		Name:       in.GuestAddr,
+	})
 	return nil
 }

27 changes: 8 additions & 19 deletions pkg/vz/network_darwin.go
@@ -8,11 +8,11 @@ import (
 	"io"
 	"net"
 	"os"
-	"sync"
 	"syscall"
 	"time"

 	"github.com/balajiv113/fd"
+	"github.com/lima-vm/lima/pkg/bicopy"

 	"github.com/sirupsen/logrus"
 )
@@ -66,24 +66,13 @@ func forwardPackets(qemuConn *qemuPacketConn, vzConn *packetConn) {
 	defer qemuConn.Close()
 	defer vzConn.Close()

-	var wg sync.WaitGroup
-	wg.Add(2)
-
-	go func() {
-		defer wg.Done()
-		if _, err := io.Copy(qemuConn, vzConn); err != nil {
-			logrus.Errorf("Failed to forward packets from VZ to VMNET: %s", err)
-		}
-	}()
-
-	go func() {
-		defer wg.Done()
-		if _, err := io.Copy(vzConn, qemuConn); err != nil {
-			logrus.Errorf("Failed to forward packets from VMNET to VZ: %s", err)
-		}
-	}()
-
-	wg.Wait()
+	bicopy.Bicopy("forwarding packets", bicopy.NamedReadWriter{
+		ReadWriter: qemuConn,
+		Name:       "VMNET",
+	}, bicopy.NamedReadWriter{
+		ReadWriter: vzConn,
+		Name:       "VZ",
+	})
 }

 // qemuPacketConn converts raw network packet to a QEMU supported network packet.