Skip to content

Commit

Permalink
Merge pull request #666 from tulios/reproduce-665
Browse files Browse the repository at this point in the history
Remove invalid topics from targetTopics on INVALID_TOPIC_EXCEPTION
  • Loading branch information
Nevon authored Mar 18, 2020
2 parents 03409e2 + c091f5c commit 897715f
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 7 deletions.
34 changes: 27 additions & 7 deletions src/cluster/index.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
const BrokerPool = require('./brokerPool')
const Lock = require('../utils/lock')
const createRetry = require('../retry')
const connectionBuilder = require('./connectionBuilder')
const flatten = require('../utils/flatten')
Expand Down Expand Up @@ -78,6 +79,10 @@ module.exports = class Cluster {
})

this.targetTopics = new Set()
this.mutatingTargetTopics = new Lock({
description: `updating target topics`,
timeout: requestTimeout,
})
this.isolationLevel = isolationLevel
this.brokerPool = new BrokerPool({
connectionBuilder: this.connectionBuilder,
Expand Down Expand Up @@ -162,15 +167,30 @@ module.exports = class Cluster {
* @return {Promise}
*/
async addMultipleTargetTopics(topics) {
const previousSize = this.targetTopics.size
for (const topic of topics) {
this.targetTopics.add(topic)
}
await this.mutatingTargetTopics.acquire()

const hasChanged = previousSize !== this.targetTopics.size || !this.brokerPool.metadata
try {
const previousSize = this.targetTopics.size
const previousTopics = new Set(this.targetTopics)
for (const topic of topics) {
this.targetTopics.add(topic)
}

if (hasChanged) {
await this.refreshMetadata()
const hasChanged = previousSize !== this.targetTopics.size || !this.brokerPool.metadata

if (hasChanged) {
try {
await this.refreshMetadata()
} catch (e) {
if (e.type === 'INVALID_TOPIC_EXCEPTION') {
this.targetTopics = previousTopics
}

throw e
}
}
} finally {
await this.mutatingTargetTopics.release()
}
}

Expand Down
37 changes: 37 additions & 0 deletions src/producer/__tests__/producingToInvalidTopic.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
const { secureRandom, newLogger, createCluster, createTopic } = require('testHelpers')
const createProducer = require('../index')

describe('Producer > Producing to invalid topics', () => {
let producer, topicName

beforeEach(async () => {
topicName = `test-topic-${secureRandom()}`

producer = createProducer({
cluster: createCluster(),
logger: newLogger(),
})
await producer.connect()
await createTopic({ topic: topicName })
})

afterEach(async () => {
producer && (await producer.disconnect())
})

it('it rejects when producing to an invalid topic name, but is able to subsequently produce to a valid topic', async () => {
producer = createProducer({
cluster: createCluster(),
logger: newLogger(),
})
await producer.connect()

const message = { key: `key-${secureRandom()}`, value: `value-${secureRandom()}` }
const invalidTopicName = `${topicName}-abc)(*&^%`
await expect(producer.send({ topic: invalidTopicName, messages: [message] })).rejects.toThrow(
'The request attempted to perform an operation on an invalid topic'
)

await producer.send({ topic: topicName, messages: [message] })
})
})

0 comments on commit 897715f

Please sign in to comment.