-
Notifications
You must be signed in to change notification settings - Fork 83
Allow consumeFirstStringMessageFrom to be called multiple times #61
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @alexadriaanse! I'll be happy to merge if you can take care of what I highlighted. Also, when you introduce configuration, could you please make sure there's some documentation on that! Thanks.
@@ -236,6 +236,7 @@ sealed trait EmbeddedKafkaSupport { | |||
props.put("group.id", s"embedded-kafka-spec") | |||
props.put("bootstrap.servers", s"localhost:${config.kafkaPort}") | |||
props.put("auto.offset.reset", "earliest") | |||
props.put("enable.auto.commit", "false") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would feel better if this was a configuration value with a default true
in order not to break backwards compatibility.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was hesitant to make this change for the reason you pointed out, but unfortunately it has to be false
, otherwise functionality will remain broken. If auto-commit is true (which was the case before my commit) a call to consumeFirstStringMessageFrom
will cause all messages to be committed, not just the first one.
The addition of this line should only change the behavior of consumeFirstMessageFrom
, and (when combined with the other changes in this PR) brings the behavior of this function closer to the behavior of 0.5.0, which only committed messages that it returned. So I'd argue this change actually preserves backwards compatibility for people coming from 0.5.0. That is how I discovered this issue: we upgraded an internal app from scalatest-embedded-kafka 0.5.0 to 0.10.0 and had a unit test in an internal app fail as a result of the upgrade.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was probably overlooked by my side as I didn't realise that. I'll be fine with leaving false
as default, but it's still better to add a parameter to enable or disable this.
@@ -44,6 +44,9 @@ class EmbeddedKafkaMethodsSpec extends EmbeddedKafkaSpecSupport with EmbeddedKaf | |||
|
|||
consumer.close() | |||
|
|||
// Commit message in embedded-kafka-spec group so we don't consume it | |||
// again in the consumeFirstStringMessageFrom tests below. | |||
consumeFirstStringMessageFrom(topic) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Committing here shouldn't happen with using the consumeFirstStringMessageFrom()
method, but with a direct commit instead.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, this felt like a hack writing this. However, I can't easily do a direct commit, as that would commit the message in the test
consumer group, whereas it needs to be committed in embedded-kafka-spec
.
Thinking this through a little more, rather than relying on this hack I should just change the topics we're publishing to in order to keep unit tests from stepping on each other's toes. I'll commit this change in a new commit.
|
||
// Commit message in embedded-kafka-spec group so we don't consume it | ||
// again in the consumeFirstStringMessageFrom tests below. | ||
consumeFirstStringMessageFrom(topic) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And also here, committing here shouldn't happen with using the consumeFirstStringMessageFrom() method, but with a direct commit instead. We could extract a utility method actually.
Just to make sure we're on the same page, when you're talking about adding documentation are you just talking about adding a comment to the appropriate line in |
Scaladoc would be enough I'd say. |
cd83bab
to
aa24260
Compare
I've addressed your feedback and have squashed my changes into my previous commit. Let me know if you have any other feedback. Thanks! |
aa24260
to
0cd2955
Compare
In v0.5.0 you were able to call consumeFirstStringMessageFrom multiple times, with each invocation causing the next message in a topic to be consumed. In v0.10.0 this was no longer the case: the first invocation works fine, but subsequent invocations would result in exceptions as the first invocation would cause *all* messages to be committed even though only the first one is returned, leaving no messages remaining to be consumed by subsequent calls. This regression was likely introduced by commit a906743. This commit fixes this by disabling auto-commit by default and simply committing only the message that consumeFirstStringMessageFrom returns.
0cd2955
to
96251d7
Compare
Thanks @alexadriaanse - I just need to verify how the new config value for |
In older versions of scalatest-embedded-kafka you were able to call
consumeFirstStringMessageFrom multiple times, with each invocation
causing the next message in a topic to be consumed. In the latest
version this is no longer the case: the first invocation works fine, but
subsequent invocations would result in exceptions as the first
invocation would cause all messages to be committed even though only
the first one is returned. We fix this by disabling auto-commit and
simply committing only the message that consumeFirstStringMessageFrom
returns.