@@ -4,13 +4,14 @@ import { Socket } from 'net';
44import * as sinon from 'sinon' ;
55import { setTimeout } from 'timers' ;
66
7+ import { BinMsg } from '../../../src/cmap/commands' ;
78import { connect } from '../../../src/cmap/connect' ;
89import { Connection , hasSessionSupport } from '../../../src/cmap/connection' ;
910import { MessageStream } from '../../../src/cmap/message_stream' ;
1011import { MongoNetworkTimeoutError } from '../../../src/error' ;
1112import { isHello , ns } from '../../../src/utils' ;
1213import * as mock from '../../tools/mongodb-mock/index' ;
13- import { getSymbolFrom } from '../../tools/utils' ;
14+ import { generateOpMsgBuffer , getSymbolFrom } from '../../tools/utils' ;
1415import { createTimerSandbox } from '../timer_sandbox' ;
1516
1617const connectionOptionsDefaults = {
@@ -22,6 +23,25 @@ const connectionOptionsDefaults = {
2223 loadBalanced : false
2324} ;
2425
26+ /** The absolute minimum socket API needed by Connection as of writing this test */
27+ class FakeSocket extends EventEmitter {
28+ address ( ) {
29+ // is never called
30+ }
31+ pipe ( ) {
32+ // does not need to do anything
33+ }
34+ destroy ( ) {
35+ // is called, has no side effects
36+ }
37+ get remoteAddress ( ) {
38+ return 'iLoveJavaScript' ;
39+ }
40+ get remotePort ( ) {
41+ return 123 ;
42+ }
43+ }
44+
2545describe ( 'new Connection()' , function ( ) {
2646 let server ;
2747 after ( ( ) => mock . cleanup ( ) ) ;
@@ -137,6 +157,89 @@ describe('new Connection()', function () {
137157 } ) ;
138158 } ) ;
139159
160+ describe ( '#onMessage' , function ( ) {
161+ context ( 'when the connection is a monitoring connection' , function ( ) {
162+ let queue : Map < number , OperationDescription > ;
163+ let driverSocket : FakeSocket ;
164+ let connection : Connection ;
165+
166+ beforeEach ( function ( ) {
167+ driverSocket = sinon . spy ( new FakeSocket ( ) ) ;
168+ // @ts -expect-error: driverSocket does not fully satisfy the stream type, but that's okay
169+ connection = sinon . spy ( new Connection ( driverSocket , connectionOptionsDefaults ) ) ;
170+ connection . isMonitoringConnection = true ;
171+ const queueSymbol = getSymbolFrom ( connection , 'queue' ) ;
172+ queue = connection [ queueSymbol ] ;
173+ } ) ;
174+
175+ context ( 'when requestId/responseTo do not match' , function ( ) {
176+ let callbackSpy ;
177+ const document = { ok : 1 } ;
178+
179+ beforeEach ( function ( ) {
180+ callbackSpy = sinon . spy ( ) ;
181+ // Create the operation description.
182+ const operationDescription : OperationDescription = {
183+ requestId : 1 ,
184+ cb : callbackSpy
185+ } ;
186+
187+ // Stick an operation description in the queue.
188+ queue . set ( 1 , operationDescription ) ;
189+ // Emit a message that won't match the existing operation description.
190+ const msg = generateOpMsgBuffer ( document ) ;
191+ const msgHeader : MessageHeader = {
192+ length : msg . readInt32LE ( 0 ) ,
193+ requestId : msg . readInt32LE ( 4 ) ,
194+ responseTo : msg . readInt32LE ( 8 ) ,
195+ opCode : msg . readInt32LE ( 12 )
196+ } ;
197+ const msgBody = msg . subarray ( 16 ) ;
198+
199+ const message = new BinMsg ( msg , msgHeader , msgBody ) ;
200+ driverSocket . emit ( 'message' , message ) ;
201+ } ) ;
202+
203+ it ( 'calls the operation description callback with the document' , function ( ) {
204+ expect ( callbackSpy ) . to . be . calledWith ( document ) ;
205+ } ) ;
206+ } ) ;
207+
208+ context ( 'when requestId/reponseTo match' , function ( ) {
209+ let callbackSpy ;
210+ const document = { ok : 1 } ;
211+
212+ beforeEach ( function ( ) {
213+ callbackSpy = sinon . spy ( ) ;
214+ // Create the operation description.
215+ const operationDescription : OperationDescription = {
216+ requestId : 1 ,
217+ cb : callbackSpy
218+ } ;
219+
220+ // Stick an operation description in the queue.
221+ queue . set ( 1 , operationDescription ) ;
222+ // Emit a message that matches the existing operation description.
223+ const msg = generateOpMsgBuffer ( document ) ;
224+ const msgHeader : MessageHeader = {
225+ length : msg . readInt32LE ( 0 ) ,
226+ requestId : 2 ,
227+ responseTo : 1 ,
228+ opCode : msg . readInt32LE ( 12 )
229+ } ;
230+ const msgBody = msg . subarray ( 16 ) ;
231+
232+ const message = new BinMsg ( msg , msgHeader , msgBody ) ;
233+ driverSocket . emit ( 'message' , message ) ;
234+ } ) ;
235+
236+ it ( 'calls the operation description callback with the document' , function ( ) {
237+ expect ( callbackSpy ) . to . be . calledWith ( document ) ;
238+ } ) ;
239+ } ) ;
240+ } ) ;
241+ } ) ;
242+
140243 describe ( 'onTimeout()' , ( ) => {
141244 let connection : sinon . SinonSpiedInstance < Connection > ;
142245 let clock : sinon . SinonFakeTimers ;
@@ -146,25 +249,6 @@ describe('new Connection()', function () {
146249 let kDelayedTimeoutId : symbol ;
147250 let NodeJSTimeoutClass : any ;
148251
149- /** The absolute minimum socket API needed by Connection as of writing this test */
150- class FakeSocket extends EventEmitter {
151- address ( ) {
152- // is never called
153- }
154- pipe ( ) {
155- // does not need to do anything
156- }
157- destroy ( ) {
158- // is called, has no side effects
159- }
160- get remoteAddress ( ) {
161- return 'iLoveJavaScript' ;
162- }
163- get remotePort ( ) {
164- return 123 ;
165- }
166- }
167-
168252 beforeEach ( ( ) => {
169253 timerSandbox = createTimerSandbox ( ) ;
170254 clock = sinon . useFakeTimers ( ) ;
0 commit comments