Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

State Storage for new Scheduler #8613

Merged
merged 11 commits into from
Dec 10, 2021
Merged

State Storage for new Scheduler #8613

merged 11 commits into from
Dec 10, 2021

Conversation

cgardens
Copy link
Contributor

@cgardens cgardens commented Dec 8, 2021

relates to #7476

What

  • Creates an abstraction to allow storing state to cloud storage.

How

  • Creates a very basic document store abstraction for S3 (and minio) and GCS
  • Creates higher level abstraction that consumes the document store client so that it can be agnostic of cloud storage.

Follow up tasks (for the next PR)

  • Expose env variables in the platform and pipe them through to this client.
  • Right now this is leveraging the logger test set up. This is kinda hacky. I think I want to leave the structure the same for now, but just update the naming so that it is clear that this set up is used for all tests that use cloud storage (not just logging).

Recommended reading order

  1. CloudDocumentStoreClient.java
  2. S3CloudDocumentStoreClient.java
  3. GcsCloudDocumentStoreClient.java
  4. DockercomposeCloudDocumentStoreClient.java
  5. StateStore.java
  6. the rest

@github-actions github-actions bot added area/platform issues related to the platform area/worker Related to worker labels Dec 8, 2021
Comment on lines 29 to 48
public StateStore s3(final S3Config config) {
return new StateStore(new S3CloudDocumentStoreClient(
new DefaultS3ClientFactory(config).get(),
config.getBucketName(),
STATE_ROOT));
}

public StateStore minio(final MinioConfig config) {
return new StateStore(new S3CloudDocumentStoreClient(
new MinioS3ClientFactory(config).get(),
config.getBucketName(),
STATE_ROOT));
}

public StateStore gcs(final GcsConfig config) {
return new StateStore(new GcsCloudDocumentStoreClient(
new DefaultGcsClientFactory(config).get(),
config.getBucketName(),
STATE_ROOT));
}
Copy link
Contributor Author

@cgardens cgardens Dec 8, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jared && @benmoriceau these can move to where ever it convenient. i kinda figure when we actually plug this into the worker it will become obvious if this is a good place for them, or if we ant to move them.

@davinchia this is where we are leveraging those factories that you were mentioning as redundant in the previous PR. I think they could probably just be static helpers instead of class that gets instantiated. They let us reuse the same configuration object for configuring cloud storage for here as we do for logs, so we don't have to duplicate the logic!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see!

The static pattern makes sense to me, since there isn't state to manage here.

I also have slight preference for a single class that handles all storage client creation instead of 3 separate small factories.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

kk. i'm going to do it separately. but agreed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

moved the static factories as jared suggested.

@cgardens
Copy link
Contributor Author

cgardens commented Dec 8, 2021

This is incomplete in one case, which is the docker-compose case. fixed. added DockerComposeDocumerntStoreClient.

@cgardens cgardens temporarily deployed to more-secrets December 8, 2021 07:26 Inactive
@cgardens cgardens temporarily deployed to more-secrets December 8, 2021 07:36 Inactive
Copy link
Contributor

@davinchia davinchia left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice! I appreciate the good doc strings.

One medium comment about moving the storage clients.

One small comment on cosmetic changes.

Non-blocking.

private S3Client s3Client;

@BeforeEach
void setup() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't intend this to be generated for each test run. I don't think it's a problem since it's overwriting the same path, but we should be able to get away with just having the method for manual regeneration in case those files get corrupted/deleted.

Copy link
Contributor Author

@cgardens cgardens Dec 8, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you prefer relying on the external resource to keep state between test runs as opposed to just having the test guarantee the correct set up every time? Seems like it's breaking the pattern we try to use in almost all of our testing. Sometimes it's not possible to do it, but here it seems almost free.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems wasteful since we aren't really isolating each test run from the other (since we are always writing to the same location).

To properly isolate, we would also have to add a random prefix path and delete the old files. At that time I felt this is more trouble than it is worth, esp since the chance of something bad happening here is very small. (Someone would have to overwrite the same location)

I'm okay if you prefer the test set up doing this.

