-
Notifications
You must be signed in to change notification settings - Fork 669
cleanup: refactor and use bicopy everywhere #2944
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,22 +1,3 @@ | ||
// From https://raw.githubusercontent.com/norouter/norouter/v0.6.5/pkg/agent/bicopy/bicopy.go | ||
/* | ||
Copyright (C) NoRouter authors. | ||
|
||
Copyright (C) libnetwork authors. | ||
|
||
Licensed under the Apache License, Version 2.0 (the "License"); | ||
you may not use this file except in compliance with the License. | ||
You may obtain a copy of the License at | ||
|
||
http://www.apache.org/licenses/LICENSE-2.0 | ||
|
||
Unless required by applicable law or agreed to in writing, software | ||
distributed under the License is distributed on an "AS IS" BASIS, | ||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
See the License for the specific language governing permissions and | ||
limitations under the License. | ||
*/ | ||
|
||
package bicopy | ||
|
||
import ( | ||
|
@@ -26,56 +7,25 @@ import ( | |
"github.com/sirupsen/logrus" | ||
) | ||
|
||
// Bicopy is from https://github.com/rootless-containers/rootlesskit/blob/v0.10.1/pkg/port/builtin/parent/tcp/tcp.go#L73-L104 | ||
// (originally from libnetwork, Apache License 2.0). | ||
func Bicopy(x, y io.ReadWriter, quit <-chan struct{}) { | ||
tamird marked this conversation as resolved.
Show resolved
Hide resolved
|
||
type closeReader interface { | ||
CloseRead() error | ||
} | ||
type closeWriter interface { | ||
CloseWrite() error | ||
} | ||
var wg sync.WaitGroup | ||
broker := func(to, from io.ReadWriter) { | ||
if _, err := io.Copy(to, from); err != nil { | ||
logrus.WithError(err).Debug("failed to call io.Copy") | ||
} | ||
tamird marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if fromCR, ok := from.(closeReader); ok { | ||
if err := fromCR.CloseRead(); err != nil { | ||
logrus.WithError(err).Debug("failed to call CloseRead") | ||
} | ||
} | ||
if toCW, ok := to.(closeWriter); ok { | ||
if err := toCW.CloseWrite(); err != nil { | ||
logrus.WithError(err).Debug("failed to call CloseWrite") | ||
} | ||
} | ||
wg.Done() | ||
} | ||
type NamedReadWriter struct { | ||
ReadWriter io.ReadWriter | ||
Name string | ||
tamird marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
|
||
tamird marked this conversation as resolved.
Show resolved
Hide resolved
|
||
func Bicopy(context string, x, y NamedReadWriter) { | ||
var wg sync.WaitGroup | ||
wg.Add(2) | ||
go broker(x, y) | ||
go broker(y, x) | ||
finish := make(chan struct{}) | ||
go func() { | ||
wg.Wait() | ||
close(finish) | ||
}() | ||
|
||
select { | ||
case <-quit: | ||
case <-finish: | ||
} | ||
if xCloser, ok := x.(io.Closer); ok { | ||
if err := xCloser.Close(); err != nil { | ||
logrus.WithError(err).Debug("failed to call xCloser.Close") | ||
defer wg.Done() | ||
if _, err := io.Copy(x.ReadWriter, y.ReadWriter); err != nil { | ||
logrus.WithError(err).Errorf("%s: io.Copy(%s, %s)", context, x.Name, y.Name) | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This logs an error like:
For code already calling bicopy, this is an improvement, since before we logged a DEBUG message without any information on which direction failed, or what is the purpose of the copy:
But for the call from network_darwin this is worse. Before we had this clear error:
I would like to keep the previous error message or something very close. I don't see the value of logging the error separately using logrous.WithError(). It can be done using: logrus.Errorf("Failed to %s from %s to %s: %s", context, y.Name, x.Name, err) |
||
} | ||
if yCloser, ok := y.(io.Closer); ok { | ||
if err := yCloser.Close(); err != nil { | ||
logrus.WithError(err).Debug("failed to call yCloser.Close") | ||
}() | ||
go func() { | ||
defer wg.Done() | ||
if _, err := io.Copy(y.ReadWriter, x.ReadWriter); err != nil { | ||
logrus.WithError(err).Errorf("%s: io.Copy(%s, %s)", context, y.Name, x.Name) | ||
} | ||
} | ||
<-finish | ||
// TODO: return copied bytes | ||
}() | ||
wg.Wait() | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we really need bicopy to handle the error and not throw it to the caller ?? I would prefer it to be handled by the caller itself. It gives us better control. Handling error ourself looks generic but all we are doing is just logging. But in some cases we may need to act on error in caller side There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There has been a lot of back and forth on this already. I'm inclined to leave it as is. Another option is to pass a handler callback, but nobody needs this today. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
If you have a valid reason for not returning error please let me know. Also i think There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @nirs expressed concern that if There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It will eventually fail because one end of stream is closed already. Since we have wg.Done at last. Returning error after both io.Copy are closed looks correct to me. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. With errgroup as soon as one is returned non-nil its over. It will stop the other go func as well. If we are using reader/writer then we will not even have close so it's just stopping the loop There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The goroutine is canceled magically? how does it stop? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't see anything in the docs about cancelling goroutine blocked on a syscall. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes you are right, My mistake. Only context is closed. So we need to check for context close and close the connection. For pure reader / writer until that syscall is returned we have no other option. But most of our reader / writer have close method so io.Closable should be fine There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this should also be added to tcpproxy upstream - ensure termination and return an error. With both we can use it instead of Bicopy. |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -134,23 +134,26 @@ func (plf *pseudoLoopbackForwarder) Serve() error { | |
ac.Close() | ||
continue | ||
} | ||
go func(ac *net.TCPConn) { | ||
if fErr := plf.forward(ac); fErr != nil { | ||
logrus.Error(fErr) | ||
} | ||
}(ac) | ||
go plf.forward(ac) | ||
} | ||
} | ||
|
||
func (plf *pseudoLoopbackForwarder) forward(ac *net.TCPConn) error { | ||
func (plf *pseudoLoopbackForwarder) forward(ac *net.TCPConn) { | ||
defer ac.Close() | ||
unixConn, err := net.DialUnix("unix", nil, plf.unixAddr) | ||
if err != nil { | ||
return err | ||
logrus.WithError(err).Errorf("pseudoloopback forwarder: failed to dial %q", plf.unixAddr) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This error adds context (good) but logs the error separately (bad). Before we logged only the error from net.DialUnix (line 139). |
||
return | ||
} | ||
defer unixConn.Close() | ||
bicopy.Bicopy(ac, unixConn, nil) | ||
return nil | ||
tamird marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
bicopy.Bicopy("pseudoloopback forwarder", bicopy.NamedReadWriter{ | ||
ReadWriter: ac, | ||
Name: "tcp", | ||
}, bicopy.NamedReadWriter{ | ||
ReadWriter: unixConn, | ||
Name: "unix", | ||
}) | ||
tamird marked this conversation as resolved.
Show resolved
Hide resolved
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The context does not match other context ("pseudoloopback fowarder" vs "forward packets") and the names of the readwriters are little bit too generic, I'm not sure what is the context on this code, may be others can suggest better values. |
||
} | ||
|
||
func (plf *pseudoLoopbackForwarder) Close() error { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -29,7 +29,14 @@ func HandleTCPConnection(ctx context.Context, client *guestagentclient.GuestAgen | |
} | ||
|
||
rw := &GrpcClientRW{stream: stream, id: id, addr: guestAddr, protocol: "tcp"} | ||
bicopy.Bicopy(rw, conn, nil) | ||
|
||
bicopy.Bicopy("tcp tunnel", bicopy.NamedReadWriter{ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is the tunnel for tcp port forwarding, so maybe a better context would be "forward tcp packets for port N". We probably can get the port from conn.RemoteAddr(), since we forward port N to port N on the host. |
||
ReadWriter: rw, | ||
Name: guestAddr, | ||
}, bicopy.NamedReadWriter{ | ||
ReadWriter: conn, | ||
Name: id, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. id is a description of the entire port forwading action: "tcp-%s-%s" (see line 17), so it is not a good for the conn. |
||
}) | ||
tamird marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
|
||
func HandleUDPConnection(ctx context.Context, client *guestagentclient.GuestAgentClient, conn net.PacketConn, guestAddr string) { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -32,7 +32,14 @@ func (s *TunnelServer) Start(stream api.GuestService_TunnelServer) error { | |
return err | ||
} | ||
rw := &GRPCServerRW{stream: stream, id: in.Id} | ||
bicopy.Bicopy(rw, conn, nil) | ||
|
||
bicopy.Bicopy("tcp tunnel", bicopy.NamedReadWriter{ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We can probably use the same context used in the client with the guest port. |
||
ReadWriter: rw, | ||
Name: in.Id, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not sure what Id is. Can you share the output we get in runtime? |
||
}, bicopy.NamedReadWriter{ | ||
ReadWriter: conn, | ||
Name: in.GuestAddr, | ||
}) | ||
return nil | ||
} | ||
|
||
|
Uh oh!
There was an error while loading. Please reload this page.