Skip to content

Commit

Permalink
chore: refactor (#22)
Browse files Browse the repository at this point in the history
  • Loading branch information
kevwan authored Feb 15, 2023
1 parent c4e6845 commit 605a78f
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 51 deletions.
91 changes: 42 additions & 49 deletions protocol/mongo.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,33 +5,34 @@ import (
"encoding/binary"
"encoding/json"
"fmt"
"io"

"github.com/fatih/color"
"github.com/kevwan/tproxy/display"
"github.com/mongodb/mongo-go-driver/bson"
"io"
)

const (
OP_REPLY = 1 //Reply to a client request. responseTo is set.
OP_UPDATE = 2001 //Update document.
OP_INSERT = 2002 //Insert new document.
RESERVED = 2003 //Formerly used for OP_GET_BY_OID.
OP_QUERY = 2004 //Query a collection.
OP_GET_MORE = 2005 //Get more data from a query. See Cursors.
OP_DELETE = 2006 //Delete documents.
OP_KILL_CURSORS = 2007 //Notify database that the client has finished with the cursor.
OP_COMMAND = 2010 //Cluster internal protocol representing a command request.
OP_COMMANDREPLY = 2011 //Cluster internal protocol representing a reply to an OP_COMMAND.
OP_MSG = 2013 //Send a message using the format introduced in MongoDB 3.6.
OpReply = 1 // Reply to a client request. responseTo is set.
OpUpdate = 2001 // Update document.
OpInsert = 2002 // Insert new document.
Reserved = 2003 // Formerly used for OP_GET_BY_OID.
OpQuery = 2004 // Query a collection.
OpGetMore = 2005 // Get more data from a query. See Cursors.
OpDelete = 2006 // Delete documents.
OpKillCursors = 2007 // Notify database that the client has finished with the cursor.
OpCommand = 2010 // Cluster internal protocol representing a command request.
OpCommandreply = 2011 // Cluster internal protocol representing a reply to an OP_COMMAND.
OpMsg = 2013 // Send a message using the format introduced in MongoDB 3.6.
)

type mongoInterop struct {
}

type packet struct {
IsClientFlow bool //client->server
IsClientFlow bool // client->server
MessageLength int
OpCode int //request type
OpCode int // request type
Payload io.Reader
}

Expand All @@ -51,40 +52,34 @@ func (mongo *mongoInterop) Dump(r io.Reader, source string, id int, quiet bool)
func resolveClientPacket(pk *packet) {
var msg string
switch pk.OpCode {
case OP_UPDATE:
zero := readInt32(pk.Payload)
case OpUpdate:
_ = readInt32(pk.Payload)
fullCollectionName := readString(pk.Payload)
flags := readInt32(pk.Payload)
_ = readInt32(pk.Payload)
selector := readBson2Json(pk.Payload)
update := readBson2Json(pk.Payload)
_ = zero
_ = flags

msg = fmt.Sprintf(" [Update] [coll:%s] %v %v",
fullCollectionName,
selector,
update,
)

case OP_INSERT:
flags := readInt32(pk.Payload)
case OpInsert:
_ = readInt32(pk.Payload)
fullCollectionName := readString(pk.Payload)
command := readBson2Json(pk.Payload)
_ = flags

msg = fmt.Sprintf(" [Insert] [coll:%s] %v",
fullCollectionName,
command,
)

case OP_QUERY:
flags := readInt32(pk.Payload)
case OpQuery:
_ = readInt32(pk.Payload)
fullCollectionName := readString(pk.Payload)
numberToSkip := readInt32(pk.Payload)
numberToReturn := readInt32(pk.Payload)
_ = flags
_ = numberToSkip
_ = numberToReturn
_ = readInt32(pk.Payload)
_ = readInt32(pk.Payload)

command := readBson2Json(pk.Payload)
selector := readBson2Json(pk.Payload)
Expand All @@ -95,7 +90,7 @@ func resolveClientPacket(pk *packet) {
selector,
)

case OP_COMMAND:
case OpCommand:
database := readString(pk.Payload)
commandName := readString(pk.Payload)
metaData := readBson2Json(pk.Payload)
Expand All @@ -110,33 +105,30 @@ func resolveClientPacket(pk *packet) {
inputDocs,
)

case OP_GET_MORE:
zero := readInt32(pk.Payload)
case OpGetMore:
_ = readInt32(pk.Payload)
fullCollectionName := readString(pk.Payload)
numberToReturn := readInt32(pk.Payload)
cursorId := readInt64(pk.Payload)
_ = zero

msg = fmt.Sprintf(" [Query more] [coll:%s] [num of reply:%v] [cursor:%v]",
fullCollectionName,
numberToReturn,
cursorId,
)

case OP_DELETE:
zero := readInt32(pk.Payload)
case OpDelete:
_ = readInt32(pk.Payload)
fullCollectionName := readString(pk.Payload)
flags := readInt32(pk.Payload)
_ = readInt32(pk.Payload)
selector := readBson2Json(pk.Payload)
_ = zero
_ = flags

msg = fmt.Sprintf(" [Delete] [coll:%s] %v",
fullCollectionName,
selector,
)

case OP_MSG:
case OpMsg:
return
default:
return
Expand All @@ -146,12 +138,12 @@ func resolveClientPacket(pk *packet) {
}

func newPacket(source string, r io.Reader) *packet {
//read pk
// read pk
var pk *packet
var err error
pk, err = parsePacket(r)

//stream close
// stream close
if err == io.EOF {
display.PrintlnWithTime(" close")
return nil
Expand All @@ -174,7 +166,7 @@ func parsePacket(r io.Reader) (*packet, error) {
var buf bytes.Buffer
p := &packet{}

//header
// header
header := make([]byte, 16)
if _, err := io.ReadFull(r, header); err != nil {
return nil, err
Expand Down Expand Up @@ -239,30 +231,31 @@ func readString(r io.Reader) string {
}

func readBson2Json(r io.Reader) string {
//read len
// read len
docLen := readInt32(r)
if docLen == 0 {
return ""
}

//document []byte
// document []byte
docBytes := make([]byte, int(docLen))
binary.LittleEndian.PutUint32(docBytes, uint32(docLen))
if _, err := io.ReadFull(r, docBytes[4:]); err != nil {
panic(err)
}

//resolve document
// resolve document
var bsn bson.M
err := bson.Unmarshal(docBytes, &bsn)
if err != nil {
panic(err)
}

//format to Json
jsonStr, err := json.Marshal(bsn)
// format to Json
b, err := json.Marshal(bsn)
if err != nil {
return fmt.Sprintf("{\"error\":%s}", err.Error())
return fmt.Sprintf("{\"error\":%q}", err.Error())
}
return string(jsonStr)

return string(b)
}
3 changes: 2 additions & 1 deletion protocol/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ package protocol

import (
"bufio"
"github.com/kevwan/tproxy/display"
"io"
"strconv"
"strings"

"github.com/kevwan/tproxy/display"
)

type redisInterop struct {
Expand Down
2 changes: 1 addition & 1 deletion tproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func main() {
localHost = flag.String("l", "localhost", "Local address to listen on")
remote = flag.String("r", "", "Remote address (host:port) to connect")
delay = flag.Duration("d", 0, "the delay to relay packets")
protocol = flag.String("t", "", "The type of protocol, currently support grpc")
protocol = flag.String("t", "", "The type of protocol, currently support http2, grpc, redis and mongodb")
stat = flag.Bool("s", false, "Enable statistics")
quiet = flag.Bool("q", false,
"Quiet mode, only prints connection open/close and stats, default false")
Expand Down

0 comments on commit 605a78f

Please sign in to comment.