-
Notifications
You must be signed in to change notification settings - Fork 4.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
State Storage for new Scheduler #8613
Conversation
This reverts commit b40285c.
public StateStore s3(final S3Config config) { | ||
return new StateStore(new S3CloudDocumentStoreClient( | ||
new DefaultS3ClientFactory(config).get(), | ||
config.getBucketName(), | ||
STATE_ROOT)); | ||
} | ||
|
||
public StateStore minio(final MinioConfig config) { | ||
return new StateStore(new S3CloudDocumentStoreClient( | ||
new MinioS3ClientFactory(config).get(), | ||
config.getBucketName(), | ||
STATE_ROOT)); | ||
} | ||
|
||
public StateStore gcs(final GcsConfig config) { | ||
return new StateStore(new GcsCloudDocumentStoreClient( | ||
new DefaultGcsClientFactory(config).get(), | ||
config.getBucketName(), | ||
STATE_ROOT)); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jared && @benmoriceau these can move to where ever it convenient. i kinda figure when we actually plug this into the worker it will become obvious if this is a good place for them, or if we ant to move them.
@davinchia this is where we are leveraging those factories that you were mentioning as redundant in the previous PR. I think they could probably just be static helpers instead of class that gets instantiated. They let us reuse the same configuration object for configuring cloud storage for here as we do for logs, so we don't have to duplicate the logic!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see!
The static pattern makes sense to me, since there isn't state to manage here.
I also have slight preference for a single class that handles all storage client creation instead of 3 separate small factories.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
kk. i'm going to do it separately. but agreed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
moved the static factories as jared suggested.
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice! I appreciate the good doc strings.
One medium comment about moving the storage clients.
One small comment on cosmetic changes.
Non-blocking.
private S3Client s3Client; | ||
|
||
@BeforeEach | ||
void setup() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I didn't intend this to be generated for each test run. I don't think it's a problem since it's overwriting the same path, but we should be able to get away with just having the method for manual regeneration in case those files get corrupted/deleted.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do you prefer relying on the external resource to keep state between test runs as opposed to just having the test guarantee the correct set up every time? Seems like it's breaking the pattern we try to use in almost all of our testing. Sometimes it's not possible to do it, but here it seems almost free.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems wasteful since we aren't really isolating each test run from the other (since we are always writing to the same location).
To properly isolate, we would also have to add a random prefix path and delete the old files. At that time I felt this is more trouble than it is worth, esp since the chance of something bad happening here is very small. (Someone would have to overwrite the same location)
I'm okay if you prefer the test set up doing this.
/** | ||
* Interface for treating cloud storage like a simple document store. | ||
*/ | ||
public interface CloudDocumentStoreClient { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just a thought: can we move this into a more general module? e.g airbyte-commons?
these interfaces are going to be useful to the server for #8301, I would love to be able to reuse this for that issue :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we avoid to catch exception and throw IO ones explicitly, In temporal we will add a catch block anyway in the activities that uses it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah. I agree with this. I started to look into this and then decided it was a bad idea to block on that refactor because I think it is a hard refactor.
fwiw, airbyte-commons
is definitely not the right place for it. for that module we should only be adding classes that do not add dependencies. i.e. if there is a lib we have consciously shared everywhere (e.g. jackson, junit), then it is okay, but we shouldn't be adding dependencies to commons. it's deps should be the same as the root build. Right now we should inject S3 and GCS deps everywhere, but we shouldn't do it.
I think there are a lot of problems in how we are doing dependencies for the GCS. Namely the fact that we have made S3 and GCS a dependency of every module (see slack convo here). We should figure out how to consolidate all of the code around building these clients into a single module with tightly scope dependencies. That seems to interact in a weird way with our logging configuration which I don't fully grasp yet. I'm happy to look into do it in in my slack time, but it's not a small task and I don't think we should put it on the critical path for getting this scheduler project done.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we avoid to catch exception and throw IO ones explicitly, In temporal we will add a catch block anyway in the activities that uses it.
We definitely can. I'm trying to learn from our original ConfigPersistence
. We did expose IOException
there and then because it was such a pain to always be try / catching that exception in callers pretty much had ConfigRepository
come into swallow the exceptions. If it meaningfully better to bubble up the exception here, we definitely can, but if it's just a style thing, then I think we save ourselves some headache by leaving it as is.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We definitely can. I'm trying to learn from our original
ConfigPersistence
. We did exposeIOException
there and then because it was such a pain to always be try / catching that exception in callers pretty much hadConfigRepository
come into swallow the exceptions. If it meaningfully better to bubble up the exception here, we definitely can, but if it's just a style thing, then I think we save ourselves some headache by leaving it as is.
All the temporal activity will have their method wrapped in a try {} catch (Exception e). The goal is to be able to make a workflow to go into a quarantine state in case there is no point to re-run the sync if the exception won't be solve by a re-run (like a NPE).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
public interface CloudDocumentStoreClient { | |
public interface DocumentStoreClient { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
agreed jared.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
discussed with @benmoriceau offline and we agreed to hold off on the IOException for now because the only implementation of the interface that throws one is the DockerCompose version. The others wouldn't, so adding it to the iface would be mostly misleading.
airbyte-workers/src/main/java/io/airbyte/workers/storage/StateStore.java
Outdated
Show resolved
Hide resolved
public StateStore s3(final S3Config config) { | ||
return new StateStore(new S3CloudDocumentStoreClient( | ||
new DefaultS3ClientFactory(config).get(), | ||
config.getBucketName(), | ||
STATE_ROOT)); | ||
} | ||
|
||
public StateStore minio(final MinioConfig config) { | ||
return new StateStore(new S3CloudDocumentStoreClient( | ||
new MinioS3ClientFactory(config).get(), | ||
config.getBucketName(), | ||
STATE_ROOT)); | ||
} | ||
|
||
public StateStore gcs(final GcsConfig config) { | ||
return new StateStore(new GcsCloudDocumentStoreClient( | ||
new DefaultGcsClientFactory(config).get(), | ||
config.getBucketName(), | ||
STATE_ROOT)); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see!
The static pattern makes sense to me, since there isn't state to manage here.
I also have slight preference for a single class that handles all storage client creation instead of 3 separate small factories.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Mainly nit comment and one suggestion for the proposed interface
airbyte-workers/src/main/java/io/airbyte/workers/storage/DockerComposeDocumentStoreClient.java
Outdated
Show resolved
Hide resolved
/** | ||
* Interface for treating cloud storage like a simple document store. | ||
*/ | ||
public interface CloudDocumentStoreClient { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1
/** | ||
* Interface for treating cloud storage like a simple document store. | ||
*/ | ||
public interface CloudDocumentStoreClient { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we avoid to catch exception and throw IO ones explicitly, In temporal we will add a catch block anyway in the activities that uses it.
} | ||
|
||
@Override | ||
public Optional<String> read(final String id) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
About this prototype, why is it returning a String? I feel that we should return an Optional<T>
and enforce the object is serialized with jackson (for example). I got a similar comments for the write. Is there any formart we want to use for the state than json. I mentioned Jackson because temporal is using it and the new state object are compatible with it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm pretty ambivalent. Pretty much if we ever want to store anything that's not json this might be a headache.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I feel that it will be the opposite. The serialization will depend on this wrapper only and be shared between all the application that uses it. Could it be solve by using an interface that will specify a serialization type and use the serialization type to serialize an object. I feel that if we switch to a binary serialization (like protobuff) the current interface won't feet our needs.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How soon do you think we'll need more specific type casting?
If it's more than 6 months away, I favor leaving this as is now and adding this later. I'm not against it, but I value the simplicity/flexibility in the mean time to give us the most space to change things.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have an open (draft) PR that will need it. It is passing a JobRunConfig
as an input to an activity. I was thinking about this usecase for this comment.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems like it makes sense that we would use String and then have a wrapped version that lets us do Json storage with Jackson serialization.
|
||
public GcsCloudDocumentStoreClient(final Storage gcsClient, final String bucketName, final Path root) { | ||
this.gcsClient = gcsClient; | ||
this.bucketName = bucketName; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we create the bucket if it doesn't exist? That will help with the first run of this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oh. good point. yes i can add that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
actually @jrhizor & @davinchia how did you think about this for CloudLogs
? It looks like it assumes that the buckets do exist (and I assume fails if they don't).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeap. I don't think we should have this create buckets. I think it's simpler if we have the expectation that configuration passed into the cluster is expected to be correct.
This means:
- the GCP creds mounted on pods have reduced permissions, which is good
- any one operating Airbyte is forced to properly managed their buckets, which is also good.
for these reasons, I don't think this adds that much value.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agree with @davinchia here 100%
} | ||
} | ||
|
||
task cloudStorageIntegrationTest(type: Test) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit, we can update the gradle.yaml file for in the .github folder to run those test in github action as well (similar to the integration tests)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
so these tests require quite a big of environment setup (see: cloud_storage_logging_test.sh). is that compatible with the approach you are describing?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, we can make sure that it only run on a release build or on a scheduled build using the same condition that has been use for the slow integration test or move those test as slow integration test and make the environment setup as a part of the slow integration test setup.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what's the benefit to doing that?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Making sure that the test is run before releasing without having too much impact on the build time for regular builds.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
understand. the tricky thing here is that these tests function as unit tests today, since testing against actual buckets is the recommended thing to do. if we move this to master/release only tests, we'll lose testing coverage here.
was there a specific time unit you want to meet? or was this suggestion opportunistic?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
seems like there are some open questions here. i don't think this thread should block merging as it is leveraging an existing pattern.
airbyte-workers/build.gradle
Outdated
|
||
test { | ||
useJUnitPlatform { | ||
excludeTags 'log4j2-config', 'cloud-storage' |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The excluded tags can go into the root gradle. There are already some tags being excluded.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should everything in airbyte-config/models/build.gradle
be moved as well?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes so it is defined in a single place for the exclusion.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done.
/** | ||
* Interface for treating cloud storage like a simple document store. | ||
*/ | ||
public interface CloudDocumentStoreClient { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
public interface CloudDocumentStoreClient { | |
public interface DocumentStoreClient { |
} | ||
|
||
@Override | ||
public Optional<String> read(final String id) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems like it makes sense that we would use String and then have a wrapped version that lets us do Json storage with Jackson serialization.
|
||
public GcsCloudDocumentStoreClient(final Storage gcsClient, final String bucketName, final Path root) { | ||
this.gcsClient = gcsClient; | ||
this.bucketName = bucketName; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agree with @davinchia here 100%
private final CloudDocumentStoreClient documentStoreClient; | ||
|
||
public StateStore s3(final S3Config config) { | ||
return new StateStore(new S3CloudDocumentStoreClient( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems like all of the helpers should be for the document store creation not the StateStore
since they only vary at that level.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't they also be static?
* @param activityRunId - id to associate state with | ||
* @param state - state to persist | ||
*/ | ||
void setState(final UUID activityRunId, final State state) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can change this in a different PR but ReplicationOutput
is likely the first interface not just State
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
discussed offline. will just call this a WorkerStore
and it will use json. then the scheduler project can build on top
relates to #7476
What
How
Follow up tasks (for the next PR)
Recommended reading order