Skip to content

Conversation

guidobrei
Copy link
Member

@guidobrei guidobrei commented Sep 17, 2025

This PR

  • fixes a potential deadlock in FlagSyncService
  • fixes synchronization issues between getMetadata and syncFlags

Related Issues

Fixes #1583

Notes

We've identified two related issues in current implementation:

  1. It potentially waits forever on receiving a message on a queue by calling incomingQueue.take() some message put in the blocking queue.
  2. If getMetadata fails we want to cancel the stream. However, this never worked properly because the two parts listened on different cancellation tokens.

The new implementation eliminates problem 1) by removing incomingQueue at all and mitigates 2) by sharing a cancellation token.

Follow-up Tasks

Still open is the removal of deprecated getMetadata open-feature/flagd#1584.

How to test

Clagd testbed covers FlagSyncService

… added Stream cancellation

Signed-off-by: Guido Breitenhuber <guido.breitenhuber@dynatrace.com>
Copy link

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Summary of Changes

Hello @guidobrei, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly refactors the FlagSyncService to enhance the stability and correctness of stream observation. The primary goal is to prevent a potential deadlock scenario and improve the synchronization of stream cancellation. By removing a blocking queue and implementing a unified cancellation context, the service now handles metadata retrieval and flag synchronization more robustly, ensuring that stream failures or metadata issues lead to proper stream restarts.

Highlights

  • Deadlock Prevention: Eliminated a potential deadlock in FlagSyncService by removing the incomingQueue and its blocking take() operation.
  • Stream Synchronization: Resolved synchronization issues between getMetadata and syncFlags by sharing a single Context.CancellableContext, ensuring consistent cancellation behavior.
  • Refactored Stream Observation: Introduced a dedicated SyncStreamObserver class to manage gRPC stream events directly, simplifying the stream handling logic.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

Copy link

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request effectively addresses a potential deadlock in FlagSyncService by removing the blocking incomingQueue.take() call and refactors the stream handling logic to be more robust. The fix for the synchronization issue between getMetadata and syncFlags by sharing a CancellableContext is also a good improvement. My review includes a few suggestions to further improve the new implementation, mainly around error handling and thread interruption best practices. Specifically, I've pointed out an opportunity to restore the error message in the error payload, adjust a logging level, and properly handle InterruptedException.

localStub.syncFlags(syncRequest.build(), new QueueingStreamObserver<SyncFlagsResponse>(incomingQueue));
localStub.syncFlags(syncRequest.build(), streamObserver);

while (!shutdown.get() && !streamObserver.context.isCancelled()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of this loop, couldn't we cancel the context in the shutdown() method?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method is executed in a thread. shutdown is an AtomicBoolean to signal shutdown to the stream RPC and the underlying channel connector. But you're right, we maybe want to cancel the stream when we get a shutdown signal.

Will modify the code.

Signed-off-by: Guido Breitenhuber <guido.breitenhuber@dynatrace.com>
Signed-off-by: Guido Breitenhuber <guido.breitenhuber@dynatrace.com>
Signed-off-by: Guido Breitenhuber <guido.breitenhuber@dynatrace.com>
Signed-off-by: Guido Breitenhuber <guido.breitenhuber@dynatrace.com>
Signed-off-by: Guido Breitenhuber <guido.breitenhuber@dynatrace.com>
Signed-off-by: Guido Breitenhuber <guido.breitenhuber@dynatrace.com>
Signed-off-by: Guido Breitenhuber <guido.breitenhuber@dynatrace.com>
@guidobrei
Copy link
Member Author

/gemini summary
/gemini review

Copy link

Hello guidobrei,

Thank you for submitting this pull request and for requesting a review. I've analyzed the changes and here's a summary and a detailed review.

Pull Request Summary

This pull request, titled "fix(flagd): Fix stream observation in FlagSyncService", addresses two critical issues within the FlagSyncService: a potential deadlock and synchronization problems between getMetadata and syncFlags. The core of the solution involves a significant refactoring of the stream observation mechanism. The incomingQueue (a LinkedBlockingQueue) has been removed, eliminating a source of potential blocking. Instead, a new SyncStreamObserver class is introduced, which directly handles stream events (onNext, onError, onCompleted) and uses an Awaitable for synchronization. The getMetadata logic is now encapsulated in a dedicated method, and its interaction with the stream's lifecycle is improved to ensure proper cancellation. These changes aim to enhance the robustness and reliability of the flag synchronization process.

Detailed Review

I've reviewed the changes based on correctness, efficiency, and maintainability, adhering to the Google Java Style Guide for any stylistic suggestions.

Overall Feedback

This is a well-structured and impactful change. The refactoring effectively addresses the identified issues, leading to a more robust and maintainable stream observation mechanism. The separation of concerns into getMetadata and the SyncStreamObserver is a significant improvement.

Review Comments

File: providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java

  • Correctness & Maintainability (Imports and Field Removal - Diff 1, 2, 3)

    • The removal of QueueingStreamObserver, StreamResponseModel, Context, CancellableContext, and the incomingQueue field is a direct and effective way to resolve the identified deadlock and synchronization issues. The new Awaitable and StreamObserver imports are appropriate for the revised asynchronous pattern.
  • Correctness & Maintainability (Refactor of observeSyncStream and new methods - Diff 4)

    • The restructuring of observeSyncStream is excellent. It clearly separates the concerns of metadata retrieval and flag synchronization, making the logic easier to follow and reason about.
    • The new getMetadata() method correctly handles the syncMetadataDisabled case and applies the deadline. Using GetMetadataRequest.getDefaultInstance() is appropriate here.
    • The syncFlags(observer) call followed by streamObserver.done.await() provides a clear and effective synchronization point, replacing the previous blocking queue mechanism.
  • Correctness & Maintainability (New SyncStreamObserver class - Diff 5)

    • The introduction of SyncStreamObserver as a nested static class is a strong design choice. It encapsulates the stream handling logic, improving modularity and readability.
    • The use of Awaitable for signaling completion is a clean and efficient way to manage the lifecycle of the stream observation.
    • The onNext method correctly prioritizes syncFlagsResponse.hasSyncContext() for syncContext, falling back to the metadata retrieved earlier. This ensures the most relevant context is used.
    • The onError and onCompleted methods properly log events and ensure done.wakeup() is called, preventing indefinite blocking in the main observeSyncStream loop.

File: providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSourceTest.java

  • Correctness (Test Updates - Diff 1, 2, 3)
    • The test file has been correctly updated to reflect the changes in the main class, specifically regarding the removal of QueueingStreamObserver and the adoption of StreamObserver. This indicates that the tests are being maintained alongside the production code, which is crucial for ensuring correctness.

Conclusion

The changes in this pull request are well-thought-out and effectively address the described problems. The refactoring improves the code's correctness, efficiency, and maintainability. I recommend merging this PR once any minor suggestions are addressed.


// inner loop for handling messages
while (!shutdown.get() && !Context.current().isCancelled()) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was this call to the current context Context.current().isCancelled() intentional or should it have been a call to the context context.isCancelled(), created before and used in the metadata block above to also skip flag sync when?

Context.current() != context, because the call to Context.current().withCancellation() only creates a new context, but does not set it as current context.

In the previous version, flags are always synced, even if getMetadata fails. This behavior is different in my implementation. In the new version, when getMatadata fails, we continue the loop and try again.

@toddbaert I saw this change was introduced by you here. Any chance you can recall the intention? Based on the log message in the metadata catch block, I'd say we want the syncFlags stream to start over again.

observer.metadata = getMetadata();
} catch (Exception metaEx) {
// retry if getMetadata fails
log.warn("Metadata request exception, retrying.", metaEx);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure about the log level. Elevating from debug to warn might be too obtrusive.

@toddbaert
Copy link
Member

This looks like a good simplification. I will thoroughly review on Monday 🙏

Signed-off-by: Guido Breitenhuber <guido.breitenhuber@dynatrace.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[flagd] Reconnect Functionality Fails Under Certain Conditions, Transitioning System to Error State
6 participants