Skip to content

Commit

Permalink
Merge pull request #785 from ankon/pr/pending-request-without-expecte…
Browse files Browse the repository at this point in the history
…d-response

Resolve socket requests without response immediately when they have been queued
  • Loading branch information
Nevon authored Jun 25, 2020
2 parents cff2cfc + aba2e52 commit d86af3a
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 17 deletions.
13 changes: 10 additions & 3 deletions src/network/requestQueue/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -122,16 +122,23 @@ module.exports = class RequestQueue {
return
}

this.sendSocketRequest(socketRequest)
}

/**
* @param {SocketRequest} socketRequest
*/
sendSocketRequest(socketRequest) {
socketRequest.send()

if (!socketRequest.expectResponse) {
this.logger.debug(`Request does not expect a response, resolving immediately`, {
clientId: this.clientId,
broker: this.broker,
correlationId,
correlationId: socketRequest.correlationId,
})

this.inflight.delete(correlationId)
this.inflight.delete(socketRequest.correlationId)
socketRequest.completed({ size: 0, payload: null })
}
}
Expand All @@ -147,7 +154,7 @@ module.exports = class RequestQueue {

if (this.pending.length > 0) {
const pendingRequest = this.pending.pop()
pendingRequest.send()
this.sendSocketRequest(pendingRequest)

this.logger.debug(`Consumed pending request`, {
clientId: this.clientId,
Expand Down
52 changes: 38 additions & 14 deletions src/network/requestQueue/index.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,23 +40,25 @@ describe('Network > RequestQueue', () => {
}
})

it('calls send on the request', () => {
requestQueue.push(request)
expect(send).toHaveBeenCalledTimes(1)
})

describe('when the request does not require a response', () => {
beforeEach(() => {
request.expectResponse = false
describe('when there are no inflight requests', () => {
it('calls send on the request', () => {
requestQueue.push(request)
expect(send).toHaveBeenCalledTimes(1)
})

it('deletes the inflight request and complete the request', () => {
requestQueue.push(request)
expect(request.entry.resolve).toHaveBeenCalledWith(
expect.objectContaining({ size: 0, payload: null })
)
describe('when the request does not require a response', () => {
beforeEach(() => {
request.expectResponse = false
})

expect(requestQueue.inflight.size).toEqual(0)
it('deletes the inflight request and complete the request', () => {
requestQueue.push(request)
expect(request.entry.resolve).toHaveBeenCalledWith(
expect.objectContaining({ size: 0, payload: null })
)

expect(requestQueue.inflight.size).toEqual(0)
})
})
})

Expand All @@ -80,6 +82,28 @@ describe('Network > RequestQueue', () => {
expect(requestQueue.pending.length).toEqual(1)
})

describe('when the request does not require a response', () => {
beforeEach(() => {
request.expectResponse = false
})

it('deletes the inflight request and complete the request when it is processed', () => {
requestQueue.push(request)

// Process the queue except the entry for the request, which should get handled automatically
for (const correlationId of requestQueue.inflight.keys()) {
if (correlationId !== request.entry.correlationId) {
requestQueue.fulfillRequest({ correlationId, size: 1, payload: Buffer.from('a') })
}
}
expect(request.entry.resolve).toHaveBeenCalledWith(
expect.objectContaining({ size: 0, payload: null })
)

expect(requestQueue.inflight.size).toEqual(0)
})
})

describe('when maxInFlightRequests is null', () => {
let maxInFlightRequests

Expand Down

0 comments on commit d86af3a

Please sign in to comment.