-
Notifications
You must be signed in to change notification settings - Fork 820
support for registering custom index clients, added new methods to object stores #2049
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
support for registering custom index clients, added new methods to object stores #2049
Conversation
f8103c3
to
e89f4ad
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
All in all, good job. I deeply reviewed the changes to S3/GCS/Azure to make sure we're not introducing breaking changes in the logic. I spot few, and I left other comments as well. Shouldn't be difficult to re-iterate on it.
Once addressed all comments, and before asking for a re-review, I would be glad if you could manually test the S3/GCS/Azure clients, because we don't really have functional tests on them.
pkg/chunk/aws/s3_storage_client.go
Outdated
bucketNames []string | ||
S3 s3iface.S3API | ||
} | ||
|
||
// NewS3ObjectClient makes a new S3-backed ObjectClient. | ||
func NewS3ObjectClient(cfg StorageConfig, schemaCfg chunk.SchemaConfig) (chunk.ObjectClient, error) { | ||
func NewS3ObjectClient(cfg StorageConfig) (*S3ObjectClient, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Now we return a reference *S3ObjectClient
but struct functions don't work on a reference (ie. func (a S3ObjectClient) Stop()
). May make sense to switch to a reference there as well.
pkg/chunk/aws/s3_storage_client.go
Outdated
|
||
// Put object into the store | ||
func (a S3ObjectClient) PutObject(ctx context.Context, objectKey string, object io.ReadSeeker) error { | ||
return instrument.CollectedRequest(ctx, "S3.Put", s3RequestDuration, instrument.ErrorCode, func(ctx context.Context) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
S3.Put
> S3.PutObject
pkg/chunk/aws/s3_storage_client.go
Outdated
} | ||
|
||
// List objects from the store | ||
func (a S3ObjectClient) List(ctx context.Context, prefix string) (map[string]time.Time, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A slice of type StorageObject struct
looks more appropriate instead of map[string]time.Time
because more extensible over the time. It's OK if you just store the Key string
and ModifiedAt time.Time
for now, but will be easier to add further metadata in the future, if necessary.
pkg/chunk/aws/s3_storage_client.go
Outdated
// List objects from the store | ||
func (a S3ObjectClient) List(ctx context.Context, prefix string) (map[string]time.Time, error) { | ||
objectKeysWithMtime := map[string]time.Time{} | ||
prefixWithSep := prefix + "/" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not a big fan of this. I think it's better that the caller ensures that the prefix ends with /
mainly because you may actually iterate over a prefix not ending with /
.
pkg/chunk/aws/s3_storage_client.go
Outdated
|
||
for i := range a.bucketNames { | ||
err := instrument.CollectedRequest(ctx, "S3.List", s3RequestDuration, instrument.ErrorCode, func(ctx context.Context) error { | ||
output, err := a.S3.ListObjectsV2WithContext(ctx, &s3.ListObjectsV2Input{Bucket: &a.bucketNames[i], Prefix: &prefix}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would replace &a.bucketNames[i]
, &prefix
with aws.String()
(instead of &
) just to make it more clear.
pkg/chunk/storage/factory.go
Outdated
@@ -73,7 +75,7 @@ func (cfg *Config) Validate() error { | |||
} | |||
|
|||
// NewStore makes the storage clients based on the configuration. | |||
func NewStore(cfg Config, storeCfg chunk.StoreConfig, schemaCfg chunk.SchemaConfig, limits StoreLimits) (chunk.Store, error) { | |||
func NewStore(cfg Config, storeCfg chunk.StoreConfig, schemaCfg chunk.SchemaConfig, limits StoreLimits, customIndexClients map[string]IndexClientFactoryFunc) (chunk.Store, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not a big fan of passing customIndexClients
here. An alternative approach may be:
- Define a global variable
customIndexClients
- Add a function
RegisterIndexClient(name string, factory IndexClientFactoryFunc)
which stores the index client tocustomIndexClients
- In
NewIndexClient()
, beforeswitch name {
we do check ifcustomIndexClients
contains it. If so, it will take precedence over the defaults.
pkg/chunk/aws/s3_storage_client.go
Outdated
return err | ||
} | ||
|
||
for i := range output.Contents { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You could avoid referencing *output.Contents[i]
below using for _, content := range output.Contents
, right?
pkg/chunk/aws/s3_storage_client.go
Outdated
|
||
for i := range output.Contents { | ||
storageObjects = append(storageObjects, chunk.StorageObject{ | ||
Key: strings.TrimPrefix(*output.Contents[i].Key, prefix), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We shouldn't trim the prefix here. The object key should be the complete key name (including the prefix). If the caller needs it without the prefix, it should be the caller responsability to remove it.
// Process the blobs returned in this result segment (if the segment is empty, the loop body won't execute) | ||
for _, blobInfo := range listBlob.Segment.BlobItems { | ||
storageObjects = append(storageObjects, chunk.StorageObject{ | ||
Key: strings.TrimPrefix(blobInfo.Name, prefix), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Following up my previous comment, I would not trim here as well.
pkg/chunk/gcp/gcs_object_client.go
Outdated
continue | ||
} | ||
storageObjects = append(storageObjects, chunk.StorageObject{ | ||
Key: strings.TrimPrefix(attr.Name, prefix), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would not trim here as well.
pkg/chunk/storage/factory.go
Outdated
@@ -116,6 +124,10 @@ func NewStore(cfg Config, storeCfg chunk.StoreConfig, schemaCfg chunk.SchemaConf | |||
|
|||
// NewIndexClient makes a new index client of the desired type. | |||
func NewIndexClient(name string, cfg Config, schemaCfg chunk.SchemaConfig) (chunk.IndexClient, error) { | |||
if factory, isOK := customIndexClients[name]; isOK { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's a convention calling it ok
, so I would rename isOK
to ok
.
pkg/chunk/storage/factory.go
Outdated
@@ -116,6 +124,10 @@ func NewStore(cfg Config, storeCfg chunk.StoreConfig, schemaCfg chunk.SchemaConf | |||
|
|||
// NewIndexClient makes a new index client of the desired type. | |||
func NewIndexClient(name string, cfg Config, schemaCfg chunk.SchemaConfig) (chunk.IndexClient, error) { | |||
if factory, isOK := customIndexClients[name]; isOK { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please add a unit test on this. You will need a UnregisterIndexClient
function to call it in a defer
within the test, so that we cleanup the global state before the test exits.
return nil, err | ||
// Open the database. | ||
// Set Timeout to avoid obtaining file lock wait indefinitely. | ||
db, err := OpenBoltdbFile(path.Join(b.cfg.Directory, name)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you explain the need of moving it to a dedicated function OpenBoltdbFile()
? If it's required for an easier integration with Loki, could you explain how it will work, please?
if ok { | ||
return db, nil | ||
} | ||
if _, err := os.Stat(path.Join(b.cfg.Directory, name)); err == nil || operation == DBOperationWrite { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If err != nil
, it's ignored. I also find this if
statement quite unclear. I would suggest to improve it and add a comment to explain that if the DB is in read-only mode and the DB doesn't exist yet, then we should not create a new one. An example of an improved logic may be reverse the if
logic:
if operation == DBOperationRead {
if _, err := os.Stat(path.Join(b.cfg.Directory, name)); err != nil {
return nil, <an error here>
}
}
} | ||
|
||
b.dbs[name] = db | ||
return db, nil | ||
return nil, nil |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should never return nil, nil
. Either a DB or an error (an error could be something like "unsupported operation by the configured mode"). The rest of the code doesn't currently handles the case both the DB and thr error are nil
, and I belive it's correct they don't handle it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
for read operation it does check whether db is there or not for querying https://github.com/cortexproject/cortex/pull/2049/files/6cab326d3ae40257ce2a40578b9e2cbcc0afce8c#diff-f0b676dad4198715e462b2faa06cfaa8R195
I don't feel right throwing an error because db does not exist and we don't want to create it for a read operation because it would anyways return no results.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't feel right throwing an error
My perspective: if a BoltDB client instance is not configured for read operations, no read operation should occur on it. If it occurs, it's an error, and we should return an error. This also doesn't force us to change all other functions logic to handle the case GetDB()
returns nil, nil
.
ba676fc
to
6398766
Compare
Tested GetObject/PutObject/List operation locally with all the object stores that support them, it works fine as expected. |
…ject store NewIndexClient accepts factory methods for creating custom index clients added new methods to object stores to work on objects(io.Reader) instead of just chunks Signed-off-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com>
Signed-off-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com>
Signed-off-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com>
Signed-off-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com>
Signed-off-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com>
6398766
to
273032a
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good job @sandeepsukhani! Thanks for patiently addressing all my feedback.
I tripled checked the changes and they all LGTM. I tried to pay particular attention to backward compatibility and I can't see any unsafe change.
I left few nits, but they're minimal and you don't need me doing another review. Please ask me another review if you will do changes beyond the comments I've left 🙏
pkg/chunk/storage/factory_test.go
Outdated
*local.BoltIndexClient | ||
} | ||
|
||
func NewBoltDBCustomIndexClient(cfg local.BoltDBConfig) (chunk.IndexClient, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No need to expose this. Can be newBoltDBCustomIndexClient
.
pkg/chunk/storage/factory_test.go
Outdated
*local.BoltIndexClient | ||
} | ||
|
||
func NewAnotherIndexClient(cfg local.BoltDBConfig) (chunk.IndexClient, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No need to expose this. Can be newAnotherIndexClient
.
@@ -0,0 +1,249 @@ | |||
package azure |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please remove this file.
writer.ChunkSize = s.cfg.ChunkBufferSize | ||
|
||
if _, err := io.Copy(writer, object); err != nil { | ||
return err |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the io.Copy
fails, writer.Close()
is never called. Shouldn't be called? I'm not sure about it, so please investigate it.
If yes, then this is a bug we have in master
too, but we can fix it now. Please remind that the error returned should be the io.Copy()
one.
pkg/chunk/local/fs_object_client.go
Outdated
_, err := os.Stat(folderPath) | ||
if err != nil { | ||
if os.IsNotExist(err) { | ||
return nil, nil |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Even if it's the same, I would return storageObjects, nil
to make the intention more clear.
func TestBoltDBReload(t *testing.T) { | ||
dirname, err := ioutil.TempDir(os.TempDir(), "boltdb") | ||
require.NoError(t, err) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add defer os.RemoveAll(dirname)
instead of doing it at the end of the test (otherwise if the test breaks we don't cleanup).
Same comment applies to the calls to <bolt db>.Stop()
in the tests you've added
if err != nil { | ||
return | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Replace this with:
require.NoError(t, err)
defer os.RemoveAll(dirname)
return b.QueryDb(ctx, db, query, callback) | ||
} | ||
|
||
func (b *BoltIndexClient) QueryDb(ctx context.Context, db *bbolt.DB, query chunk.IndexQuery, callback func(chunk.ReadBatch) (shouldContinue bool)) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
QueryDb
> QueryDB
testValue = []byte("test-value") | ||
) | ||
|
||
func setupDb(t *testing.T, boltdbIndexClient *BoltIndexClient, dbname string) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
setupDb
> setupDB
|
||
boltdbIndexClient.reload() | ||
|
||
require.Equal(t, 1, len(boltdbIndexClient.dbs), "There should be 1 boltdb open") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
May we also check if boltdbIndexClient.GetDB(testDb1, DBOperationRead)
returns ErrUnexistentBoltDB
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am sorry if I didn't understand it, is this what you are looking for?
https://github.com/cortexproject/cortex/pull/2049/files/273032a393eb1a6cc815e5387a1bd2259f494dc4#diff-4f420894f40d4e2e1a9b491716cc37dfR96
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I mean trying to GetDB(testDb1)
and then require.Equal(t, ErrUnexistentBoltDB, err)
Signed-off-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com>
Signed-off-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
However, I noticed the GetChunks
, getChunks
, PutChunks
, and putChunks
methods are basically identical for all of the object store backends (S3, GCS, Azure). I think we should consider refactoring.
For instance:
- Rename
ObjectClient
toChunkClient
as is. - Create a new
ObjectClient
interface that contains the thePutObject
,GetObject
andList
functions - Create a generic implementation of the
ChunkClient
that would be built on top of theObjectClient
@sandeepsukhani and @pracucci WDYT?
Sounds good to me but I would suggest merging this and I will do another PR for it since I need code from this PR for Delete Series feature that I am working on and would be blocked on it until this gets reviewed again and merged. @pracucci what do you think? |
I also suggest to NOT do it in this PR. The refactoring proposed would further complicate the diff. From my perspective the duplicated code is not that much and, personally, I don't see the effort of the refactoring valuable. In any case I would suggest to not bundle it in this PR. |
After a closer look, I still believe it's better to do it in a different PR, but actually there may be some value in refactoring. I'm going to merge this PR, so that Sandeep can move on with Loki (this PR is necessary to allow Loki to extend the BoltDB index client and add the "archiver" support), while would be great if @sandeepsukhani considers the suggested refactoring for a not-urgent following PR. |
What this PR does:
To avoid changes required for Loki storage getting into Cortex, this PR allows using custom index clients. It also adds new methods for object stores to work on objects(io.Reader) instead of just chunks.
The effort here is to avoid merging #1533 which is mostly required for Loki. For being able to reuse boltdb object client in Loki for flushing boltdb files to object stores, I have done some changes there to make some of the things public.
We can adopt a similar approach wherever Loki specific changes are required.
Signed-off-by: Sandeep Sukhani sandeep.d.sukhani@gmail.com