Skip to content

support for registering custom index clients, added new methods to object stores #2049

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Feb 7, 2020

Conversation

sandeepsukhani
Copy link
Contributor

What this PR does:
To avoid changes required for Loki storage getting into Cortex, this PR allows using custom index clients. It also adds new methods for object stores to work on objects(io.Reader) instead of just chunks.

The effort here is to avoid merging #1533 which is mostly required for Loki. For being able to reuse boltdb object client in Loki for flushing boltdb files to object stores, I have done some changes there to make some of the things public.

We can adopt a similar approach wherever Loki specific changes are required.

Signed-off-by: Sandeep Sukhani sandeep.d.sukhani@gmail.com

Copy link
Contributor

@pracucci pracucci left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All in all, good job. I deeply reviewed the changes to S3/GCS/Azure to make sure we're not introducing breaking changes in the logic. I spot few, and I left other comments as well. Shouldn't be difficult to re-iterate on it.

Once addressed all comments, and before asking for a re-review, I would be glad if you could manually test the S3/GCS/Azure clients, because we don't really have functional tests on them.

bucketNames []string
S3 s3iface.S3API
}

// NewS3ObjectClient makes a new S3-backed ObjectClient.
func NewS3ObjectClient(cfg StorageConfig, schemaCfg chunk.SchemaConfig) (chunk.ObjectClient, error) {
func NewS3ObjectClient(cfg StorageConfig) (*S3ObjectClient, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now we return a reference *S3ObjectClient but struct functions don't work on a reference (ie. func (a S3ObjectClient) Stop()). May make sense to switch to a reference there as well.


// Put object into the store
func (a S3ObjectClient) PutObject(ctx context.Context, objectKey string, object io.ReadSeeker) error {
return instrument.CollectedRequest(ctx, "S3.Put", s3RequestDuration, instrument.ErrorCode, func(ctx context.Context) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

S3.Put > S3.PutObject

}

// List objects from the store
func (a S3ObjectClient) List(ctx context.Context, prefix string) (map[string]time.Time, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A slice of type StorageObject struct looks more appropriate instead of map[string]time.Time because more extensible over the time. It's OK if you just store the Key string and ModifiedAt time.Time for now, but will be easier to add further metadata in the future, if necessary.

// List objects from the store
func (a S3ObjectClient) List(ctx context.Context, prefix string) (map[string]time.Time, error) {
objectKeysWithMtime := map[string]time.Time{}
prefixWithSep := prefix + "/"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not a big fan of this. I think it's better that the caller ensures that the prefix ends with / mainly because you may actually iterate over a prefix not ending with /.


for i := range a.bucketNames {
err := instrument.CollectedRequest(ctx, "S3.List", s3RequestDuration, instrument.ErrorCode, func(ctx context.Context) error {
output, err := a.S3.ListObjectsV2WithContext(ctx, &s3.ListObjectsV2Input{Bucket: &a.bucketNames[i], Prefix: &prefix})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would replace &a.bucketNames[i], &prefix with aws.String() (instead of &) just to make it more clear.

@@ -73,7 +75,7 @@ func (cfg *Config) Validate() error {
}

// NewStore makes the storage clients based on the configuration.
func NewStore(cfg Config, storeCfg chunk.StoreConfig, schemaCfg chunk.SchemaConfig, limits StoreLimits) (chunk.Store, error) {
func NewStore(cfg Config, storeCfg chunk.StoreConfig, schemaCfg chunk.SchemaConfig, limits StoreLimits, customIndexClients map[string]IndexClientFactoryFunc) (chunk.Store, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not a big fan of passing customIndexClients here. An alternative approach may be:

  1. Define a global variable customIndexClients
  2. Add a function RegisterIndexClient(name string, factory IndexClientFactoryFunc) which stores the index client to customIndexClients
  3. In NewIndexClient(), before switch name { we do check if customIndexClients contains it. If so, it will take precedence over the defaults.

return err
}

for i := range output.Contents {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could avoid referencing *output.Contents[i] below using for _, content := range output.Contents, right?


for i := range output.Contents {
storageObjects = append(storageObjects, chunk.StorageObject{
Key: strings.TrimPrefix(*output.Contents[i].Key, prefix),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We shouldn't trim the prefix here. The object key should be the complete key name (including the prefix). If the caller needs it without the prefix, it should be the caller responsability to remove it.

// Process the blobs returned in this result segment (if the segment is empty, the loop body won't execute)
for _, blobInfo := range listBlob.Segment.BlobItems {
storageObjects = append(storageObjects, chunk.StorageObject{
Key: strings.TrimPrefix(blobInfo.Name, prefix),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Following up my previous comment, I would not trim here as well.

continue
}
storageObjects = append(storageObjects, chunk.StorageObject{
Key: strings.TrimPrefix(attr.Name, prefix),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would not trim here as well.

@@ -116,6 +124,10 @@ func NewStore(cfg Config, storeCfg chunk.StoreConfig, schemaCfg chunk.SchemaConf

// NewIndexClient makes a new index client of the desired type.
func NewIndexClient(name string, cfg Config, schemaCfg chunk.SchemaConfig) (chunk.IndexClient, error) {
if factory, isOK := customIndexClients[name]; isOK {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a convention calling it ok, so I would rename isOK to ok.

@@ -116,6 +124,10 @@ func NewStore(cfg Config, storeCfg chunk.StoreConfig, schemaCfg chunk.SchemaConf

// NewIndexClient makes a new index client of the desired type.
func NewIndexClient(name string, cfg Config, schemaCfg chunk.SchemaConfig) (chunk.IndexClient, error) {
if factory, isOK := customIndexClients[name]; isOK {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a unit test on this. You will need a UnregisterIndexClient function to call it in a deferwithin the test, so that we cleanup the global state before the test exits.

return nil, err
// Open the database.
// Set Timeout to avoid obtaining file lock wait indefinitely.
db, err := OpenBoltdbFile(path.Join(b.cfg.Directory, name))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you explain the need of moving it to a dedicated function OpenBoltdbFile()? If it's required for an easier integration with Loki, could you explain how it will work, please?

if ok {
return db, nil
}
if _, err := os.Stat(path.Join(b.cfg.Directory, name)); err == nil || operation == DBOperationWrite {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If err != nil, it's ignored. I also find this if statement quite unclear. I would suggest to improve it and add a comment to explain that if the DB is in read-only mode and the DB doesn't exist yet, then we should not create a new one. An example of an improved logic may be reverse the if logic:

if operation == DBOperationRead {
  if _, err := os.Stat(path.Join(b.cfg.Directory, name)); err != nil {
    return nil, <an error here>
  }
}

}

b.dbs[name] = db
return db, nil
return nil, nil
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should never return nil, nil. Either a DB or an error (an error could be something like "unsupported operation by the configured mode"). The rest of the code doesn't currently handles the case both the DB and thr error are nil, and I belive it's correct they don't handle it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for read operation it does check whether db is there or not for querying https://github.com/cortexproject/cortex/pull/2049/files/6cab326d3ae40257ce2a40578b9e2cbcc0afce8c#diff-f0b676dad4198715e462b2faa06cfaa8R195

I don't feel right throwing an error because db does not exist and we don't want to create it for a read operation because it would anyways return no results.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't feel right throwing an error

My perspective: if a BoltDB client instance is not configured for read operations, no read operation should occur on it. If it occurs, it's an error, and we should return an error. This also doesn't force us to change all other functions logic to handle the case GetDB() returns nil, nil.

@sandeepsukhani
Copy link
Contributor Author

Tested GetObject/PutObject/List operation locally with all the object stores that support them, it works fine as expected.

…ject store

NewIndexClient accepts factory methods for creating custom index clients
added new methods to object stores to work on objects(io.Reader) instead of just chunks

Signed-off-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com>
Signed-off-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com>
Signed-off-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com>
Signed-off-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com>
Signed-off-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com>
Copy link
Contributor

@pracucci pracucci left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good job @sandeepsukhani! Thanks for patiently addressing all my feedback.

I tripled checked the changes and they all LGTM. I tried to pay particular attention to backward compatibility and I can't see any unsafe change.

I left few nits, but they're minimal and you don't need me doing another review. Please ask me another review if you will do changes beyond the comments I've left 🙏

*local.BoltIndexClient
}

func NewBoltDBCustomIndexClient(cfg local.BoltDBConfig) (chunk.IndexClient, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to expose this. Can be newBoltDBCustomIndexClient.

*local.BoltIndexClient
}

func NewAnotherIndexClient(cfg local.BoltDBConfig) (chunk.IndexClient, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to expose this. Can be newAnotherIndexClient.

@@ -0,0 +1,249 @@
package azure
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove this file.

writer.ChunkSize = s.cfg.ChunkBufferSize

if _, err := io.Copy(writer, object); err != nil {
return err
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the io.Copy fails, writer.Close() is never called. Shouldn't be called? I'm not sure about it, so please investigate it.

If yes, then this is a bug we have in master too, but we can fix it now. Please remind that the error returned should be the io.Copy() one.

_, err := os.Stat(folderPath)
if err != nil {
if os.IsNotExist(err) {
return nil, nil
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even if it's the same, I would return storageObjects, nil to make the intention more clear.

func TestBoltDBReload(t *testing.T) {
dirname, err := ioutil.TempDir(os.TempDir(), "boltdb")
require.NoError(t, err)

Copy link
Contributor

@pracucci pracucci Feb 6, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add defer os.RemoveAll(dirname) instead of doing it at the end of the test (otherwise if the test breaks we don't cleanup).

Same comment applies to the calls to <bolt db>.Stop() in the tests you've added

Comment on lines +76 to +78
if err != nil {
return
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Replace this with:

require.NoError(t, err)
defer os.RemoveAll(dirname)

return b.QueryDb(ctx, db, query, callback)
}

func (b *BoltIndexClient) QueryDb(ctx context.Context, db *bbolt.DB, query chunk.IndexQuery, callback func(chunk.ReadBatch) (shouldContinue bool)) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

QueryDb > QueryDB

testValue = []byte("test-value")
)

func setupDb(t *testing.T, boltdbIndexClient *BoltIndexClient, dbname string) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

setupDb > setupDB


boltdbIndexClient.reload()

require.Equal(t, 1, len(boltdbIndexClient.dbs), "There should be 1 boltdb open")
Copy link
Contributor

@pracucci pracucci Feb 6, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May we also check if boltdbIndexClient.GetDB(testDb1, DBOperationRead) returns ErrUnexistentBoltDB?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean trying to GetDB(testDb1) and then require.Equal(t, ErrUnexistentBoltDB, err)

Signed-off-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com>
Signed-off-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com>
Copy link
Contributor

@jtlisi jtlisi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

However, I noticed the GetChunks, getChunks, PutChunks, and putChunks methods are basically identical for all of the object store backends (S3, GCS, Azure). I think we should consider refactoring.

For instance:

  • Rename ObjectClient to ChunkClient as is.
  • Create a new ObjectClient interface that contains the the PutObject, GetObject and List functions
  • Create a generic implementation of the ChunkClient that would be built on top of the ObjectClient

@sandeepsukhani and @pracucci WDYT?

@sandeepsukhani
Copy link
Contributor Author

LGTM

However, I noticed the GetChunks, getChunks, PutChunks, and putChunks methods are basically identical for all of the object store backends (S3, GCS, Azure). I think we should consider refactoring.

For instance:

  • Rename ObjectClient to ChunkClient as is.
  • Create a new ObjectClient interface that contains the the PutObject, GetObject and List functions
  • Create a generic implementation of the ChunkClient that would be built on top of the ObjectClient

@sandeepsukhani and @pracucci WDYT?

Sounds good to me but I would suggest merging this and I will do another PR for it since I need code from this PR for Delete Series feature that I am working on and would be blocked on it until this gets reviewed again and merged.

@pracucci what do you think?

@pracucci
Copy link
Contributor

pracucci commented Feb 7, 2020

Sounds good to me but I would suggest merging this and I will do another PR for it since I need code from this PR for Delete Series feature that I am working on and would be blocked on it until this gets reviewed again and merged.

@pracucci what do you think?

I also suggest to NOT do it in this PR. The refactoring proposed would further complicate the diff. From my perspective the duplicated code is not that much and, personally, I don't see the effort of the refactoring valuable. In any case I would suggest to not bundle it in this PR.

@pracucci
Copy link
Contributor

pracucci commented Feb 7, 2020

I also suggest to NOT do it in this PR. The refactoring proposed would further complicate the diff. From my perspective the duplicated code is not that much and, personally, I don't see the effort of the refactoring valuable. In any case I would suggest to not bundle it in this PR.

After a closer look, I still believe it's better to do it in a different PR, but actually there may be some value in refactoring.

I'm going to merge this PR, so that Sandeep can move on with Loki (this PR is necessary to allow Loki to extend the BoltDB index client and add the "archiver" support), while would be great if @sandeepsukhani considers the suggested refactoring for a not-urgent following PR.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants