From b11b86a18ee1286bb87a3f919cbb425d18d3df9e Mon Sep 17 00:00:00 2001 From: Aleksandr Maus Date: Wed, 4 Sep 2024 06:14:04 -0400 Subject: [PATCH] [Packetbeat] Fix Packetbeat parsing mongodb OP_MSG (#40589) * [Packetbeat] Fix Packetbeat parsing mongodb OP_MSG * Fixes handling OP_MSG based request/response, missing "end" timestamp and "duration" field for the event --- packetbeat/protos/mongodb/mongodb.go | 56 ++++++++---- packetbeat/protos/mongodb/mongodb_parser.go | 48 ++++------ .../protos/mongodb/mongodb_parser_test.go | 69 +++++++------- packetbeat/protos/mongodb/mongodb_structs.go | 45 ++++++--- .../protos/mongodb/mongodb_structs_test.go | 86 ++++++++++++++++++ packetbeat/protos/mongodb/testdata/1req.bin | Bin 0 -> 111 bytes packetbeat/protos/mongodb/testdata/1res.bin | Bin 0 -> 296 bytes packetbeat/protos/mongodb/testdata/2req.bin | Bin 0 -> 132 bytes packetbeat/protos/mongodb/testdata/2res.bin | Bin 0 -> 296 bytes packetbeat/protos/mongodb/testdata/3req.bin | Bin 0 -> 132 bytes packetbeat/protos/mongodb/testdata/3res.bin | Bin 0 -> 179 bytes 11 files changed, 212 insertions(+), 92 deletions(-) create mode 100644 packetbeat/protos/mongodb/mongodb_structs_test.go create mode 100644 packetbeat/protos/mongodb/testdata/1req.bin create mode 100644 packetbeat/protos/mongodb/testdata/1res.bin create mode 100644 packetbeat/protos/mongodb/testdata/2req.bin create mode 100644 packetbeat/protos/mongodb/testdata/2res.bin create mode 100644 packetbeat/protos/mongodb/testdata/3req.bin create mode 100644 packetbeat/protos/mongodb/testdata/3res.bin diff --git a/packetbeat/protos/mongodb/mongodb.go b/packetbeat/protos/mongodb/mongodb.go index e65824e5de2..749342cf3d7 100644 --- a/packetbeat/protos/mongodb/mongodb.go +++ b/packetbeat/protos/mongodb/mongodb.go @@ -32,6 +32,8 @@ import ( "github.com/elastic/beats/v7/packetbeat/procs" "github.com/elastic/beats/v7/packetbeat/protos" "github.com/elastic/beats/v7/packetbeat/protos/tcp" + + "go.mongodb.org/mongo-driver/bson/primitive" ) var debugf = logp.MakeDebug("mongodb") @@ -54,7 +56,7 @@ type mongodbPlugin struct { type transactionKey struct { tcp common.HashableTCPTuple - id int + id int32 } var unmatchedRequests = monitoring.NewInt(nil, "mongodb.unmatched_requests") @@ -232,7 +234,7 @@ func (mongodb *mongodbPlugin) handleMongodb( func (mongodb *mongodbPlugin) onRequest(conn *mongodbConnectionData, msg *mongodbMessage) { // publish request only transaction - if !awaitsReply(msg.opCode) { + if !awaitsReply(msg) { mongodb.onTransComplete(msg, nil) return } @@ -273,7 +275,6 @@ func (mongodb *mongodbPlugin) onResponse(conn *mongodbConnectionData, msg *mongo func (mongodb *mongodbPlugin) onTransComplete(requ, resp *mongodbMessage) { trans := newTransaction(requ, resp) debugf("Mongodb transaction completed: %s", trans.mongodb) - mongodb.publishTransaction(trans) } @@ -294,8 +295,9 @@ func newTransaction(requ, resp *mongodbMessage) *transaction { } trans.params = requ.params trans.resource = requ.resource - trans.bytesIn = requ.messageLength + trans.bytesIn = int(requ.messageLength) trans.documents = requ.documents + trans.requestDocuments = requ.documents // preserving request documents that contains mongodb query for the new OP_MSG based protocol } // fill response @@ -308,7 +310,7 @@ func newTransaction(requ, resp *mongodbMessage) *transaction { trans.documents = resp.documents trans.endTime = resp.ts - trans.bytesOut = resp.messageLength + trans.bytesOut = int(resp.messageLength) } @@ -325,10 +327,17 @@ func (mongodb *mongodbPlugin) ReceivedFin(tcptuple *common.TCPTuple, dir uint8, return private } -func copyMapWithoutKey(d map[string]interface{}, key string) map[string]interface{} { +func copyMapWithoutKey(d map[string]interface{}, keys ...string) map[string]interface{} { res := map[string]interface{}{} for k, v := range d { - if k != key { + found := false + for _, excludeKey := range keys { + if k == excludeKey { + found = true + break + } + } + if !found { res[k] = v } } @@ -337,29 +346,40 @@ func copyMapWithoutKey(d map[string]interface{}, key string) map[string]interfac func reconstructQuery(t *transaction, full bool) (query string) { query = t.resource + "." + t.method + "(" + var doc interface{} + if len(t.params) > 0 { - var err error - var params string if !full { // remove the actual data. // TODO: review if we need to add other commands here switch t.method { case "insert": - params, err = doc2str(copyMapWithoutKey(t.params, "documents")) + doc = copyMapWithoutKey(t.params, "documents") case "update": - params, err = doc2str(copyMapWithoutKey(t.params, "updates")) + doc = copyMapWithoutKey(t.params, "updates") case "findandmodify": - params, err = doc2str(copyMapWithoutKey(t.params, "update")) + doc = copyMapWithoutKey(t.params, "update") } } else { - params, err = doc2str(t.params) + doc = t.params } - if err != nil { - debugf("Error marshaling params: %v", err) - } else { - query += params + } else if len(t.requestDocuments) > 0 { // This recovers the query document from OP_MSG + if m, ok := t.requestDocuments[0].(primitive.M); ok { + excludeKeys := []string{"lsid"} + if !full { + excludeKeys = append(excludeKeys, "documents") + } + doc = copyMapWithoutKey(m, excludeKeys...) } } + + queryString, err := doc2str(doc) + if err != nil { + debugf("Error marshaling query document: %v", err) + } else { + query += queryString + } + query += ")" skip, _ := t.event["numberToSkip"].(int) if skip > 0 { @@ -370,7 +390,7 @@ func reconstructQuery(t *transaction, full bool) (query string) { if limit > 0 && limit < 0x7fffffff { query += fmt.Sprintf(".limit(%d)", limit) } - return + return query } func (mongodb *mongodbPlugin) publishTransaction(t *transaction) { diff --git a/packetbeat/protos/mongodb/mongodb_parser.go b/packetbeat/protos/mongodb/mongodb_parser.go index 8a7be126902..acb743ee517 100644 --- a/packetbeat/protos/mongodb/mongodb_parser.go +++ b/packetbeat/protos/mongodb/mongodb_parser.go @@ -43,13 +43,13 @@ func mongodbMessageParser(s *stream) (bool, bool) { return true, false } - if length > len(s.data) { + if int(length) > len(s.data) { // Not yet reached the end of message return true, false } // Tell decoder to only consider current message - d.truncate(length) + d.truncate(int(length)) // fill up the header common to all messages // see http://docs.mongodb.org/meta-driver/latest/legacy/mongodb-wire-protocol/#standard-message-header @@ -72,8 +72,7 @@ func mongodbMessageParser(s *stream) (bool, bool) { } s.message.opCode = opCode - s.message.isResponse = false // default is that the message is a request. If not opReplyParse will set this to false - s.message.expectsResponse = false + s.message.isResponse = false // default is that the message is a request. If not opReplyParse will set this to true debugf("opCode = %d (%v)", s.message.opCode, s.message.opCode) // then split depending on operation type @@ -93,11 +92,9 @@ func mongodbMessageParser(s *stream) (bool, bool) { s.message.method = "insert" return opInsertParse(d, s.message) case opQuery: - s.message.expectsResponse = true return opQueryParse(d, s.message) case opGetMore: s.message.method = "getMore" - s.message.expectsResponse = true return opGetMoreParse(d, s.message) case opDelete: s.message.method = "delete" @@ -107,6 +104,11 @@ func mongodbMessageParser(s *stream) (bool, bool) { return opKillCursorsParse(d, s.message) case opMsg: s.message.method = "msg" + // The assumption is that the message with responseTo == 0 is the request + // TODO: handle the cases where moreToCome flag is set (multiple responses chained by responseTo) + if s.message.responseTo > 0 { + s.message.isResponse = true + } return opMsgParse(d, s.message) } @@ -141,7 +143,7 @@ func opReplyParse(d *decoder, m *mongodbMessage) (bool, bool) { debugf("Prepare to read %d document from reply", m.event["numberReturned"]) documents := make([]interface{}, numberReturned) - for i := 0; i < numberReturned; i++ { + for i := int32(0); i < numberReturned; i++ { var document bson.M document, err = d.readDocument() if err != nil { @@ -235,19 +237,6 @@ func opInsertParse(d *decoder, m *mongodbMessage) (bool, bool) { return true, true } -func extractDocuments(query map[string]interface{}) []interface{} { - docsVi, present := query["documents"] - if !present { - return []interface{}{} - } - - docs, ok := docsVi.([]interface{}) - if !ok { - return []interface{}{} - } - return docs -} - // Try to guess whether this key:value pair found in // the query represents a command. func isDatabaseCommand(key string, val interface{}) bool { @@ -387,12 +376,14 @@ func opKillCursorsParse(d *decoder, m *mongodbMessage) (bool, bool) { func opMsgParse(d *decoder, m *mongodbMessage) (bool, bool) { // ignore flagbits - _, err := d.readInt32() + flagBits, err := d.readInt32() if err != nil { logp.Err("An error occurred while parsing OP_MSG message: %s", err) return false, false } + m.SetFlagBits(flagBits) + // read sections kind, err := d.readByte() if err != nil { @@ -423,7 +414,7 @@ func opMsgParse(d *decoder, m *mongodbMessage) (bool, bool) { } m.event["message"] = cstring var documents []interface{} - for d.i < start+size { + for d.i < start+int(size) { document, err := d.readDocument() if err != nil { logp.Err("An error occurred while parsing OP_MSG message: %s", err) @@ -432,7 +423,8 @@ func opMsgParse(d *decoder, m *mongodbMessage) (bool, bool) { documents = append(documents, document) } m.documents = documents - + case msgKindInternal: + // Ignore the internal purposes section default: logp.Err("Unknown message kind: %v", kind) return false, false @@ -482,25 +474,25 @@ func (d *decoder) readByte() (byte, error) { return d.in[i], nil } -func (d *decoder) readInt32() (int, error) { +func (d *decoder) readInt32() (int32, error) { b, err := d.readBytes(4) if err != nil { return 0, err } - return int((uint32(b[0]) << 0) | + return int32((uint32(b[0]) << 0) | (uint32(b[1]) << 8) | (uint32(b[2]) << 16) | (uint32(b[3]) << 24)), nil } -func (d *decoder) readInt64() (int, error) { +func (d *decoder) readInt64() (int64, error) { b, err := d.readBytes(8) if err != nil { return 0, err } - return int((uint64(b[0]) << 0) | + return int64((uint64(b[0]) << 0) | (uint64(b[1]) << 8) | (uint64(b[2]) << 16) | (uint64(b[3]) << 24) | @@ -516,7 +508,7 @@ func (d *decoder) readDocument() (bson.M, error) { if err != nil { return nil, err } - d.i = start + documentLength + d.i = start + int(documentLength) if len(d.in) < d.i { return nil, errors.New("document out of bounds") } diff --git a/packetbeat/protos/mongodb/mongodb_parser_test.go b/packetbeat/protos/mongodb/mongodb_parser_test.go index af4a647dd11..6071d25ca31 100644 --- a/packetbeat/protos/mongodb/mongodb_parser_test.go +++ b/packetbeat/protos/mongodb/mongodb_parser_test.go @@ -20,6 +20,9 @@ package mongodb import ( + "encoding/json" + "os" + "path/filepath" "testing" "github.com/stretchr/testify/assert" @@ -77,6 +80,39 @@ func TestMongodbParser_simpleRequest(t *testing.T) { } } +func TestMongodbParser_OpMsg(t *testing.T) { + files := []string{ + "1req.bin", + "1res.bin", + "2req.bin", + "2req.bin", + "3req.bin", + "3res.bin", + } + + for _, fn := range files { + data, err := os.ReadFile(filepath.Join("testdata", fn)) + if err != nil { + t.Fatal(err) + } + + st := &stream{data: data, message: new(mongodbMessage)} + + ok, complete := mongodbMessageParser(st) + + if !ok { + t.Errorf("Parsing returned error") + } + if !complete { + t.Errorf("Expecting a complete message") + } + _, err = json.Marshal(st.message.documents) + if err != nil { + t.Fatal(err) + } + } +} + func TestMongodbParser_unknownOpCode(t *testing.T) { var data []byte data = addInt32(data, 16) // length = 16 @@ -107,39 +143,6 @@ func addInt32(in []byte, v int32) []byte { return append(in, byte(u), byte(u>>8), byte(u>>16), byte(u>>24)) } -func Test_extract_documents(t *testing.T) { - type io struct { - Input map[string]interface{} - Output []interface{} - } - tests := []io{ - { - Input: map[string]interface{}{ - "a": 1, - "documents": []interface{}{"a", "b", "c"}, - }, - Output: []interface{}{"a", "b", "c"}, - }, - { - Input: map[string]interface{}{ - "a": 1, - }, - Output: []interface{}{}, - }, - { - Input: map[string]interface{}{ - "a": 1, - "documents": 1, - }, - Output: []interface{}{}, - }, - } - - for _, test := range tests { - assert.Equal(t, test.Output, extractDocuments(test.Input)) - } -} - func Test_isDatabaseCommand(t *testing.T) { type io struct { Key string diff --git a/packetbeat/protos/mongodb/mongodb_structs.go b/packetbeat/protos/mongodb/mongodb_structs.go index 4870e1516ed..67a9e26de8d 100644 --- a/packetbeat/protos/mongodb/mongodb_structs.go +++ b/packetbeat/protos/mongodb/mongodb_structs.go @@ -33,16 +33,20 @@ type mongodbMessage struct { cmdlineTuple *common.ProcessTuple direction uint8 - isResponse bool - expectsResponse bool + isResponse bool // Standard message header fields from mongodb wire protocol // see http://docs.mongodb.org/meta-driver/latest/legacy/mongodb-wire-protocol/#standard-message-header - messageLength int - requestID int - responseTo int + messageLength int32 + requestID int32 + responseTo int32 opCode opCode + // decoded flagBits + checkSumPresent bool + moreToCome bool + exhaustAllowed bool + // deduced from content. Either an operation from the original wire protocol or the name of a command (passed through a query) // List of commands: http://docs.mongodb.org/manual/reference/command/ // List of original protocol operations: http://docs.mongodb.org/meta-driver/latest/legacy/mongodb-wire-protocol/#request-opcodes @@ -57,6 +61,12 @@ type mongodbMessage struct { event mapstr.M } +func (m *mongodbMessage) SetFlagBits(flagBits int32) { + m.checkSumPresent = flagBits&0x1 != 0 // 0 bit + m.moreToCome = flagBits&0x2 != 0 // 1 bit + m.exhaustAllowed = flagBits&0x10000 != 0 // 16 bit +} + // Represent a stream being parsed that contains a mongodb message type stream struct { tcptuple *common.TCPTuple @@ -90,12 +100,13 @@ type transaction struct { mongodb mapstr.M - event mapstr.M - method string - resource string - error string - params map[string]interface{} - documents []interface{} + event mapstr.M + method string + resource string + error string + params map[string]interface{} + requestDocuments []interface{} + documents []interface{} } type msgKind byte @@ -103,6 +114,7 @@ type msgKind byte const ( msgKindBody msgKind = 0 msgKindDocumentSequence msgKind = 1 + msgKindInternal msgKind = 2 ) type opCode int32 @@ -147,8 +159,15 @@ func (o opCode) String() string { return fmt.Sprintf("(value=%d)", int32(o)) } -func awaitsReply(c opCode) bool { - return c == opQuery || c == opGetMore +func awaitsReply(msg *mongodbMessage) bool { + opCode := msg.opCode + // The request of opMsg type doesn't get response if moreToCome is set + // From documentation: https://www.mongodb.com/docs/manual/reference/mongodb-wire-protocol + // "Requests with the moreToCome bit set will not receive a reply" + if !msg.isResponse && opCode == opMsg && !msg.moreToCome { + return true + } + return opCode == opQuery || opCode == opGetMore } // List of mongodb user commands (send through a query of the legacy protocol) diff --git a/packetbeat/protos/mongodb/mongodb_structs_test.go b/packetbeat/protos/mongodb/mongodb_structs_test.go new file mode 100644 index 00000000000..8e5ec0fddca --- /dev/null +++ b/packetbeat/protos/mongodb/mongodb_structs_test.go @@ -0,0 +1,86 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package mongodb + +import ( + "testing" + + "github.com/google/go-cmp/cmp" +) + +func TestSetFlagBits(t *testing.T) { + tests := []struct { + name string + flagBits int32 + wantMsg mongodbMessage + }{ + { + name: "none", + flagBits: 0b0000, + wantMsg: mongodbMessage{}, + }, + { + name: "checksumpresent", + flagBits: 0b0001, + wantMsg: mongodbMessage{checkSumPresent: true}, + }, + { + name: "moreToCome", + flagBits: 0b00010, + wantMsg: mongodbMessage{moreToCome: true}, + }, + { + name: "checksumpresent_moreToCome", + flagBits: 0b00011, + wantMsg: mongodbMessage{checkSumPresent: true, moreToCome: true}, + }, + { + name: "exhaustallowed", + flagBits: 0x10000, + wantMsg: mongodbMessage{exhaustAllowed: true}, + }, + { + name: "checksumpresent_exhaustallowed", + flagBits: 0x10001, + wantMsg: mongodbMessage{checkSumPresent: true, exhaustAllowed: true}, + }, + { + name: "checksumpresent_moreToCome_exhaustallowed", + flagBits: 0x10003, + wantMsg: mongodbMessage{checkSumPresent: true, moreToCome: true, exhaustAllowed: true}, + }, + } + + flagBitsComparer := cmp.Comparer(func(m1, m2 mongodbMessage) bool { + return m1.checkSumPresent == m2.checkSumPresent && + m1.moreToCome == m2.moreToCome && + m1.exhaustAllowed == m2.exhaustAllowed + }) + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + var gotMsg mongodbMessage + gotMsg.SetFlagBits(tc.flagBits) + + diff := cmp.Diff(tc.wantMsg, gotMsg, flagBitsComparer) + if diff != "" { + t.Fatal(diff) + } + }) + } +} diff --git a/packetbeat/protos/mongodb/testdata/1req.bin b/packetbeat/protos/mongodb/testdata/1req.bin new file mode 100644 index 0000000000000000000000000000000000000000..d29c6d1cc260380e61bd97e364f20d18213f538c GIT binary patch literal 111 zcmd02U|?_nVi35?4xt&MfHYHDW?l*d7m$^iSC*Prl3!HGz?_zuQ<7T5zzR~woKu{c v!XO7^u>xrUAkA{^PyPS-4KCu-&d0A_nC7R?z@(Cr!~in2B(=B%$YcNjmm(Pe literal 0 HcmV?d00001 diff --git a/packetbeat/protos/mongodb/testdata/1res.bin b/packetbeat/protos/mongodb/testdata/1res.bin new file mode 100644 index 0000000000000000000000000000000000000000..495fe160e8a80f3d874283e93d8a1405a36f84e9 GIT binary patch literal 296 zcmYLD%W48a5G;2i-o!*D1iX34DVSqUF;Va{!nh9|lAR$ltzteU{*0i%;XjBsbCA@! zD;7*w_jFO!j}>45pwG9fvE*p-GmK`=TZcc+H>q}B-jzgCY&nNLI8yCH$4EE7$A`md zxLUll0xd~~+$B`^ciI;f%q|H1r PV%?IL*t3|s{VVbBZ%l803J0tUy`-NV8n~Q~!T{gNyjI^YN<}rupeJFsY;@F@UTp KNi8k`G8q7cL>(po literal 0 HcmV?d00001 diff --git a/packetbeat/protos/mongodb/testdata/2res.bin b/packetbeat/protos/mongodb/testdata/2res.bin new file mode 100644 index 0000000000000000000000000000000000000000..d5ccd0bf6c8a50d909b8cc0f5ab4ee0a63027f9d GIT binary patch literal 296 zcmYLD%W48a5G;2UZ{jORz*{aki5@*AijdC;<38dLcLrx#ML$J7uHiOTYxci0@cs$eBG%B#>urK3F6uLMNu@Ol@ilr4mNrAk zKFV0a0p3X0(0%P1`fv|Tt(_`Ygmx$`LL2NDS|NEo|0JQwyKe}LM=p`XOFZpqvr3Fu Nx8WuBU(DUl3oeh$K2!hz literal 0 HcmV?d00001 diff --git a/packetbeat/protos/mongodb/testdata/3req.bin b/packetbeat/protos/mongodb/testdata/3req.bin new file mode 100644 index 0000000000000000000000000000000000000000..22461fa0a61d5aaba93e4dfb59a1e97cb39f95c2 GIT binary patch literal 132 zcmZo+U|?_pVi35?4xt(HfizQEW?l*d7m$^iSC*Prl3!HGz?_zuQ<7T5zzR|)P@J7v zz`zJ%bBZ%l803J0tUy`-NV8n~Q~!T{gNyjI^YN<}rupeJFsY;@F@UTp KNi8k`G8q7d2puQ@ literal 0 HcmV?d00001 diff --git a/packetbeat/protos/mongodb/testdata/3res.bin b/packetbeat/protos/mongodb/testdata/3res.bin new file mode 100644 index 0000000000000000000000000000000000000000..6950e63146fd73e0688e5eceb56eacca6aae4b5d GIT binary patch literal 179 zcmdnYz`)=N#7;nbmmNrez&s#kPA)Af&M#tU1+rPvGK-2!oDxftGZ_3qas~|UK#Dy+ zGle1T@N7QGPybwIms^T5WtODoGJs4;%g;|rEMgESEU9D=2boZkm|n~v2xKuCFt7rt v;>?_EAjKdAGy(>g@*qk|Qj1GclJqk3%2M-6@{1}N7#Q=xih&A0*fRhC25Tix literal 0 HcmV?d00001