From 62be36e4ced70f78e7a787a66adc3d445159d287 Mon Sep 17 00:00:00 2001
From: "mr.The"
Date: Thu, 26 May 2022 20:01:50 +0300
Subject: [PATCH] Tunnels refactoring and data race fixes

The Go gRPC documentation notes that it is unsafe to send messages
concurrently on the same stream, so stream writes are now guarded by a
mutex and tunnel data is copied on write.
---
 client/command/exec/execute-shellcode.go |   2 +-
 client/command/shell/shell.go            |   2 +-
 client/core/portfwd.go                   |  10 +-
 client/core/tunnel.go                    | 115 ++-------------------
 client/core/tunnel_io.go                 | 115 +++++++++++++++++++++
 client/core/tunnels.go                   | 124 +++++++++++++++++++++++
 server/core/tunnels.go                   |   1 +
 7 files changed, 255 insertions(+), 114 deletions(-)
 create mode 100644 client/core/tunnel_io.go
 create mode 100644 client/core/tunnels.go

diff --git a/client/command/exec/execute-shellcode.go b/client/command/exec/execute-shellcode.go
index 8e9b8f9d52..67629f3c91 100644
--- a/client/command/exec/execute-shellcode.go
+++ b/client/command/exec/execute-shellcode.go
@@ -125,7 +125,7 @@ func executeInteractive(ctx *grumble.Context, hostProc string, shellcode []byte,
 		return
 	}
 
-	tunnel := core.Tunnels.Start(rpcTunnel.GetTunnelID(), rpcTunnel.GetSessionID())
+	tunnel := core.GetTunnels().Start(rpcTunnel.GetTunnelID(), rpcTunnel.GetSessionID())
 
 	shell, err := con.Rpc.Shell(context.Background(), &sliverpb.ShellReq{
 		Request: con.ActiveTarget.Request(ctx),
diff --git a/client/command/shell/shell.go b/client/command/shell/shell.go
index 5699066ede..e92bf03cc7 100644
--- a/client/command/shell/shell.go
+++ b/client/command/shell/shell.go
@@ -78,7 +78,7 @@ func runInteractive(ctx *grumble.Context, shellPath string, noPty bool, con *con
 	log.Printf("Created new tunnel with id: %d, binding to shell ...", rpcTunnel.TunnelID)
 
 	// Start() takes an RPC tunnel and creates a local Reader/Writer tunnel object
-	tunnel := core.Tunnels.Start(rpcTunnel.TunnelID, rpcTunnel.SessionID)
+	tunnel := core.GetTunnels().Start(rpcTunnel.TunnelID, rpcTunnel.SessionID)
 
 	shell, err := con.Rpc.Shell(context.Background(), &sliverpb.ShellReq{
 		Request: con.ActiveTarget.Request(ctx),
diff --git a/client/core/portfwd.go b/client/core/portfwd.go
index aa3d2e9f4f..1db764874b 100644
--- a/client/core/portfwd.go
+++ b/client/core/portfwd.go
@@ -125,7 +125,7 @@ func (p *ChannelProxy) HandleConn(conn net.Conn) {
 	// Cleanup
 	defer func() {
 		go conn.Close()
-		Tunnels.Close(tunnel.ID)
+		GetTunnels().Close(tunnel.ID)
 	}()
 
 	errs := make(chan error, 1)
@@ -172,7 +172,7 @@ func (p *ChannelProxy) Host() string {
 	return host
 }
 
-func (p *ChannelProxy) dialImplant(ctx context.Context) (*Tunnel, error) {
+func (p *ChannelProxy) dialImplant(ctx context.Context) (*TunnelIO, error) {
 
 	log.Printf("[tcpproxy] Dialing implant to create tunnel ...")
 
@@ -185,7 +185,7 @@ func (p *ChannelProxy) dialImplant(ctx context.Context) (*Tunnel, error) {
 		return nil, err
 	}
 	log.Printf("[tcpproxy] Created new tunnel with id %d (session %s)", rpcTunnel.TunnelID, p.Session.ID)
-	tunnel := Tunnels.Start(rpcTunnel.TunnelID, rpcTunnel.SessionID)
+	tunnel := GetTunnels().Start(rpcTunnel.TunnelID, rpcTunnel.SessionID)
 
 	log.Printf("[tcpproxy] Binding tunnel to portfwd %d", p.Port())
 	portfwdResp, err := p.Rpc.Portfwd(ctx, &sliverpb.PortfwdReq{
@@ -219,7 +219,7 @@ func (p *ChannelProxy) dialTimeout() time.Duration {
 	return 30 * time.Second
 }
 
-func toImplantLoop(conn net.Conn, tunnel *Tunnel, errs chan<- error) {
+func toImplantLoop(conn net.Conn, tunnel *TunnelIO, errs chan<- error) {
 	if wc, ok := conn.(*tcpproxy.Conn); ok && len(wc.Peeked) > 0 {
 		if _, err := tunnel.Write(wc.Peeked); err != nil {
 			errs <- err
@@ -232,7 +232,7 @@ func toImplantLoop(conn net.Conn, tunnel *Tunnel, errs chan<- error) {
 	errs <- err
 }
 
-func fromImplantLoop(conn net.Conn, tunnel *Tunnel, errs chan<- error) {
+func fromImplantLoop(conn net.Conn, tunnel *TunnelIO, errs chan<- error) {
 	n, err := io.Copy(conn, tunnel)
 	log.Printf("[tcpproxy] Closing from-implant after %d byte(s)", n)
 	errs <- err
diff --git a/client/core/tunnel.go b/client/core/tunnel.go
index 58a7aab74b..af3d2c4b8f 100644
--- a/client/core/tunnel.go
+++ b/client/core/tunnel.go
@@ -1,127 +1,28 @@
 package core
 
 import (
-	"bytes"
 	"context"
 	"io"
 	"log"
-	"sync"
 
 	"github.com/bishopfox/sliver/protobuf/rpcpb"
-	"github.com/bishopfox/sliver/protobuf/sliverpb"
 )
 
-var (
-	// Tunnels - Holds refs to all tunnels
-	Tunnels tunnels
-)
-
-// Holds the tunnels locally so we can map incoming data
-// messages to the tunnel
-type tunnels struct {
-	tunnels *map[uint64]*Tunnel
-	mutex   *sync.RWMutex
-	stream  rpcpb.SliverRPC_TunnelDataClient
-}
-
-// Get - Get a tunnel
-func (t *tunnels) Get(tunnelID uint64) *Tunnel {
-	t.mutex.RLock()
-	defer t.mutex.RUnlock()
-	return (*t.tunnels)[tunnelID]
-}
-
-// Start - Add a tunnel to the core mapper
-func (t *tunnels) Start(tunnelID uint64, sessionID string) *Tunnel {
-	t.mutex.Lock()
-	defer t.mutex.Unlock()
-	tunnel := &Tunnel{
-		ID:        tunnelID,
-		SessionID: sessionID,
-		Send:      make(chan []byte),
-		Recv:      make(chan []byte),
-	}
-	(*t.tunnels)[tunnelID] = tunnel
-	go func() {
-		tunnel.IsOpen = true
-		for data := range tunnel.Send {
-			log.Printf("Send %d bytes on tunnel %d", len(data), tunnel.ID)
-			t.stream.Send(&sliverpb.TunnelData{
-				TunnelID:  tunnel.ID,
-				SessionID: tunnel.SessionID,
-				Data:      data,
-			})
-		}
-	}()
-	tunnel.Send <- make([]byte, 0) // Send "zero" message to bind client to tunnel
-	return tunnel
-}
-
-// Close - Close the tunnel channels
-func (t *tunnels) Close(tunnelID uint64) {
-	log.Printf("Closing tunnel %d", tunnelID)
-	t.mutex.Lock()
-	defer t.mutex.Unlock()
-	tunnel := (*t.tunnels)[tunnelID]
-	if tunnel != nil {
-		delete((*t.tunnels), tunnelID)
-		tunnel.IsOpen = false
-		close(tunnel.Recv)
-		close(tunnel.Send)
-	}
-}
-
-// Tunnel - Duplex data tunnel
-type Tunnel struct {
-	ID        uint64
-	IsOpen    bool
-	SessionID string
-
-	Send chan []byte
-	Recv chan []byte
-}
-
-// Write - Writer method for interface
-func (tun *Tunnel) Write(data []byte) (int, error) {
-	log.Printf("Write %d bytes", len(data))
-	if !tun.IsOpen {
-		return 0, io.EOF
-	}
-	tun.Send <- data
-	n := len(data)
-	return n, nil
-}
-
-// Read - Reader method for interface
-func (tun *Tunnel) Read(data []byte) (int, error) {
-	var buff bytes.Buffer
-	if !tun.IsOpen {
-		log.Printf("Warning: Read on closed tunnel %d", tun.ID)
-		return 0, io.EOF
-	}
-	recvData := <-tun.Recv
-	log.Printf("Read %d bytes", len(recvData))
-	buff.Write(recvData)
-	n := copy(data, buff.Bytes())
-	return n, nil
-}
-
 // TunnelLoop - Parses incoming tunnel messages and distributes them
 // to session/tunnel objects
+// Expected to be called only once during initialization
 func TunnelLoop(rpc rpcpb.SliverRPCClient) error {
 	log.Println("Starting tunnel data loop ...")
 	defer log.Printf("Warning: TunnelLoop exited")
+
 	stream, err := rpc.TunnelData(context.Background())
 	if err != nil {
 		return err
 	}
-	Tunnels = tunnels{
-		tunnels: &map[uint64]*Tunnel{},
-		mutex:   &sync.RWMutex{},
-		stream:  stream,
-	}
-	for {
+	GetTunnels().SetStream(stream)
+
+	for {
 		// log.Printf("Waiting for TunnelData ...")
 		incoming, err := stream.Recv()
 		// log.Printf("Recv stream msg: %v", incoming)
@@ -134,15 +35,15 @@ func TunnelLoop(rpc rpcpb.SliverRPCClient) error {
 			return err
 		}
 		// log.Printf("Received TunnelData for tunnel %d", incoming.TunnelID)
-		tunnel := Tunnels.Get(incoming.TunnelID)
+		tunnel := GetTunnels().Get(incoming.TunnelID)
+
 		if tunnel != nil {
 			if !incoming.Closed {
 				log.Printf("Received data on tunnel %d", tunnel.ID)
 				tunnel.Recv <- incoming.GetData()
 			} else {
 				log.Printf("Closing tunnel %d", tunnel.ID)
-				tunnel.IsOpen = false
-				close(tunnel.Recv)
+				GetTunnels().Close(tunnel.ID)
 			}
 		} else {
 			log.Printf("Received tunnel data for non-existent tunnel id %d", incoming.TunnelID)
diff --git a/client/core/tunnel_io.go b/client/core/tunnel_io.go
new file mode 100644
index 0000000000..b611963c82
--- /dev/null
+++ b/client/core/tunnel_io.go
@@ -0,0 +1,115 @@
+package core
+
+import (
+	"bytes"
+	"errors"
+	"io"
+	"log"
+	"sync"
+)
+
+// TunnelIO - Duplex data tunnel, compatible with io.ReadWriter
+type TunnelIO struct {
+	ID        uint64
+	SessionID string
+
+	Send chan []byte
+	Recv chan []byte
+
+	isOpen bool
+	mutex  *sync.RWMutex
+}
+
+// NewTunnelIO - Single entry point for creating an instance of a new TunnelIO
+func NewTunnelIO(tunnelID uint64, sessionID string) *TunnelIO {
+	log.Printf("New tunnel: %d", tunnelID)
+
+	return &TunnelIO{
+		ID:        tunnelID,
+		SessionID: sessionID,
+		Send:      make(chan []byte),
+		Recv:      make(chan []byte),
+		isOpen:    false,
+		mutex:     &sync.RWMutex{},
+	}
+}
+
+// Write - Writer method for interface
+func (tun *TunnelIO) Write(data []byte) (int, error) {
+	if !tun.IsOpen() {
+		log.Printf("Warning: Write on closed tunnel %d", tun.ID)
+		return 0, io.EOF
+	}
+
+	// Copy the data to avoid any race conditions on the caller's byte array
+	dataCopy := make([]byte, len(data))
+	copy(dataCopy, data)
+
+	log.Printf("Write %d bytes", len(dataCopy))
+
+	tun.Send <- dataCopy
+	n := len(dataCopy)
+
+	return n, nil
+}
+
+// Read - Reader method for interface
+func (tun *TunnelIO) Read(data []byte) (int, error) {
+	if !tun.IsOpen() {
+		log.Printf("Warning: Read on closed tunnel %d", tun.ID)
+		return 0, io.EOF
+	}
+
+	var buff bytes.Buffer
+
+	recvData := <-tun.Recv
+	log.Printf("Read %d bytes", len(recvData))
+	buff.Write(recvData)
+
+	n := copy(data, buff.Bytes())
+	return n, nil
+}
+
+// Close - Close tunnel IO operations
+func (tun *TunnelIO) Close() error {
+	tun.mutex.Lock()
+	defer tun.mutex.Unlock()
+
+	if !tun.isOpen {
+		log.Printf("Warning: Close on closed tunnel %d", tun.ID)
+
+		// Closing an already closed tunnel is not treated as an error
+		return nil
+	}
+
+	log.Printf("Close tunnel %d", tun.ID)
+
+	tun.isOpen = false
+
+	close(tun.Recv)
+	close(tun.Send)
+
+	return nil
+}
+
+func (tun *TunnelIO) IsOpen() bool {
+	tun.mutex.RLock()
+	defer tun.mutex.RUnlock()
+
+	return tun.isOpen
+}
+
+func (tun *TunnelIO) Open() error {
+	tun.mutex.Lock()
+	defer tun.mutex.Unlock()
+
+	if tun.isOpen {
+		return errors.New("tunnel already in open state")
+	}
+
+	log.Printf("Open tunnel %d", tun.ID)
+
+	tun.isOpen = true
+
+	return nil
+}
diff --git a/client/core/tunnels.go b/client/core/tunnels.go
new file mode 100644
index 0000000000..52dd7c48ed
--- /dev/null
+++ b/client/core/tunnels.go
@@ -0,0 +1,124 @@
+package core
+
+import (
+	"errors"
+	"log"
+	"sync"
+
+	"github.com/bishopfox/sliver/protobuf/rpcpb"
+	"github.com/bishopfox/sliver/protobuf/sliverpb"
+)
+
+var (
+	// tunnelsStorage - Holds refs to all tunnels
+	tunnelsStorage *tunnels
+
+	tunnelsSingletonLock = &sync.Mutex{}
+)
+
+// GetTunnels - Singleton function that returns or initializes all tunnels
+func GetTunnels() *tunnels {
+	tunnelsSingletonLock.Lock()
+	defer tunnelsSingletonLock.Unlock()
+
+	if tunnelsStorage == nil {
+		log.Println("Creating single instance of tunnels.")
+
+		tunnelsStorage = &tunnels{
+			tunnels:     &map[uint64]*TunnelIO{},
+			mutex:       &sync.RWMutex{},
+			streamMutex: &sync.Mutex{},
+		}
+	}
+
+	return tunnelsStorage
+}
+
+// Holds the tunnels locally so we can map incoming data
+// messages to the tunnel
+type tunnels struct {
+	tunnels     *map[uint64]*TunnelIO
+	mutex       *sync.RWMutex
+	streamMutex *sync.Mutex
+	stream      rpcpb.SliverRPC_TunnelDataClient
+}
+
+func (t *tunnels) SetStream(stream rpcpb.SliverRPC_TunnelDataClient) {
+	t.streamMutex.Lock()
+	defer t.streamMutex.Unlock()
+
+	log.Printf("Set stream")
+
+	t.stream = stream
+}
+
+// Get - Get a tunnel
+func (t *tunnels) Get(tunnelID uint64) *TunnelIO {
+	t.mutex.RLock()
+	defer t.mutex.RUnlock()
+
+	log.Printf("Get tunnel %d", tunnelID)
+
+	return (*t.tunnels)[tunnelID]
+}
+
+// send - Safe way to send a message to the stream
+// The protobuf stream allows only one writer at a time, so a mutex guards it
+func (t *tunnels) send(tunnelData *sliverpb.TunnelData) error {
+	t.streamMutex.Lock()
+	defer t.streamMutex.Unlock()
+
+	if t.stream == nil {
+		return errors.New("uninitialized stream")
+	}
+
+	log.Printf("Private send to stream, tunnelID: %d", tunnelData.TunnelID)
+
+	return t.stream.Send(tunnelData)
+}
+
+// Start - Add a tunnel to the core mapper
+func (t *tunnels) Start(tunnelID uint64, sessionID string) *TunnelIO {
+	t.mutex.Lock()
+	defer t.mutex.Unlock()
+
+	tunnel := NewTunnelIO(tunnelID, sessionID)
+
+	(*t.tunnels)[tunnelID] = tunnel
+
+	go func(tunnel *TunnelIO) {
+		tunnel.Open()
+		log.Printf("Tunnel %d is now open", tunnelID)
+
+		for data := range tunnel.Send {
+			log.Printf("Send %d bytes on tunnel %d", len(data), tunnel.ID)
+
+			t.send(&sliverpb.TunnelData{
+				TunnelID:  tunnel.ID,
+				SessionID: tunnel.SessionID,
+				Data:      data,
+			})
+		}
+
+		log.Printf("Send channel of tunnel %d is now closed", tunnelID)
+	}(tunnel)
+
+	tunnel.Send <- make([]byte, 0) // Send "zero" message to bind client to tunnel
+	return tunnel
+}
+
+// Close - Close the tunnel channels
+func (t *tunnels) Close(tunnelID uint64) {
+	t.mutex.Lock()
+	defer t.mutex.Unlock()
+
+	log.Printf("Closing tunnel %d", tunnelID)
+
+	tunnel := (*t.tunnels)[tunnelID]
+
+	if tunnel != nil {
+		tunnel.Close()
+
+		delete((*t.tunnels), tunnelID)
+	}
+}
diff --git a/server/core/tunnels.go b/server/core/tunnels.go
index 85c3b5e9be..9bcf8a8dd2 100644
--- a/server/core/tunnels.go
+++ b/server/core/tunnels.go
@@ -111,6 +111,7 @@ func (t *tunnels) Close(tunnelID uint64) error {
 func (t *tunnels) Get(tunnelID uint64) *Tunnel {
 	t.mutex.Lock()
 	defer t.mutex.Unlock()
+
 	return t.tunnels[tunnelID]
 }
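---

For reviewers: below is a minimal standalone sketch of the two mitigations this
patch applies, serializing stream writes behind a mutex and copying the
caller's buffer before it crosses a channel boundary. The names here
(stubStream, safeSender, message) are hypothetical stand-ins for the real
rpcpb.SliverRPC_TunnelDataClient and *sliverpb.TunnelData types; it is an
illustration of the pattern, not the patched code itself.

package main

import (
	"fmt"
	"sync"
)

// message is a stand-in for *sliverpb.TunnelData.
type message struct {
	TunnelID uint64
	Data     []byte
}

// stubStream mimics a protobuf client stream; real gRPC streams are not
// safe for concurrent calls to Send from multiple goroutines.
type stubStream struct{}

func (s *stubStream) Send(m *message) error {
	fmt.Printf("sent %d byte(s) on tunnel %d\n", len(m.Data), m.TunnelID)
	return nil
}

// safeSender serializes all writes to the underlying stream, mirroring the
// role of tunnels.send in this patch.
type safeSender struct {
	mu     sync.Mutex
	stream *stubStream
}

func (s *safeSender) send(m *message) error {
	s.mu.Lock() // only one writer at a time, as the stream requires
	defer s.mu.Unlock()
	return s.stream.Send(m)
}

// write copies the caller's buffer before handing it off, mirroring
// TunnelIO.Write: the caller may reuse data immediately without racing
// against whichever goroutine eventually performs the send.
func (s *safeSender) write(tunnelID uint64, data []byte) error {
	dataCopy := make([]byte, len(data))
	copy(dataCopy, data)
	return s.send(&message{TunnelID: tunnelID, Data: dataCopy})
}

func main() {
	sender := &safeSender{stream: &stubStream{}}
	buf := []byte("hello")

	var wg sync.WaitGroup
	for i := 0; i < 4; i++ { // concurrent writers are now safe
		wg.Add(1)
		go func(id uint64) {
			defer wg.Done()
			_ = sender.write(id, buf)
		}(uint64(i))
	}
	wg.Wait()
}

Running this with the race detector (go run -race) reports no data races,
whereas sending the shared buf directly on a channel while callers reuse it,
as the old client/core/tunnel.go did, is exactly the kind of access the
detector flags.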