diff --git a/go.mod b/go.mod index fe7bdea..a575909 100644 --- a/go.mod +++ b/go.mod @@ -12,10 +12,13 @@ require ( ) require ( + github.com/eclipse/paho.mqtt.golang v1.4.3 // indirect + github.com/gorilla/websocket v1.5.0 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.20 // indirect github.com/mattn/go-runewidth v0.0.14 // indirect github.com/rivo/uniseg v0.4.4 // indirect + golang.org/x/sync v0.1.0 // indirect golang.org/x/sys v0.14.0 // indirect golang.org/x/text v0.14.0 // indirect gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect diff --git a/go.sum b/go.sum index 47a4200..a440aaf 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,7 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/eclipse/paho.mqtt.golang v1.4.3 h1:2kwcUGn8seMUfWndX0hGbvH8r7crgcJguQNCyp70xik= +github.com/eclipse/paho.mqtt.golang v1.4.3/go.mod h1:CSYvoAlsMkhYOXh/oKyxa8EcBci6dVkLCbo5tTC1RIE= github.com/fatih/color v1.16.0 h1:zmkK9Ngbjj+K0yRhTVONQh1p/HknKYSlNT+vZCzyokM= github.com/fatih/color v1.16.0/go.mod h1:fL2Sau1YI5c0pdGEVCbKQbLXB6edEj1ZgiY4NijnWvE= github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= @@ -7,6 +9,8 @@ github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEW github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= +github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/juju/ratelimit v1.0.2 h1:sRxmtRiajbvrcLQT7S+JbqU0ntsb9W2yhSdNN8tWfaI= github.com/juju/ratelimit v1.0.2/go.mod h1:qapgC/Gy+xNh9UxzV13HGGl/6UXNN+ct+vwSgWNm/qk= github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= @@ -48,6 +52,8 @@ golang.org/x/net v0.18.0 h1:mIYleuAkSbHh0tCv7RvjL3F6ZVbLjq4+R7zbOn3Kokg= golang.org/x/net v0.18.0/go.mod h1:/czyP5RqHAH4odGYxBJ1qz0+CE5WZ+2j1YgoEo8F2jQ= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o= +golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= diff --git a/protocol/interop.go b/protocol/interop.go index dac9a3e..2521655 100644 --- a/protocol/interop.go +++ b/protocol/interop.go @@ -17,6 +17,7 @@ const ( http2Protocol = "http2" redisProtocol = "redis" mongoProtocol = "mongo" + mqttProtocol = "mqtt" ) var interop defaultInterop @@ -37,6 +38,8 @@ func CreateInterop(protocol string) Interop { return new(redisInterop) case mongoProtocol: return new(mongoInterop) + case mqttProtocol: + return new(mqttInterop) default: return interop } diff --git a/protocol/mqtt.go b/protocol/mqtt.go new file mode 100644 index 0000000..b087d6d --- /dev/null +++ b/protocol/mqtt.go @@ -0,0 +1,28 @@ +package protocol + +import ( + "io" + + "github.com/eclipse/paho.mqtt.golang/packets" + "github.com/kevwan/tproxy/display" +) + +type mqttInterop struct { +} + +func (red *mqttInterop) Dump(r io.Reader, source string, id int, quiet bool) { + for { + readPacket, err := packets.ReadPacket(r) + if err != nil && err == io.EOF { + continue + } + if err != nil { + display.PrintfWithTime("[%s-%d] read packet has err: %+v, stop!!!\n", source, id, err) + return + } + if !quiet { + display.PrintfWithTime("[%s-%d] %s\n", source, id, readPacket.String()) + continue + } + } +} diff --git a/tproxy.go b/tproxy.go index 02ecd0c..9e8f896 100644 --- a/tproxy.go +++ b/tproxy.go @@ -16,7 +16,7 @@ func main() { localHost = flag.String("l", "localhost", "Local address to listen on") remote = flag.String("r", "", "Remote address (host:port) to connect") delay = flag.Duration("d", 0, "the delay to relay packets") - protocol = flag.String("t", "", "The type of protocol, currently support http2, grpc, redis and mongodb") + protocol = flag.String("t", "", "The type of protocol, currently support http2, grpc, redis, mongodb and mqtt") stat = flag.Bool("s", false, "Enable statistics") quiet = flag.Bool("q", false, "Quiet mode, only prints connection open/close and stats, default false") upLimit = flag.Int64("up", 0, "Upward speed limit(bytes/second)")