-
Notifications
You must be signed in to change notification settings - Fork 137
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Use a Promise in the RunloopState.Started
as suggested by @erikvanoosten
#910
Conversation
a6ed3d2
to
0530e80
Compare
case RunloopState.NotStarted if shouldStartIfNot => | ||
for { | ||
promise <- Promise.make[Nothing, Runloop] | ||
_ <- makeRunloop.map(promise.succeed).fork |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think tests are failing because of this fork
. It might stop too early. I'm not sure how to wait for the end of the makeRunloop.map(promise.succeed)
IO before to close the fiber without blocking the outer IO 🤔
@erikvanoosten Any idea?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Stupid mistake: I used .map(promise.succeed)
instead of .flatMap(promise.succeed)
🤦♂️
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well, now that this stupid mistake is fixed, your solution is not passing the tests:
+ issue #856
+ Booting a Consumer to do something else than consuming should not fail with `RunloopTimeout` exception
+ Ordering of finalizers matters. If subscriptions are finalized after the runloop, it creates a deadlock
+ When not consuming, the Runloop is not started so only the Consumer is finalized
+ When consuming, the Runloop is started. The finalization orders matters to avoid a deadlock
- Calling `Consumer::stopConsumption` just after starting a forked consumption session should stop the consumption
✗ 100000 was not less than 100000
consumed_0 did not satisfy isLessThan(numberOfMessages.toLong)
consumed_0 = 100000
at /Users/jules/conduktor/workspace/zio-kafka/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala:1163
I'll be honest, I don't understand why it doesn't work. AFAIU, it should 🤔🤨
c2e82d3
to
a922d9e
Compare
20b8419
to
93fd131
Compare
93fd131
to
5a6ea81
Compare
19d32d7
to
56034fe
Compare
56034fe
to
22adb9b
Compare
if (retry <= 0) ZIO.unit | ||
else if (initialCall) next | ||
else next.delay(10.millis) | ||
def stopConsumption: UIO[Boolean] = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I need this Boolean
in the returned type so that the caller can retry it if needed.
The test was failing because it was calling the stopConsumption
before the Runloop had enough time to start.
We can see this in the logs of the test:
17:05:33.564 [ZScheduler-Worker-1] [] DEBUG zio.kafka.consumer.Consumer.Live - stopConsumption called
17:05:33.566 [ZScheduler-Worker-1] [] DEBUG zio.kafka.consumer.Consumer.Live - stopConsumption called
17:05:33.598 [ZScheduler-Worker-2] [] DEBUG zio.kafka.consumer.internal.Runloop - Starting Runloop
17:05:33.598 [ZScheduler-Worker-2] [] DEBUG zio.kafka.consumer.internal.Runloop - Starting Runloop
17:05:33.608 [zio-kafka-runloop-thread-0] [] DEBUG zio.kafka.consumer.internal.Runloop - Processing 1 commands: AddSubscription(Manual(Set(consumespec-topic-17a94a7b-6a73-44ec-a909-db7bce907797-0)))
17:05:33.609 [zio-kafka-runloop-thread-0] [] DEBUG zio.kafka.consumer.internal.Runloop - Processing 1 commands: AddSubscription(Manual(Set(consumespec-topic-17a94a7b-6a73-44ec-a909-db7bce907797-0)))
17:05:33.617 [zio-kafka-runloop-thread-0] [] DEBUG z.k.c.i.PartitionStreamControl - Creating partition stream consumespec-topic-17a94a7b-6a73-44ec-a909-db7bce907797-0
17:05:33.617 [zio-kafka-runloop-thread-0] [] DEBUG z.k.c.i.PartitionStreamControl - Creating partition stream consumespec-topic-17a94a7b-6a73-44ec-a909-db7bce907797-0
17:05:33.652 [zio-kafka-runloop-thread-0] [] DEBUG zio.kafka.consumer.internal.Runloop - Processing 1 commands: Request(consumespec-topic-17a94a7b-6a73-44ec-a909-db7bce907797-0)
17:05:33.652 [zio-kafka-runloop-thread-0] [] DEBUG zio.kafka.consumer.internal.Runloop - Processing 1 commands: Request(consumespec-topic-17a94a7b-6a73-44ec-a909-db7bce907797-0)
17:05:33.653 [zio-kafka-runloop-thread-0] [] DEBUG zio.kafka.consumer.internal.Runloop - Starting poll with 1 pending requests and 0 pending commits
17:05:33.654 [zio-kafka-runloop-thread-0] [] DEBUG zio.kafka.consumer.internal.Runloop - Starting poll with 1 pending requests and 0 pending commits
17:05:33.734 [ZScheduler-Worker-12] [] DEBUG z.kafka.consumer.ConsumerSpec.spec - Consumed 0 messages
17:05:33.735 [ZScheduler-Worker-12] [] DEBUG z.kafka.consumer.ConsumerSpec.spec - Consumed 100 messages
...
Closing because the test solution (using a Promise) doesn't solve the issue |
See #857 (comment)