11import { expect } from 'chai' ;
2- import { EventEmitter , on } from 'events' ;
2+ import { EventEmitter , once } from 'events' ;
33import { Socket } from 'net' ;
44import * as sinon from 'sinon' ;
55import { Readable } from 'stream' ;
@@ -9,7 +9,7 @@ import { BinMsg } from '../../../src/cmap/commands';
99import { connect } from '../../../src/cmap/connect' ;
1010import { Connection , hasSessionSupport } from '../../../src/cmap/connection' ;
1111import { MessageStream } from '../../../src/cmap/message_stream' ;
12- import { MongoNetworkTimeoutError } from '../../../src/error' ;
12+ import { MongoNetworkTimeoutError , MongoRuntimeError } from '../../../src/error' ;
1313import { isHello , ns } from '../../../src/utils' ;
1414import * as mock from '../../tools/mongodb-mock/index' ;
1515import { generateOpMsgBuffer , getSymbolFrom } from '../../tools/utils' ;
@@ -172,12 +172,13 @@ describe('new Connection()', function () {
172172 let callbackSpy ;
173173 const inputStream = new Readable ( ) ;
174174 const document = { ok : 1 } ;
175+ const last = { isWritablePrimary : true } ;
175176
176177 beforeEach ( function ( ) {
177178 callbackSpy = sinon . spy ( ) ;
178179 const firstHello = generateOpMsgBuffer ( document ) ;
179180 const secondHello = generateOpMsgBuffer ( document ) ;
180- const thirdHello = generateOpMsgBuffer ( document ) ;
181+ const thirdHello = generateOpMsgBuffer ( last ) ;
181182 const buffer = Buffer . concat ( [ firstHello , secondHello , thirdHello ] ) ;
182183
183184 connection = sinon . spy ( new Connection ( inputStream , connectionOptionsDefaults ) ) ;
@@ -199,9 +200,10 @@ describe('new Connection()', function () {
199200 inputStream . push ( null ) ;
200201 } ) ;
201202
202- it ( 'calls the operation description callback with the document' , async function ( ) {
203- await on ( inputStream , 'message' ) ;
204- expect ( callbackSpy ) . to . be . calledOnceWith ( undefined , document ) ;
203+ it ( 'calls the callback with the last hello document' , async function ( ) {
204+ const messages = await once ( connection , 'message' ) ;
205+ expect ( messages [ 0 ] . responseTo ) . to . equal ( 0 ) ;
206+ expect ( callbackSpy ) . to . be . calledOnceWith ( undefined , last ) ;
205207 } ) ;
206208 } ) ;
207209
@@ -230,8 +232,8 @@ describe('new Connection()', function () {
230232 const msg = generateOpMsgBuffer ( document ) ;
231233 const msgHeader : MessageHeader = {
232234 length : msg . readInt32LE ( 0 ) ,
233- requestId : msg . readInt32LE ( 4 ) ,
234- responseTo : msg . readInt32LE ( 8 ) ,
235+ requestId : 1 ,
236+ responseTo : 0 , // This will not match.
235237 opCode : msg . readInt32LE ( 12 )
236238 } ;
237239 const msgBody = msg . subarray ( 16 ) ;
@@ -284,6 +286,58 @@ describe('new Connection()', function () {
284286 expect ( callbackSpy ) . to . be . calledOnceWith ( undefined , document ) ;
285287 } ) ;
286288 } ) ;
289+
290+ context ( 'when more than one operation description is in the queue' , function ( ) {
291+ let spyOne ;
292+ let spyTwo ;
293+ const document = { ok : 1 } ;
294+
295+ beforeEach ( function ( ) {
296+ spyOne = sinon . spy ( ) ;
297+ spyTwo = sinon . spy ( ) ;
298+
299+ // @ts -expect-error: driverSocket does not fully satisfy the stream type, but that's okay
300+ connection = sinon . spy ( new Connection ( driverSocket , connectionOptionsDefaults ) ) ;
301+ connection . isMonitoringConnection = true ;
302+ const queueSymbol = getSymbolFrom ( connection , 'queue' ) ;
303+ queue = connection [ queueSymbol ] ;
304+
305+ // Create the operation descriptions.
306+ const descriptionOne : OperationDescription = {
307+ requestId : 1 ,
308+ cb : spyOne
309+ } ;
310+ const descriptionTwo : OperationDescription = {
311+ requestId : 2 ,
312+ cb : spyTwo
313+ } ;
314+
315+ // Stick an operation description in the queue.
316+ queue . set ( 2 , descriptionOne ) ;
317+ queue . set ( 3 , descriptionTwo ) ;
318+ // Emit a message that matches the existing operation description.
319+ const msg = generateOpMsgBuffer ( document ) ;
320+ const msgHeader : MessageHeader = {
321+ length : msg . readInt32LE ( 0 ) ,
322+ requestId : 2 ,
323+ responseTo : 1 ,
324+ opCode : msg . readInt32LE ( 12 )
325+ } ;
326+ const msgBody = msg . subarray ( 16 ) ;
327+
328+ const message = new BinMsg ( msg , msgHeader , msgBody ) ;
329+ connection . onMessage ( message ) ;
330+ } ) ;
331+
332+ it ( 'calls all operation description callbacks with an error' , function ( ) {
333+ expect ( spyOne ) . to . be . calledOnce ;
334+ expect ( spyTwo ) . to . be . calledOnce ;
335+ const errorOne = spyOne . firstCall . args [ 0 ] ;
336+ const errorTwo = spyTwo . firstCall . args [ 0 ] ;
337+ expect ( errorOne ) . to . be . instanceof ( MongoRuntimeError ) ;
338+ expect ( errorTwo ) . to . be . instanceof ( MongoRuntimeError ) ;
339+ } ) ;
340+ } ) ;
287341 } ) ;
288342 } ) ;
289343
0 commit comments