-
Notifications
You must be signed in to change notification settings - Fork 820
thanosconvert: convert block meta from Thanos to Cortex #3770
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
047a8f2
to
d33bf34
Compare
d33bf34
to
a8b8f2f
Compare
a8b8f2f
to
f55d19c
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fantastic job! We wanted this since a long time, thanks! I left few comments here and there, but mostly nits that should be very quick to address.
Other than the comments:
- Could you add it to the list of experimental features listed in
docs/configuration/v1-guarantees.md
, please? - Could you mention the tool in
docs/blocks-storage/migrate-storage-from-thanos-and-prometheus.md
? There's a sentence like "no tool is provided to migrate TSDB blocks from Thanos" while now there is 😉
cmd/thanosconvert/main.go
Outdated
iterCtx := context.Background() | ||
results, err := converter.Run(iterCtx) | ||
if err != nil { | ||
level.Error(util.Logger).Log("msg", "iterate blocks", "err", err) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
level.Error(util.Logger).Log("msg", "iterate blocks", "err", err) | |
level.Error(util.Logger).Log("msg", "error while iterating blocks", "err", err) |
tools/thanosconvert/thanosconvert.go
Outdated
type Results map[string]PerUserResults | ||
|
||
// NewThanosBlockConverter creates a ThanosBlockConverter | ||
func NewThanosBlockConverter(ctx context.Context, cfg bucket.Config, dryRun bool, logger log.Logger, reg prometheus.Registerer) (*ThanosBlockConverter, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would remove reg prometheus.Registerer
given this tool doesn't export metrics anyway, and just pass nil
as registered to bucket.NewClient()
(when nil
metrics registration will be skipped).
tools/thanosconvert/thanosconvert.go
Outdated
var users []string | ||
err := c.bkt.Iter(ctx, "", func(o string) error { | ||
users = append(users, strings.TrimSuffix(o, "/")) | ||
return nil | ||
}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You may want to reuse the scanner we already have:
scanner := cortex_tsdb.NewUsersScanner(c.bkt, cortex_tsdb.AllUsers)
users, _, err := scanner.ScanUsers(ctx)
tools/thanosconvert/thanosconvert.go
Outdated
func (c ThanosBlockConverter) convertUser(ctx context.Context, user string) (PerUserResults, error) { | ||
results := PerUserResults{} | ||
|
||
err := c.bkt.Iter(ctx, user, func(o string) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would wrap c.bkt
with bucket.NewUserBucketClient()
. This will mask you the first object prefix segment (which is the <tenant ID>/
).
tools/thanosconvert/thanosconvert.go
Outdated
err := c.bkt.Iter(ctx, user, func(o string) error { | ||
// get block ID from path | ||
|
||
parts := strings.Split(o, "/") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Once you use the UserBuckerClient
, this should just be a matter of:
o = strings.TrimSuffix(o, "/")
blockID, err := ulid.Parse(o)
If the parse fail, just skip it (no error) because it means it's not a block. We don't have just blocks in bucket.
} | ||
} | ||
|
||
func CortexMeta(user string) metadata.Meta { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No need to export this function.
} | ||
} | ||
|
||
func ThanosMeta() metadata.Meta { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No need to export this function.
} | ||
} | ||
|
||
func GetLogger() log.Logger { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No need to export this function.
return logger | ||
} | ||
|
||
func MockBucket(data fakeBucket) *bucket.ClientMock { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No need to export this function.
name: "bucket fully converted is a noop", | ||
bucketData: fakeBucket{ | ||
"user1": map[string]metadata.Meta{ | ||
"block1": CortexMeta("user1"), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Once you start using ulid.Parse()
these tests will fail because "block1"
is not a valid block ID. You could just define at the beginning of the test function something like the following, and then use such variables across the test cases:
block1 := ulid.MustNew(1, nil)
block2 := ulid.MustNew(2, nil)
@pracucci thanks for the detailed review! I think I've addressed all your comments - I've added a new commit to make it easier to review the changes, but can squash it down once you're happy. I'll make the docs changes today. |
7e6f56d
to
85057b3
Compare
8659187
to
e17a6fb
Compare
thanosconvert is a small tool to iterate over a bucket and convert block-level metadata (in meta.json) so that it is readable by Cortex. This tool assumes that users have already copied Thanos blocks over to a destination bucket, with blocks organised by user at the top level. Signed-off-by: Miles Bryant <milesbryant@monzo.com>
* Remove prometheus.DefaultRegisterer as we don't export metrics * Use cortex_tsdb.UsersScanner instead of custom logic * Use UserBucketClient instead of custom logic * Use ulid.Parse to make sure we're in a valid block * Generate ulids in tests to pass validation * Use metadata.Read and Write instead of decoding/encoding JSON ourselves * Add Metadata.BlockMeta.Version in tests to pass validation in Read * Use cortex_tsdb.TenantIDExternalLabel instead of hardcoded string Signed-off-by: Miles Bryant <milesbryant@monzo.com>
Signed-off-by: Miles Bryant <milesbryant@monzo.com>
Signed-off-by: Miles Bryant <milesbryant@monzo.com>
e17a6fb
to
97e872b
Compare
👋 @pracucci I've updated the docs and added a CHANGELOG entry. Is there anything else I need to do? |
0d46028
to
6c1246e
Compare
Signed-off-by: Miles Bryant <milesbryant@monzo.com>
6c1246e
to
20ee863
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good job, thanks! I left a couple of nits. Can you confirm you manually tested it after changes? I haven't got the time to manually test it.
@pstibrany could you do the 2nd review, please?
tools/thanosconvert/thanosconvert.go
Outdated
metaKey := fmt.Sprintf("%s/%s/meta.json", user, blockID) | ||
r, err := c.bkt.Get(ctx, metaKey) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use the userBucketClient
please. It means the path to meta.json
will be:
metaKey := fmt.Sprintf("%s/meta.json", blockID)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After using userBucketClient
, we can also use block.DownloadMeta()
which will download and parse the meta.json file (it doesn't check for Cortex orgid label, so that's fine), to simplify the code here.
tools/thanosconvert/thanosconvert.go
Outdated
var changesRequired []string | ||
|
||
if meta.Thanos.Labels == nil { | ||
// TODO: no "thanos" entry could mean this block is corrupt - should we bail out instead? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would not bail out. If we don't require the Thanos entry, the tool should work with Prometheus blocks too, which I believe is a plus for the community.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you, this is useful tool. I left bunch of comments suggestions if you feel like working on this code bit more, but they are not blocking... the tool already looks useful as-is.
docs/blocks-storage/migrate-storage-from-thanos-and-prometheus.md
Outdated
Show resolved
Hide resolved
docs/blocks-storage/migrate-storage-from-thanos-and-prometheus.md
Outdated
Show resolved
Hide resolved
tools/thanosconvert/thanosconvert.go
Outdated
|
||
func NewPerUserResults() PerUserResults { | ||
return PerUserResults{ | ||
FailedBlocks: []string{}, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: this creates 0-sized slice, which is unnecessary. As it is now, PerUserResults{}
is perfectly fine default value, and doesn't even need "New" function -- which isn't used anywhere.
(Furthermore, since methods are defined on *PerUserResults
, this method should return pointer. But I'd suggest to remove it completely.)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Turns out I wasn't even using this constructor anyway 😄
tools/thanosconvert/thanosconvert.go
Outdated
userBucketClient := bucket.NewUserBucketClient(user, c.bkt) | ||
err := userBucketClient.Iter(ctx, "", func(o string) error { | ||
// get block ID from path | ||
o = strings.TrimSuffix(o, "/") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can shorten the code here by using id, ok := block.IsBlockDir(o)
.
tools/thanosconvert/thanosconvert.go
Outdated
metaKey := fmt.Sprintf("%s/%s/meta.json", user, blockID) | ||
r, err := c.bkt.Get(ctx, metaKey) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After using userBucketClient
, we can also use block.DownloadMeta()
which will download and parse the meta.json file (it doesn't check for Cortex orgid label, so that's fine), to simplify the code here.
tools/thanosconvert/thanosconvert.go
Outdated
|
||
if len(changesRequired) > 0 { | ||
if c.dryRun { | ||
level.Debug(c.logger).Log("msg", "Block requires changes, not uploading due to dry run", "block", blockID, "changes_required", strings.Join(changesRequired, ",")) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need to repeat "not uploading due to dry run" message for each block? Perhaps just short "(dry-run)" would do.
logfmt, loglvl := logging.Format{}, logging.Level{} | ||
logfmt.RegisterFlags(flag.CommandLine) | ||
loglvl.RegisterFlags(flag.CommandLine) | ||
flag.StringVar(&configFilename, "config", "", "Path to bucket config YAML") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: I'd suggest supporting command line options too (or perhaps as the only way of configuring). Bucket config needs just a couple of options.
cmd/thanosconvert/main.go
Outdated
|
||
var ( | ||
configFilename string | ||
dryRun bool |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: no need for global variables, move to main() method.
e858b9c
to
c51e10d
Compare
* Add command line options for bucket configuration * Use github.com/thanos-io/thanos/pkg/block to parse block directory and fetch metadata instead of custom code * Tweak some log lines Co-authored-by: Peter Štibraný <pstibrany@gmail.com> Signed-off-by: Miles Bryant <milesbryant@monzo.com>
c51e10d
to
1d9bfe3
Compare
Thanks for the review - I figured I'd implement a few of your suggestions whilst the PR is still open 🙏 |
Thanks! Just to confirm, I've manually tested this after the latest batch of changes (again, only with S3) |
Looks like an unrelated test failed:
I can report an issue for this if you think it's a flake - in the meantime how do I rerun CI? |
There is "Rerun all jobs" button in top-right corner of the workflow run. I've restarted it now, just in case you don't have permissions and don't see that button. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for addressing my feedback. I've left few more (non-blocking) comments to take a look at, if you want.
tools/thanosconvert/thanosconvert.go
Outdated
|
||
if len(changesRequired) > 0 { | ||
if c.dryRun { | ||
level.Info(c.logger).Log("msg", "Block requires changes (dry-run)", "block", blockID.String(), "changes_required", strings.Join(changesRequired, ",")) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd suggest to add "user" into all log messages here. (Or use ulogger := log.With(c.logger, "user", user)
use wrapped ulogger only in entire method.)
tools/thanosconvert/thanosconvert.go
Outdated
func convertMetadata(meta metadata.Meta, expectedUser string) (metadata.Meta, []string) { | ||
var changesRequired []string | ||
|
||
if meta.Thanos.Labels == nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No need for this check or creating of empty meta.Thanos.Labels
, since it will be replaced anyway. Reading from nil
map, or deleting from nil
map will work as expected. (I'd suggest adding unit test for that)
cmd/thanosconvert/main.go
Outdated
logger, err := log.NewPrometheusLogger(loglvl, logfmt) | ||
if err != nil { | ||
fmt.Printf("failed to create logger: %v\n", err) | ||
flag.Usage() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Printing usage hides the error message. I'd suggest removing flag.Usage() calls after print errors.
cmd/thanosconvert/main.go
Outdated
if configFilename != "" { | ||
buf, err := ioutil.ReadFile(configFilename) | ||
if err != nil { | ||
level.Error(logger).Log("msg", "failed to load config file", "err", err, "filename", configFilename) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This methods uses three different ways to print errors: logger, fmt.Printf
(stdout) and fmt.Fprintf(flag.Commandline.Output())
which uses stderr. I'd suggest to print errors followed by os.Exit(1) to stderr.
os.Exit(1) | ||
} | ||
|
||
fmt.Println("Results:") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We use logger for logging the progress in Run
method. Perhaps we should use logger for logging the result too?
cmd/thanosconvert/main.go
Outdated
|
||
iterCtx := context.Background() | ||
results, err := converter.Run(iterCtx) | ||
if err != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There may be some results already when this error happens. Should this code simply log results first, and error (if any) as last?
Thanks @pstibrany - I've addressed some of these 🙏 I probably won't be able to spend much more time on this, so let me know if there's anything blocking the merge you'd like me to address |
* Add user field to log lines during bucket iteration * Remove flag.Usage from errors * Remove logging from main function; Fprintf to stderr instead Signed-off-by: Miles Bryant <milesbryant@monzo.com>
f9f2348
to
ad5bc30
Compare
Thanks a lot @milesbxf for your hard work!
Sure. It's a great tool, let's ship it! 🚀 |
What this PR does:
thanosconvert is a small tool to iterate over a bucket and convert block-level metadata (in meta.json) so that it is readable by Cortex. This tool assumes that users have already copied Thanos blocks over to a destination bucket, with blocks organised by user at the top level.
This is a relatively simple serial implementation - it's pretty quick to run so making it concurrent is probably unnecessary, at least for a v1. I've only tested this with S3, but since it uses
github.com/thanos-io/thanos/pkg/objstore
it should work with all the other supported block stores. It reuses the samepkg/storage/bucket.Config
struct for bucket configuration so should be easy to configure for users who already have the store-gateway/querier running.It's idempotent so is safe to run multiple times against the same bucket, and leaves Cortex blocks untouched. It will log any errors it encounters but keeps going.
I'm happy that this is mostly complete, but there's a few things left to do - I'd love some feedback before I finish it off!
Which issue(s) this PR fixes:
Fixes #2920
Checklist
CHANGELOG.md
updated - the order of entries should be[CHANGE]
,[FEATURE]
,[ENHANCEMENT]
,[BUGFIX]