Skip to content

Commit

Permalink
Tunnels refactoring and datarace fixes
Browse files Browse the repository at this point in the history
Golang documentation mentioning it's unsafe to concurently send messages
via same tunnel, so there is a mutex and a data copy on write
  • Loading branch information
mrThe committed May 28, 2022
1 parent 5049079 commit 62be36e
Show file tree
Hide file tree
Showing 7 changed files with 255 additions and 114 deletions.
2 changes: 1 addition & 1 deletion client/command/exec/execute-shellcode.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func executeInteractive(ctx *grumble.Context, hostProc string, shellcode []byte,
return
}

tunnel := core.Tunnels.Start(rpcTunnel.GetTunnelID(), rpcTunnel.GetSessionID())
tunnel := core.GetTunnels().Start(rpcTunnel.GetTunnelID(), rpcTunnel.GetSessionID())

shell, err := con.Rpc.Shell(context.Background(), &sliverpb.ShellReq{
Request: con.ActiveTarget.Request(ctx),
Expand Down
2 changes: 1 addition & 1 deletion client/command/shell/shell.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func runInteractive(ctx *grumble.Context, shellPath string, noPty bool, con *con
log.Printf("Created new tunnel with id: %d, binding to shell ...", rpcTunnel.TunnelID)

// Start() takes an RPC tunnel and creates a local Reader/Writer tunnel object
tunnel := core.Tunnels.Start(rpcTunnel.TunnelID, rpcTunnel.SessionID)
tunnel := core.GetTunnels().Start(rpcTunnel.TunnelID, rpcTunnel.SessionID)

shell, err := con.Rpc.Shell(context.Background(), &sliverpb.ShellReq{
Request: con.ActiveTarget.Request(ctx),
Expand Down
10 changes: 5 additions & 5 deletions client/core/portfwd.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func (p *ChannelProxy) HandleConn(conn net.Conn) {
// Cleanup
defer func() {
go conn.Close()
Tunnels.Close(tunnel.ID)
GetTunnels().Close(tunnel.ID)
}()

errs := make(chan error, 1)
Expand Down Expand Up @@ -172,7 +172,7 @@ func (p *ChannelProxy) Host() string {
return host
}

func (p *ChannelProxy) dialImplant(ctx context.Context) (*Tunnel, error) {
func (p *ChannelProxy) dialImplant(ctx context.Context) (*TunnelIO, error) {

log.Printf("[tcpproxy] Dialing implant to create tunnel ...")

Expand All @@ -185,7 +185,7 @@ func (p *ChannelProxy) dialImplant(ctx context.Context) (*Tunnel, error) {
return nil, err
}
log.Printf("[tcpproxy] Created new tunnel with id %d (session %s)", rpcTunnel.TunnelID, p.Session.ID)
tunnel := Tunnels.Start(rpcTunnel.TunnelID, rpcTunnel.SessionID)
tunnel := GetTunnels().Start(rpcTunnel.TunnelID, rpcTunnel.SessionID)

log.Printf("[tcpproxy] Binding tunnel to portfwd %d", p.Port())
portfwdResp, err := p.Rpc.Portfwd(ctx, &sliverpb.PortfwdReq{
Expand Down Expand Up @@ -219,7 +219,7 @@ func (p *ChannelProxy) dialTimeout() time.Duration {
return 30 * time.Second
}

func toImplantLoop(conn net.Conn, tunnel *Tunnel, errs chan<- error) {
func toImplantLoop(conn net.Conn, tunnel *TunnelIO, errs chan<- error) {
if wc, ok := conn.(*tcpproxy.Conn); ok && len(wc.Peeked) > 0 {
if _, err := tunnel.Write(wc.Peeked); err != nil {
errs <- err
Expand All @@ -232,7 +232,7 @@ func toImplantLoop(conn net.Conn, tunnel *Tunnel, errs chan<- error) {
errs <- err
}

func fromImplantLoop(conn net.Conn, tunnel *Tunnel, errs chan<- error) {
func fromImplantLoop(conn net.Conn, tunnel *TunnelIO, errs chan<- error) {
n, err := io.Copy(conn, tunnel)
log.Printf("[tcpproxy] Closing from-implant after %d byte(s)", n)
errs <- err
Expand Down
115 changes: 8 additions & 107 deletions client/core/tunnel.go
Original file line number Diff line number Diff line change
@@ -1,127 +1,28 @@
package core

import (
"bytes"
"context"
"io"
"log"
"sync"

"github.com/bishopfox/sliver/protobuf/rpcpb"
"github.com/bishopfox/sliver/protobuf/sliverpb"
)

var (
// Tunnels - Holds refs to all tunnels
Tunnels tunnels
)

// Holds the tunnels locally so we can map incoming data
// messages to the tunnel
type tunnels struct {
tunnels *map[uint64]*Tunnel
mutex *sync.RWMutex
stream rpcpb.SliverRPC_TunnelDataClient
}

// Get - Get a tunnel
func (t *tunnels) Get(tunnelID uint64) *Tunnel {
t.mutex.RLock()
defer t.mutex.RUnlock()
return (*t.tunnels)[tunnelID]
}

// Start - Add a tunnel to the core mapper
func (t *tunnels) Start(tunnelID uint64, sessionID string) *Tunnel {
t.mutex.Lock()
defer t.mutex.Unlock()
tunnel := &Tunnel{
ID: tunnelID,
SessionID: sessionID,
Send: make(chan []byte),
Recv: make(chan []byte),
}
(*t.tunnels)[tunnelID] = tunnel
go func() {
tunnel.IsOpen = true
for data := range tunnel.Send {
log.Printf("Send %d bytes on tunnel %d", len(data), tunnel.ID)
t.stream.Send(&sliverpb.TunnelData{
TunnelID: tunnel.ID,
SessionID: tunnel.SessionID,
Data: data,
})
}
}()
tunnel.Send <- make([]byte, 0) // Send "zero" message to bind client to tunnel
return tunnel
}

// Close - Close the tunnel channels
func (t *tunnels) Close(tunnelID uint64) {
log.Printf("Closing tunnel %d", tunnelID)
t.mutex.Lock()
defer t.mutex.Unlock()
tunnel := (*t.tunnels)[tunnelID]
if tunnel != nil {
delete((*t.tunnels), tunnelID)
tunnel.IsOpen = false
close(tunnel.Recv)
close(tunnel.Send)
}
}

// Tunnel - Duplex data tunnel
type Tunnel struct {
ID uint64
IsOpen bool
SessionID string

Send chan []byte
Recv chan []byte
}

// Write - Writer method for interface
func (tun *Tunnel) Write(data []byte) (int, error) {
log.Printf("Write %d bytes", len(data))
if !tun.IsOpen {
return 0, io.EOF
}
tun.Send <- data
n := len(data)
return n, nil
}

// Read - Reader method for interface
func (tun *Tunnel) Read(data []byte) (int, error) {
var buff bytes.Buffer
if !tun.IsOpen {
log.Printf("Warning: Read on closed tunnel %d", tun.ID)
return 0, io.EOF
}
recvData := <-tun.Recv
log.Printf("Read %d bytes", len(recvData))
buff.Write(recvData)
n := copy(data, buff.Bytes())
return n, nil
}

// TunnelLoop - Parses incoming tunnel messages and distributes them
// to session/tunnel objects
// Expected to be called only once during initialization
func TunnelLoop(rpc rpcpb.SliverRPCClient) error {
log.Println("Starting tunnel data loop ...")
defer log.Printf("Warning: TunnelLoop exited")

stream, err := rpc.TunnelData(context.Background())
if err != nil {
return err
}
Tunnels = tunnels{
tunnels: &map[uint64]*Tunnel{},
mutex: &sync.RWMutex{},
stream: stream,
}
for {

GetTunnels().SetStream(stream)

for {
// log.Printf("Waiting for TunnelData ...")
incoming, err := stream.Recv()
// log.Printf("Recv stream msg: %v", incoming)
Expand All @@ -134,15 +35,15 @@ func TunnelLoop(rpc rpcpb.SliverRPCClient) error {
return err
}
// log.Printf("Received TunnelData for tunnel %d", incoming.TunnelID)
tunnel := Tunnels.Get(incoming.TunnelID)
tunnel := GetTunnels().Get(incoming.TunnelID)

if tunnel != nil {
if !incoming.Closed {
log.Printf("Received data on tunnel %d", tunnel.ID)
tunnel.Recv <- incoming.GetData()
} else {
log.Printf("Closing tunnel %d", tunnel.ID)
tunnel.IsOpen = false
close(tunnel.Recv)
GetTunnels().Close(tunnel.ID)
}
} else {
log.Printf("Received tunnel data for non-existent tunnel id %d", incoming.TunnelID)
Expand Down
115 changes: 115 additions & 0 deletions client/core/tunnel_io.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
package core

import (
"bytes"
"errors"
"io"
"log"
"sync"
)

// TunnelIO - Duplex data tunnel, compatible with both io.ReadWriter
type TunnelIO struct {
ID uint64
SessionID string

Send chan []byte
Recv chan []byte

isOpen bool
mutex *sync.RWMutex
}

// NewTunnelIO - Single entry point for creating instance of new TunnelIO
func NewTunnelIO(tunnelID uint64, sessionID string) *TunnelIO {
log.Printf("New tunnel!: %d", tunnelID)

return &TunnelIO{
ID: tunnelID,
SessionID: sessionID,
Send: make(chan []byte),
Recv: make(chan []byte),
isOpen: false,
mutex: &sync.RWMutex{},
}
}

// Write - Writer method for interface
func (tun *TunnelIO) Write(data []byte) (int, error) {
if !tun.IsOpen() {
log.Printf("Warning: Write on closed tunnel %d", tun.ID)
return 0, io.EOF
}

// This is necessary to avoid any race conditions on thay byte array
dataCopy := make([]byte, len(data))
copy(dataCopy, data)

log.Printf("Write %d bytes", len(dataCopy))

tun.Send <- dataCopy
n := len(dataCopy)

return n, nil
}

// Read - Reader method for interface
func (tun *TunnelIO) Read(data []byte) (int, error) {
if !tun.IsOpen() {
log.Printf("Warning: Read on closed tunnel %d", tun.ID)
return 0, io.EOF
}

var buff bytes.Buffer

recvData := <-tun.Recv
log.Printf("Read %d bytes", len(recvData))
buff.Write(recvData)

n := copy(data, buff.Bytes())
return n, nil
}

// Close - Close tunnel IO operations
func (tun *TunnelIO) Close() error {
tun.mutex.Lock()
defer tun.mutex.Unlock()

if !tun.isOpen {
log.Printf("Warning: Close on closed tunnel %d", tun.ID)

// I guess we can ignore it and don't return any error
return nil
}

log.Printf("Close tunnel %d", tun.ID)

tun.isOpen = false

close(tun.Recv)
close(tun.Send)

return nil
}

func (tun *TunnelIO) IsOpen() bool {
tun.mutex.RLock()
defer tun.mutex.RUnlock()

return tun.isOpen
}

func (tun *TunnelIO) Open() error {
tun.mutex.Lock()
defer tun.mutex.Unlock()

if tun.isOpen {
return errors.New("tunnel relady in open state")
}

log.Printf("Open tunnel %d", tun.ID)

tun.isOpen = true

return nil
}
Loading

0 comments on commit 62be36e

Please sign in to comment.