/**
* Interface for treating cloud storage like a simple document store.
*/
public interface CloudDocumentStoreClient {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just a thought: can we move this into a more general module? e.g airbyte-commons?

these interfaces are going to be useful to the server for #8301, I would love to be able to reuse this for that issue :)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we avoid to catch exception and throw IO ones explicitly, In temporal we will add a catch block anyway in the activities that uses it.

Copy link
Contributor Author

@cgardens cgardens Dec 8, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah. I agree with this. I started to look into this and then decided it was a bad idea to block on that refactor because I think it is a hard refactor.

fwiw, airbyte-commons is definitely not the right place for it. for that module we should only be adding classes that do not add dependencies. i.e. if there is a lib we have consciously shared everywhere (e.g. jackson, junit), then it is okay, but we shouldn't be adding dependencies to commons. it's deps should be the same as the root build. Right now we should inject S3 and GCS deps everywhere, but we shouldn't do it.

I think there are a lot of problems in how we are doing dependencies for the GCS. Namely the fact that we have made S3 and GCS a dependency of every module (see slack convo here). We should figure out how to consolidate all of the code around building these clients into a single module with tightly scope dependencies. That seems to interact in a weird way with our logging configuration which I don't fully grasp yet. I'm happy to look into do it in in my slack time, but it's not a small task and I don't think we should put it on the critical path for getting this scheduler project done.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we avoid to catch exception and throw IO ones explicitly, In temporal we will add a catch block anyway in the activities that uses it.

We definitely can. I'm trying to learn from our original ConfigPersistence. We did expose IOException there and then because it was such a pain to always be try / catching that exception in callers pretty much had ConfigRepository come into swallow the exceptions. If it meaningfully better to bubble up the exception here, we definitely can, but if it's just a style thing, then I think we save ourselves some headache by leaving it as is.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We definitely can. I'm trying to learn from our original ConfigPersistence. We did expose IOException there and then because it was such a pain to always be try / catching that exception in callers pretty much had ConfigRepository come into swallow the exceptions. If it meaningfully better to bubble up the exception here, we definitely can, but if it's just a style thing, then I think we save ourselves some headache by leaving it as is.

All the temporal activity will have their method wrapped in a try {} catch (Exception e). The goal is to be able to make a workflow to go into a quarantine state in case there is no point to re-run the sync if the exception won't be solve by a re-run (like a NPE).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
public interface CloudDocumentStoreClient {
public interface DocumentStoreClient {

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agreed jared.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

discussed with @benmoriceau offline and we agreed to hold off on the IOException for now because the only implementation of the interface that throws one is the DockerCompose version. The others wouldn't, so adding it to the iface would be mostly misleading.

Comment on lines 29 to 48
public StateStore s3(final S3Config config) {
return new StateStore(new S3CloudDocumentStoreClient(
new DefaultS3ClientFactory(config).get(),
config.getBucketName(),
STATE_ROOT));
}

public StateStore minio(final MinioConfig config) {
return new StateStore(new S3CloudDocumentStoreClient(
new MinioS3ClientFactory(config).get(),
config.getBucketName(),
STATE_ROOT));
}

public StateStore gcs(final GcsConfig config) {
return new StateStore(new GcsCloudDocumentStoreClient(
new DefaultGcsClientFactory(config).get(),
config.getBucketName(),
STATE_ROOT));
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see!

The static pattern makes sense to me, since there isn't state to manage here.

I also have slight preference for a single class that handles all storage client creation instead of 3 separate small factories.

Copy link
Contributor

@benmoriceau benmoriceau left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mainly nit comment and one suggestion for the proposed interface

/**
* Interface for treating cloud storage like a simple document store.
*/
public interface CloudDocumentStoreClient {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

/**
* Interface for treating cloud storage like a simple document store.
*/
public interface CloudDocumentStoreClient {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we avoid to catch exception and throw IO ones explicitly, In temporal we will add a catch block anyway in the activities that uses it.

}

@Override
public Optional<String> read(final String id) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

About this prototype, why is it returning a String? I feel that we should return an Optional<T> and enforce the object is serialized with jackson (for example). I got a similar comments for the write. Is there any formart we want to use for the state than json. I mentioned Jackson because temporal is using it and the new state object are compatible with it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm pretty ambivalent. Pretty much if we ever want to store anything that's not json this might be a headache.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel that it will be the opposite. The serialization will depend on this wrapper only and be shared between all the application that uses it. Could it be solve by using an interface that will specify a serialization type and use the serialization type to serialize an object. I feel that if we switch to a binary serialization (like protobuff) the current interface won't feet our needs.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How soon do you think we'll need more specific type casting?

If it's more than 6 months away, I favor leaving this as is now and adding this later. I'm not against it, but I value the simplicity/flexibility in the mean time to give us the most space to change things.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have an open (draft) PR that will need it. It is passing a JobRunConfig as an input to an activity. I was thinking about this usecase for this comment.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like it makes sense that we would use String and then have a wrapped version that lets us do Json storage with Jackson serialization.


public GcsCloudDocumentStoreClient(final Storage gcsClient, final String bucketName, final Path root) {
this.gcsClient = gcsClient;
this.bucketName = bucketName;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we create the bucket if it doesn't exist? That will help with the first run of this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh. good point. yes i can add that.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually @jrhizor & @davinchia how did you think about this for CloudLogs? It looks like it assumes that the buckets do exist (and I assume fails if they don't).

Copy link
Contributor

@davinchia davinchia Dec 9, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeap. I don't think we should have this create buckets. I think it's simpler if we have the expectation that configuration passed into the cluster is expected to be correct.

This means:

  1. the GCP creds mounted on pods have reduced permissions, which is good
  2. any one operating Airbyte is forced to properly managed their buckets, which is also good.

for these reasons, I don't think this adds that much value.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree with @davinchia here 100%

}
}

task cloudStorageIntegrationTest(type: Test) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit, we can update the gradle.yaml file for in the .github folder to run those test in github action as well (similar to the integration tests)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so these tests require quite a big of environment setup (see: cloud_storage_logging_test.sh). is that compatible with the approach you are describing?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we can make sure that it only run on a release build or on a scheduled build using the same condition that has been use for the slow integration test or move those test as slow integration test and make the environment setup as a part of the slow integration test setup.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's the benefit to doing that?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Making sure that the test is run before releasing without having too much impact on the build time for regular builds.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

understand. the tricky thing here is that these tests function as unit tests today, since testing against actual buckets is the recommended thing to do. if we move this to master/release only tests, we'll lose testing coverage here.

was there a specific time unit you want to meet? or was this suggestion opportunistic?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems like there are some open questions here. i don't think this thread should block merging as it is leveraging an existing pattern.


test {
useJUnitPlatform {
excludeTags 'log4j2-config', 'cloud-storage'
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The excluded tags can go into the root gradle. There are already some tags being excluded.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should everything in airbyte-config/models/build.gradle be moved as well?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes so it is defined in a single place for the exclusion.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

@cgardens cgardens temporarily deployed to more-secrets December 8, 2021 21:21 Inactive
/**
* Interface for treating cloud storage like a simple document store.
*/
public interface CloudDocumentStoreClient {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
public interface CloudDocumentStoreClient {
public interface DocumentStoreClient {

}

@Override
public Optional<String> read(final String id) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like it makes sense that we would use String and then have a wrapped version that lets us do Json storage with Jackson serialization.


public GcsCloudDocumentStoreClient(final Storage gcsClient, final String bucketName, final Path root) {
this.gcsClient = gcsClient;
this.bucketName = bucketName;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree with @davinchia here 100%

private final CloudDocumentStoreClient documentStoreClient;

public StateStore s3(final S3Config config) {
return new StateStore(new S3CloudDocumentStoreClient(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like all of the helpers should be for the document store creation not the StateStore since they only vary at that level.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't they also be static?

* @param activityRunId - id to associate state with
* @param state - state to persist
*/
void setState(final UUID activityRunId, final State state) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can change this in a different PR but ReplicationOutput is likely the first interface not just State.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

discussed offline. will just call this a WorkerStore and it will use json. then the scheduler project can build on top

@cgardens cgardens requested a review from benmoriceau December 9, 2021 22:33
@cgardens cgardens temporarily deployed to more-secrets December 9, 2021 22:34 Inactive
@cgardens cgardens temporarily deployed to more-secrets December 10, 2021 16:48 Inactive
@cgardens cgardens merged commit 058bb2a into master Dec 10, 2021
@cgardens cgardens deleted the cgardens/worker-doc-store2 branch December 10, 2021 17:52
schlattk pushed a commit to schlattk/airbyte that referenced this pull request Jan 4, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area/platform issues related to the platform area/worker Related to worker
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants