Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use a Promise in the RunloopState.Started as suggested by @erikvanoosten #910

Closed
wants to merge 2 commits into from

Conversation

guizmaii
Copy link
Member

@guizmaii guizmaii commented Jun 7, 2023

case RunloopState.NotStarted if shouldStartIfNot =>
for {
promise <- Promise.make[Nothing, Runloop]
_ <- makeRunloop.map(promise.succeed).fork
Copy link
Member Author

@guizmaii guizmaii Jun 8, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think tests are failing because of this fork. It might stop too early. I'm not sure how to wait for the end of the makeRunloop.map(promise.succeed) IO before to close the fiber without blocking the outer IO 🤔

@erikvanoosten Any idea?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Stupid mistake: I used .map(promise.succeed) instead of .flatMap(promise.succeed) 🤦‍♂️

Copy link
Member Author

@guizmaii guizmaii Jun 10, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, now that this stupid mistake is fixed, your solution is not passing the tests:

  + issue #856
    + Booting a Consumer to do something else than consuming should not fail with `RunloopTimeout` exception
    + Ordering of finalizers matters. If subscriptions are finalized after the runloop, it creates a deadlock
      + When not consuming, the Runloop is not started so only the Consumer is finalized
      + When consuming, the Runloop is started. The finalization orders matters to avoid a deadlock
      - Calling `Consumer::stopConsumption` just after starting a forked consumption session should stop the consumption
        ✗ 100000 was not less than 100000
        consumed_0 did not satisfy isLessThan(numberOfMessages.toLong)
        consumed_0 = 100000
        at /Users/jules/conduktor/workspace/zio-kafka/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala:1163

I'll be honest, I don't understand why it doesn't work. AFAIU, it should 🤔🤨

if (retry <= 0) ZIO.unit
else if (initialCall) next
else next.delay(10.millis)
def stopConsumption: UIO[Boolean] =
Copy link
Member Author

@guizmaii guizmaii Jun 14, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I need this Boolean in the returned type so that the caller can retry it if needed.
The test was failing because it was calling the stopConsumption before the Runloop had enough time to start.
We can see this in the logs of the test:

17:05:33.564 [ZScheduler-Worker-1] [] DEBUG zio.kafka.consumer.Consumer.Live - stopConsumption called
17:05:33.566 [ZScheduler-Worker-1] [] DEBUG zio.kafka.consumer.Consumer.Live - stopConsumption called
17:05:33.598 [ZScheduler-Worker-2] [] DEBUG zio.kafka.consumer.internal.Runloop - Starting Runloop
17:05:33.598 [ZScheduler-Worker-2] [] DEBUG zio.kafka.consumer.internal.Runloop - Starting Runloop
17:05:33.608 [zio-kafka-runloop-thread-0] [] DEBUG zio.kafka.consumer.internal.Runloop - Processing 1 commands: AddSubscription(Manual(Set(consumespec-topic-17a94a7b-6a73-44ec-a909-db7bce907797-0)))
17:05:33.609 [zio-kafka-runloop-thread-0] [] DEBUG zio.kafka.consumer.internal.Runloop - Processing 1 commands: AddSubscription(Manual(Set(consumespec-topic-17a94a7b-6a73-44ec-a909-db7bce907797-0)))
17:05:33.617 [zio-kafka-runloop-thread-0] [] DEBUG z.k.c.i.PartitionStreamControl - Creating partition stream consumespec-topic-17a94a7b-6a73-44ec-a909-db7bce907797-0
17:05:33.617 [zio-kafka-runloop-thread-0] [] DEBUG z.k.c.i.PartitionStreamControl - Creating partition stream consumespec-topic-17a94a7b-6a73-44ec-a909-db7bce907797-0
17:05:33.652 [zio-kafka-runloop-thread-0] [] DEBUG zio.kafka.consumer.internal.Runloop - Processing 1 commands: Request(consumespec-topic-17a94a7b-6a73-44ec-a909-db7bce907797-0)
17:05:33.652 [zio-kafka-runloop-thread-0] [] DEBUG zio.kafka.consumer.internal.Runloop - Processing 1 commands: Request(consumespec-topic-17a94a7b-6a73-44ec-a909-db7bce907797-0)
17:05:33.653 [zio-kafka-runloop-thread-0] [] DEBUG zio.kafka.consumer.internal.Runloop - Starting poll with 1 pending requests and 0 pending commits
17:05:33.654 [zio-kafka-runloop-thread-0] [] DEBUG zio.kafka.consumer.internal.Runloop - Starting poll with 1 pending requests and 0 pending commits
17:05:33.734 [ZScheduler-Worker-12] [] DEBUG z.kafka.consumer.ConsumerSpec.spec - Consumed 0 messages
17:05:33.735 [ZScheduler-Worker-12] [] DEBUG z.kafka.consumer.ConsumerSpec.spec - Consumed 100 messages
...

@guizmaii
Copy link
Member Author

Closing because the test solution (using a Promise) doesn't solve the issue

@guizmaii guizmaii closed this Jun 14, 2023
@guizmaii guizmaii deleted the fix_856_promise branch June 14, 2023 13:14
guizmaii added a commit that referenced this pull request Jun 15, 2023
guizmaii added a commit that referenced this pull request Jun 17, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant