-
Notifications
You must be signed in to change notification settings - Fork 18
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
refactor: encapsulate existing SDK data system [DRAFT] #188
refactor: encapsulate existing SDK data system [DRAFT] #188
Conversation
Introduces the new `datasource_v2` package to hold all FDv2 related functionality. This package contains a streaming datasource implementation that is compatible with our current SDK, but pulls data updates from a FDv2-compatible source. It does not support any of the other FDv2-specific features like picking up from a known state. That will be introduced in later work as we re-shape the datasource integration on a larger scale.
Expands the `datasourcev2` package with a polling datasource implementation that is compatible with our current SDK, but pulls data updates from a FDv2-compatible source. It does not support any of the other FDv2-specific features like picking up from a known state. That will be introduced in later work as we re-shape the datasource integration on a larger scale.
fc243c3
to
bfc6cbf
Compare
) | ||
|
||
type StatusMonitorable interface { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This refactoring defines an interface with a single method - one of DataStore's methods. This component only needed that single method; this interface documents that fact.
It also makes it way easier to pass in something to satisfy it, instead of having to implement a bunch of no-op methods.
@@ -0,0 +1,146 @@ | |||
package datasystem |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This whole file implements the status quo, pre-FDv2.
It refactors out a bunch of config logic that used to live in the SDK client, stuffing it behind the new DataSystem
interface.
Since the DataSystem
interface offers some new capabilities, it also implements those capabilities in terms of the old system. I'll call that out in the comments.
} | ||
|
||
func NewFDv1(dataStoreFactory subsystems.ComponentConfigurer[subsystems.DataStore], dataSourceFactory subsystems.ComponentConfigurer[subsystems.DataSource], clientContext *internal.ClientContextImpl) (*FDv1, error) { | ||
system := &FDv1{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is basically a copy-paste of existing SDK configuration logic. If it looks like a bunch of circular dependencies, that's because it is.
return factory.Build(&contextCopy) | ||
} | ||
|
||
func (f *FDv1) DataSourceStatusBroadcaster() *internal.Broadcaster[interfaces.DataSourceStatus] { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This series of getters is to satisfy the SDK client's public interface, where these status providers are exposed. The FDv2 data system will also need to return something here, but they may not be functional (for now, or maybe I'll find a way to make them function.)
func (f *FDv1) Offline() bool { | ||
return f.dataSource == datasource.NewNullDataSource() | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here's a new DataSystem method: the "data status"!
This gives users what they really want to know: does the SDK have data, and if so, is it "fresh". In the FDv1 world, we can map the statuses like so:
- If the data source is initialized, then the data has been
Refreshed
(because our data sources are either LaunchDarkly-provided streaming or polling, we know that if they initialized, we have fresh data) - If the data source is not initialized, then we may have some
Cached
data from a persistent store. - Otherwise we got nothing.
We used to have a bool (initialized), now we're offering a tribool.
} | ||
} | ||
|
||
func (f *FDv1) Store() subsystems.ReadOnlyStore { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There's no need for the SDK client to have write access to the store, I think. Returning a more restricted interface here to represent that.
dataStoreUpdateSink := datastore.NewDataStoreUpdateSinkImpl(bcasters.dataStoreStatus) | ||
clientContextCopy := *clientContext | ||
clientContextCopy.DataStoreUpdateSink = dataStoreUpdateSink | ||
clientContextCopy.DataSourceUpdateSink = store |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some funkyness here just like in FDv1
struct due to the way clientContext
is passed between components during the configuration stage.
|
||
if cfg.Store != nil { | ||
fdv2.dataStoreStatusProvider = datastore.NewDataStoreStatusProviderImpl(cfg.Store, dataStoreUpdateSink) | ||
store.SetPersistent(cfg.Store, cfg.StoreMode, fdv2.dataStoreStatusProvider) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not pass cfg.Store
, cfg.StoreMode
, and dataStoreStatusProvider
in the constructor of store
? We can't.. because the dataStoreStatusProvider
depends on cfg.Store
being built, but we can't build that without store
(because store
implements DataSourceStatusSink
)
Sadness.
b9de8dd
to
11bf0e6
Compare
// connecting to LaunchDarkly, you can add a custom initializer: | ||
// | ||
// config := ld.Config { | ||
// DataSystem: ldcomponents.DataSystem().PrependInitializers(myCustomInitializer), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of making this prepend, can this be a simple add style method? Prepending adds an unnecessary "think of this as a stack" mental overhead I think we'd like to avoid.
I realize these are supposed to run before our default initializer, but maybe we can figure out a better way to represent that. Maybe we call these custom initializers, and the docs make that clear that's before the built-in. We can discuss.
Also we should make this method singular unless it is going to be variadic.
return "StreamingDataSourceV2" | ||
} | ||
|
||
func (sp *StreamProcessor) Fetch(ctx context.Context) (*subsystems.InitialPayload, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we not even have this method implemented then?
@@ -161,6 +173,8 @@ func (sp *StreamProcessor) consumeStream(stream *es.Stream, closeWhenReady chan< | |||
|
|||
sp.logConnectionResult(true) | |||
|
|||
// TODO(cwaldren/mkeeler): Should this actually be true by default? It means if we receive an event |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think leaving it as is would result in the same behavior we currently have, right? So it's probably fine to keep it.
We could consider setting it to false until we get a useful event. But practically I doubt it would make much of a difference.
// TODO: the data source previously had the responsibility of figuring out if storing an update failed, | ||
// and then potentially restarting the streaming connection to get a new PUT so that it could init | ||
// the database if updates got lost. This is no longer the responsibility of the data source (now the | ||
// data system will handle it.) Ideally, the update sink's methods should not be fallible. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Vital changes.
// data system will handle it.) Ideally, the update sink's methods should not be fallible. | |
// data system will handle it). Ideally, the update sink's methods should not be fallible. |
if newStoreStatus.Available { | ||
// The Store has just transitioned from unavailable to available | ||
if newStoreStatus.NeedsRefresh { | ||
f.loggers.Warn("Reinitializing data Store from in-memory cache after after data Store outage") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
f.loggers.Warn("Reinitializing data Store from in-memory cache after after data Store outage") | |
f.loggers.Warn("Reinitializing data Store from in-memory cache after data Store outage") |
if newStoreStatus.Available { | ||
// The Store has just transitioned from unavailable to available | ||
if newStoreStatus.NeedsRefresh { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Cleaner to make compound conditional instead? Accepting this would require another change below, and probably some formatting fixes.
if newStoreStatus.Available { | |
// The Store has just transitioned from unavailable to available | |
if newStoreStatus.NeedsRefresh { | |
if newStoreStatus.Available && newStoreStatus.NeedsRefresh { | |
// The Store has just transitioned from unavailable to available |
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
} | |
} | |
} |
return s.availability | ||
} | ||
|
||
// Mirroring returns true data is being mirrored to a persistent store. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// Mirroring returns true data is being mirrored to a persistent store. | |
// Mirroring returns true if data is being mirrored to a persistent store. |
s.mu.Lock() | ||
defer s.mu.Unlock() | ||
|
||
// TXNS-PS: Requirement 1.3.3, must apply updates to in-memory before the persistent Store. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Did we settle on a format for this? Maybe it would be easier if we went with TXNS-PS(1.3.3): whatever
.
@@ -269,7 +291,8 @@ func MakeCustomClient(sdkKey string, config Config, waitFor time.Duration) (*LDC | |||
) | |||
} | |||
|
|||
dataProvider := ldstoreimpl.NewDataStoreEvaluatorDataProvider(store, loggers) | |||
// TODO: We can't actually pass STore() here because it wont' swap between the active ones. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// TODO: We can't actually pass STore() here because it wont' swap between the active ones. | |
// TODO: We can't actually pass Store() here because it won't swap between the active ones. |
type DataAvailability string | ||
|
||
const ( | ||
// Defaults means the SDK has no data and will evaluate flags using the application-provided default values. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe this should be a function from timestamp to status.
So a given timestamp is Default, Stale, Fresh.
77aa8df
to
d62afd7
Compare
The purpose of this PR is to encapsulate all of the configuration/plumbing logic related to "fdv1" store/sources.
This will live behind a
DataSystem
interface defined in the client. It should have no behavioral change.We can then introduce the fdv2 concepts satisfying this same interface. At that point, the client's configuration will determine which data system to pick at runtime. This will allow both to coexist for a testing phase.
Current status: This PR is like a sandbox. It's pretty huge and touches a lot of systems. We can factor out some of the components and introduce them incrementally, given we are happy with how they fit together in this PR.