
Hangs on disconnect with inflight requests #918

Closed
goriunov opened this issue Oct 13, 2020 · 7 comments

@goriunov
Contributor

goriunov commented Oct 13, 2020

Describe the bug

It looks like 5f0979f introduced a bug. I have been running chaos tests against the latest KafkaJS version (1.14.0) and picked up a new issue. If I block the Kafka port for some time while producing data, and I keep producing, the producer fails and then tries to disconnect from Kafka forever: it waits indefinitely for pending/in-flight requests, and because I never stop producing, they never drain. Here is an extract of the logs. The important part is "Waiting for pending requests", which never resolves:

debug: {"timestamp":"2020-10-12T23:54:30.228Z","logger":"kafkajs","message":"Request Produce(key: 0, version: 7)","broker":"REDACTED.REDACTED:9092","clientId":"kafkajs-75048","correlationId":15,"expectResponse":true,"size":750}
error: {"timestamp":"2020-10-12T23:54:30.229Z","logger":"kafkajs","message":"Connection error: Cannot call write after a stream was destroyed","broker":"REDACTED.REDACTED:9092","clientId":"kafkajs-75048","stack":"Error [ERR_STREAM_DESTROYED]: Cannot call write after a stream was destroyed\n    at doWrite (_stream_writable.js:399:19)\n    at writeOrBuffer (_stream_writable.js:387:5)\n    at Socket.Writable.write (_stream_writable.js:318:11)\n    at Object.sendRequest (node_modules/kafkajs/src/network/connection.js:312:27)\n    at SocketRequest.send [as sendRequest] (node_modules/kafkajs/src/network/requestQueue/index.js:137:23)\n    at SocketRequest.send (node_modules/kafkajs/src/network/requestQueue/socketRequest.js:85:10)\n    at RequestQueue.sendSocketRequest (node_modules/kafkajs/src/network/requestQueue/index.js:166:19)\n    at RequestQueue.push (node_modules/kafkajs/src/network/requestQueue/index.js:146:12)\n    at node_modules/kafkajs/src/network/connection.js:307:29\n    at new Promise (<anonymous>)"}
debug: {"timestamp":"2020-10-12T23:54:30.229Z","logger":"kafkajs","message":"disconnecting...","broker":"REDACTED.REDACTED:9092","clientId":"kafkajs-75048"}
Pending 0 1
debug: {"timestamp":"2020-10-12T23:54:30.229Z","logger":"kafkajs","message":"Waiting for pending requests","clientId":"kafkajs-75048","broker":"REDACTED.REDACTED:9092","currentInflightRequests":1,"currentPendingQueueSize":0}
Published 89, queue size if: 74
Published 91, queue size if: 76
Published 93, queue size if: 78
debug: {"timestamp":"2020-10-12T23:55:00.867Z","logger":"kafkajs","message":"Request Produce(key: 0, version: 7)","broker":"REDACTED.REDACTED:9092","clientId":"kafkajs-75048","correlationId":16,"expectResponse":true,"size":750}
error: {"timestamp":"2020-10-12T23:55:00.868Z","logger":"kafkajs","message":"Connection error: Cannot call write after a stream was destroyed","broker":"REDACTED.REDACTED:9092","clientId":"kafkajs-75048","stack":"Error [ERR_STREAM_DESTROYED]: Cannot call write after a stream was destroyed\n    at doWrite (_stream_writable.js:399:19)\n    at writeOrBuffer (_stream_writable.js:387:5)\n    at Socket.Writable.write (_stream_writable.js:318:11)\n    at Object.sendRequest (node_modules/kafkajs/src/network/connection.js:312:27)\n    at SocketRequest.send [as sendRequest] (node_modules/kafkajs/src/network/requestQueue/index.js:137:23)\n    at SocketRequest.send (node_modules/kafkajs/src/network/requestQueue/socketRequest.js:85:10)\n    at RequestQueue.sendSocketRequest (node_modules/kafkajs/src/network/requestQueue/index.js:166:19)\n    at RequestQueue.push (node_modules/kafkajs/src/network/requestQueue/index.js:146:12)\n    at node_modules/kafkajs/src/network/connection.js:307:29\n    at new Promise (<anonymous>)"}
debug: {"timestamp":"2020-10-12T23:55:00.868Z","logger":"kafkajs","message":"disconnecting...","broker":"REDACTED.REDACTED:9092","clientId":"kafkajs-75048"}
Pending 0 1
debug: {"timestamp":"2020-10-12T23:55:00.868Z","logger":"kafkajs","message":"Waiting for pending requests","clientId":"kafkajs-75048","broker":"REDACTED.REDACTED:9092","currentInflightRequests":1,"currentPendingQueueSize":0}
Published 95, queue size if: 80
Published 97, queue size if: 82

You can reproduce it with something like the following, which retries the publish on every error:

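// Retry immediately on any error, so new requests keep arriving
// while the broken connection is being torn down
const writeMessage = (msg) => {
  producer.send(msg).catch(() => writeMessage(msg))
}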

With the above, you should see that there are almost always in-flight requests, so the connection can never disconnect cleanly. You have to be really lucky for the disconnect to complete :) ...
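The reproducer above retries immediately and without limit, so a fresh request is almost always in flight. As an application-side sketch only (it does not fix the bug itself and is not a KafkaJS API), a bounded retry with exponential backoff keeps a failing send from re-entering the queue instantly and eventually surfaces the error instead of retrying forever. `sendWithBackoff` and its options are hypothetical names; `send` stands in for a call such as `(msg) => producer.send(msg)`.

```javascript
// Hypothetical helper, not a KafkaJS API. `send` is any function returning a
// promise, e.g. (msg) => producer.send(msg).
async function sendWithBackoff(send, msg, { retries = 5, initialMs = 100, maxMs = 5000 } = {}) {
  for (let attempt = 0; ; attempt++) {
    try {
      return await send(msg)
    } catch (err) {
      // Give up after `retries` re-attempts instead of retrying forever.
      if (attempt >= retries) throw err
      // Exponential backoff: initialMs, 2x, 4x, ... capped at maxMs.
      const delayMs = Math.min(maxMs, initialMs * 2 ** attempt)
      await new Promise((resolve) => setTimeout(resolve, delayMs))
    }
  }
}
```

With `retries: 2`, a send that always fails is attempted three times before the last error is rethrown to the caller.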

Pending [] Map {
  3 => SocketRequest {
    createdAt: 1602550800040,
    requestTimeout: 30000,
    broker: 'XYZ:9092',
    clientId: 'kafkajs-76313',
    entry: {
      apiKey: 3,
      apiName: 'Metadata',
      apiVersion: 5,
      correlationId: 3,
      resolve: [Function],
      reject: [Function]
    },
    correlationId: 3,
    expectResponse: true,
    sendRequest: [Function: send],
    timeoutHandler: [Function: timeout],
    sentAt: null,
    duration: null,
    pendingDuration: null,
    [Symbol(private:SocketRequest:state)]: Symbol(PENDING),
    [Symbol(private:SocketRequest:emitEvent)]: [Function]
  }
}

The issue persists even when I set the timeout to 100 ms.
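Why a short timeout does not help can be illustrated with a small deterministic model. This is a sketch under explicit assumptions, not KafkaJS's actual logic: no broker ever answers, so every request expires `timeoutTicks` ticks after it is sent; the application re-sends once per tick; and the disconnect proceeds only when the in-flight map is empty. `ticksUntilDrained` and its parameters are hypothetical. As long as the timeout is longer than the gap between retries, expired requests are replaced as fast as they time out and the wait never ends; refusing new requests while closing lets the map drain.

```javascript
// Hypothetical model of the reported hang; this is NOT KafkaJS code.
// Assumptions: every request times out after `timeoutTicks` ticks because the
// socket is gone, and the application re-sends once per tick.
function ticksUntilDrained({ timeoutTicks, maxTicks, refuseWhileClosing }) {
  const inflight = new Map() // correlationId -> tick at which it times out
  let correlationId = 1
  inflight.set(correlationId, timeoutTicks) // request sent just before the error

  // The connection error has triggered a disconnect: wait for in-flight requests.
  for (let now = 1; now <= maxTicks; now++) {
    // Expire requests whose timeout has elapsed.
    for (const [id, deadline] of inflight) {
      if (deadline <= now) inflight.delete(id)
    }
    if (inflight.size === 0) return now // the disconnect can finish

    // The application retries; unless refused, the retry becomes in-flight too.
    if (!refuseWhileClosing) {
      correlationId += 1
      inflight.set(correlationId, now + timeoutTicks)
    }
  }
  return -1 // still "Waiting for pending requests" after maxTicks
}
```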

Environment:

  • OS: Mac OS
  • KafkaJS version: 1.14.0
  • Kafka version: 2.5.0
  • NodeJS version: 12.18.0
goriunov changed the title "Issue with Requests Timeout" to "Does not correctly disconnects with inflight requires" Oct 13, 2020
goriunov changed the title "Does not correctly disconnects with inflight requires" to "Hangs on disconnect with inflight requests" Oct 13, 2020
@tulios
Owner

tulios commented Oct 13, 2020

Are you calling producer.disconnect()?

@goriunov
Contributor Author

No, but I believe it happens automatically. At some point the system failed because of my chaos test (most likely the socket was ended), and the producer is trying to reconnect. Before it can reconnect it needs to disconnect, and that step never completes because send requests keep arriving, so the whole producer stalls. A few days ago we were running 1.14.0 in a test environment and had a small network issue that caused a brief outage. Afterwards the consumer recovered properly, but the producer was completely stuck in endless disconnections for over a day and required a manual restart on our side.
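One way to keep the disconnect step from blocking the reconnect indefinitely is to bound the wait. The sketch below is hypothetical and is not how KafkaJS implements `disconnect`: `waitForDrain` polls an in-flight count supplied by the caller and resolves `false` once `maxWaitMs` has elapsed, at which point the caller could reject whatever is still outstanding and close the socket anyway.

```javascript
// Hypothetical sketch, not KafkaJS code. Resolves true once the in-flight
// count reaches zero, or false if it is still non-zero after maxWaitMs.
function waitForDrain(getInflightCount, { pollMs = 10, maxWaitMs = 1000 } = {}) {
  const startedAt = Date.now()
  return new Promise((resolve) => {
    const check = () => {
      if (getInflightCount() === 0) return resolve(true) // drained cleanly
      if (Date.now() - startedAt >= maxWaitMs) return resolve(false) // deadline hit
      setTimeout(check, pollMs)
    }
    check()
  })
}
```

The deadline trades a clean drain for liveness: in the scenario described above, a `false` result would be the signal to stop waiting and move on to the reconnect.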

@goriunov
Contributor Author

Exactly the same code works correctly with version 1.12.0. After some investigation, the issue was narrowed down to the change described in the first post.

@tulios
Owner

tulios commented Oct 13, 2020

That makes sense; we have to treat timeouts differently. We were planning a 1.14.1 release, and I will hold it until this fix is in.
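The dump in the first post shows a request still in the `PENDING` state with `sentAt: null`. A sketch of handling timeouts and failed writes differently, assuming such an entry should never be left behind, is below. `InflightTracker` and its methods are hypothetical and are not KafkaJS's `RequestQueue`; the point is that every entry is removed, and its promise settled, by exactly one of a response, a timeout, or a failed write.

```javascript
// Hypothetical sketch, not KafkaJS's RequestQueue. Each in-flight entry is
// removed and settled exactly once: by a response, a timeout, or a failed write.
class InflightTracker {
  constructor({ requestTimeout }) {
    this.requestTimeout = requestTimeout
    this.inflight = new Map() // correlationId -> { resolve, reject, timer }
  }

  get size() {
    return this.inflight.size
  }

  // `write` is a hypothetical callback that sends the bytes; it is assumed to
  // throw synchronously if the socket has already been destroyed.
  push(correlationId, write) {
    return new Promise((resolve, reject) => {
      const timer = setTimeout(
        () => this.fail(correlationId, new Error(`Request ${correlationId} timed out`)),
        this.requestTimeout
      )
      this.inflight.set(correlationId, { resolve, reject, timer })
      try {
        write()
      } catch (err) {
        // Never leave an unsent request behind for a disconnect to wait on.
        this.fail(correlationId, err)
      }
    })
  }

  fulfill(correlationId, response) {
    const entry = this.take(correlationId)
    if (entry) entry.resolve(response)
  }

  fail(correlationId, err) {
    const entry = this.take(correlationId)
    if (entry) entry.reject(err)
  }

  // Remove an entry and cancel its timer; returns undefined if already settled.
  take(correlationId) {
    const entry = this.inflight.get(correlationId)
    if (!entry) return undefined
    clearTimeout(entry.timer)
    this.inflight.delete(correlationId)
    return entry
  }
}
```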

Nevon added the bug label Oct 31, 2020
@nikoloz17

nikoloz17 commented Nov 19, 2020

ERROR [Connection] Connection error: Cannot call write after a stream was destroyed {"timestamp":"2020-11-19T04:10:56.473Z","logger":"kafkajs","broker":"amazonaws.com:9092","stack":"Error [ERR_STREAM_DESTROYED]: Cannot call write after a stream was destroyed\n    at doWrite (_stream_writable.js:427:19)\n    at writeOrBuffer (_stream_writable.js:415:5)\n    at Socket.Writable.write (_stream_writable.js:305:11)\n    at Object.sendRequest (/app/newrow-bms-api-gateway/node_modules/kafkajs/src/network/connection.js:312:27)\n    at SocketRequest.send [as sendRequest] (/app/newrow-bms-api-gateway/node_modules/kafkajs/src/network/requestQueue/index.js:135:23)\n    at SocketRequest.send (/app/newrow-bms-api-gateway/node_modules/kafkajs/src/network/requestQueue/socketRequest.js:85:10)\n    at RequestQueue.sendSocketRequest (/app/newrow-bms-api-gateway/node_modules/kafkajs/src/network/requestQueue/index.js:164:19)\n    at RequestQueue.push (/app/newrow-bms-api-gateway/node_modules/kafkajs/src/network/requestQueue/index.js:144:12)\n    at /app/newrow-bms-api-gateway/node_modules/kafkajs/src/network/connection.js:307:29\n    at new Promise (<anonymous>)"}

We hit the same issue. We lost the connection to the brokers for a couple of minutes, and then it came back. By then it was already too late: all the connections had been disconnected, and this message was logged over and over.

If a fix is already available, releasing it as 1.14.1 sounds like a good idea. Thanks in advance!

@Nevon
Collaborator

Nevon commented Nov 19, 2020

I believe this was fixed by #944 and #956. You can try it out with 1.15.0-beta.25 and feel free to re-open this issue if it is not solved.

Nevon closed this as completed Nov 19, 2020
@suvorovis

I believe this was fixed by #944 and #956. You can try it out with 1.15.0-beta.25 and feel free to re-open this issue if it is not solved.

No, it wasn't fixed; I have run into this issue in 1.15.0.
