Skip to content

thanosconvert: convert block meta from Thanos to Cortex #3770

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Feb 10, 2021

Conversation

milesbxf
Copy link
Contributor

@milesbxf milesbxf commented Feb 1, 2021

What this PR does:

thanosconvert is a small tool to iterate over a bucket and convert block-level metadata (in meta.json) so that it is readable by Cortex. This tool assumes that users have already copied Thanos blocks over to a destination bucket, with blocks organised by user at the top level.

This is a relatively simple serial implementation - it's pretty quick to run so making it concurrent is probably unnecessary, at least for a v1. I've only tested this with S3, but since it uses github.com/thanos-io/thanos/pkg/objstore it should work with all the other supported block stores. It reuses the same pkg/storage/bucket.Config struct for bucket configuration so should be easy to configure for users who already have the store-gateway/querier running.

It's idempotent so is safe to run multiple times against the same bucket, and leaves Cortex blocks untouched. It will log any errors it encounters but keeps going.

I'm happy that this is mostly complete, but there's a few things left to do - I'd love some feedback before I finish it off!

Which issue(s) this PR fixes:
Fixes #2920

Checklist

  • Add CLI usage message
  • Clean up, comment public API methods
  • Documentation added
  • CHANGELOG.md updated - the order of entries should be [CHANGE], [FEATURE], [ENHANCEMENT], [BUGFIX]

@milesbxf milesbxf changed the title thanosconvert: add tool to convert Thanos meta.json thanosconvert: convert block meta from Thanos to Cortex Feb 1, 2021
@milesbxf milesbxf marked this pull request as ready for review February 1, 2021 15:17
Copy link
Contributor

@pracucci pracucci left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fantastic job! We wanted this since a long time, thanks! I left few comments here and there, but mostly nits that should be very quick to address.

Other than the comments:

  • Could you add it to the list of experimental features listed in docs/configuration/v1-guarantees.md, please?
  • Could you mention the tool in docs/blocks-storage/migrate-storage-from-thanos-and-prometheus.md? There's a sentence like "no tool is provided to migrate TSDB blocks from Thanos" while now there is 😉

iterCtx := context.Background()
results, err := converter.Run(iterCtx)
if err != nil {
level.Error(util.Logger).Log("msg", "iterate blocks", "err", err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
level.Error(util.Logger).Log("msg", "iterate blocks", "err", err)
level.Error(util.Logger).Log("msg", "error while iterating blocks", "err", err)

type Results map[string]PerUserResults

// NewThanosBlockConverter creates a ThanosBlockConverter
func NewThanosBlockConverter(ctx context.Context, cfg bucket.Config, dryRun bool, logger log.Logger, reg prometheus.Registerer) (*ThanosBlockConverter, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would remove reg prometheus.Registerer given this tool doesn't export metrics anyway, and just pass nil as registered to bucket.NewClient() (when nil metrics registration will be skipped).

Comment on lines 71 to 75
var users []string
err := c.bkt.Iter(ctx, "", func(o string) error {
users = append(users, strings.TrimSuffix(o, "/"))
return nil
})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You may want to reuse the scanner we already have:

scanner := cortex_tsdb.NewUsersScanner(c.bkt, cortex_tsdb.AllUsers)
users, _, err := scanner.ScanUsers(ctx)

func (c ThanosBlockConverter) convertUser(ctx context.Context, user string) (PerUserResults, error) {
results := PerUserResults{}

err := c.bkt.Iter(ctx, user, func(o string) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would wrap c.bkt with bucket.NewUserBucketClient(). This will mask you the first object prefix segment (which is the <tenant ID>/).

err := c.bkt.Iter(ctx, user, func(o string) error {
// get block ID from path

parts := strings.Split(o, "/")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Once you use the UserBuckerClient, this should just be a matter of:

o = strings.TrimSuffix(o, "/")
blockID, err := ulid.Parse(o)

If the parse fail, just skip it (no error) because it means it's not a block. We don't have just blocks in bucket.

}
}

func CortexMeta(user string) metadata.Meta {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to export this function.

}
}

func ThanosMeta() metadata.Meta {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to export this function.

}
}

func GetLogger() log.Logger {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to export this function.

return logger
}

func MockBucket(data fakeBucket) *bucket.ClientMock {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to export this function.

name: "bucket fully converted is a noop",
bucketData: fakeBucket{
"user1": map[string]metadata.Meta{
"block1": CortexMeta("user1"),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Once you start using ulid.Parse() these tests will fail because "block1" is not a valid block ID. You could just define at the beginning of the test function something like the following, and then use such variables across the test cases:

block1 := ulid.MustNew(1, nil)
block2 := ulid.MustNew(2, nil)

@milesbxf
Copy link
Contributor Author

milesbxf commented Feb 2, 2021

Fantastic job! We wanted this since a long time, thanks! I left few comments here and there, but mostly nits that should be very quick to address.

Other than the comments:

  • Could you add it to the list of experimental features listed in docs/configuration/v1-guarantees.md, please?
  • Could you mention the tool in docs/blocks-storage/migrate-storage-from-thanos-and-prometheus.md? There's a sentence like "no tool is provided to migrate TSDB blocks from Thanos" while now there is 😉

@pracucci thanks for the detailed review! I think I've addressed all your comments - I've added a new commit to make it easier to review the changes, but can squash it down once you're happy. I'll make the docs changes today.

@milesbxf milesbxf requested a review from pracucci February 2, 2021 11:00
@milesbxf milesbxf force-pushed the thanosconvert branch 3 times, most recently from 8659187 to e17a6fb Compare February 4, 2021 10:03
thanosconvert is a small tool to iterate over a bucket and convert
block-level metadata (in meta.json) so that it is readable by Cortex.
This tool assumes that users have already copied Thanos blocks over to a
destination bucket, with blocks organised by user at the top level.

Signed-off-by: Miles Bryant <milesbryant@monzo.com>
* Remove prometheus.DefaultRegisterer as we don't export metrics
* Use cortex_tsdb.UsersScanner instead of custom logic
* Use UserBucketClient instead of custom logic
* Use ulid.Parse to make sure we're in a valid block
* Generate ulids in tests to pass validation
* Use metadata.Read and Write instead of decoding/encoding JSON
ourselves
* Add Metadata.BlockMeta.Version in tests to pass validation in Read
* Use cortex_tsdb.TenantIDExternalLabel instead of hardcoded string

Signed-off-by: Miles Bryant <milesbryant@monzo.com>
Signed-off-by: Miles Bryant <milesbryant@monzo.com>
Signed-off-by: Miles Bryant <milesbryant@monzo.com>
@milesbxf
Copy link
Contributor Author

milesbxf commented Feb 4, 2021

👋 @pracucci I've updated the docs and added a CHANGELOG entry. Is there anything else I need to do?

Signed-off-by: Miles Bryant <milesbryant@monzo.com>
Copy link
Contributor

@pracucci pracucci left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good job, thanks! I left a couple of nits. Can you confirm you manually tested it after changes? I haven't got the time to manually test it.

@pstibrany could you do the 2nd review, please?

Comment on lines 105 to 106
metaKey := fmt.Sprintf("%s/%s/meta.json", user, blockID)
r, err := c.bkt.Get(ctx, metaKey)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use the userBucketClient please. It means the path to meta.json will be:
metaKey := fmt.Sprintf("%s/meta.json", blockID)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After using userBucketClient, we can also use block.DownloadMeta() which will download and parse the meta.json file (it doesn't check for Cortex orgid label, so that's fine), to simplify the code here.

var changesRequired []string

if meta.Thanos.Labels == nil {
// TODO: no "thanos" entry could mean this block is corrupt - should we bail out instead?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would not bail out. If we don't require the Thanos entry, the tool should work with Prometheus blocks too, which I believe is a plus for the community.

@pracucci pracucci requested a review from pstibrany February 5, 2021 10:50
Copy link
Contributor

@pstibrany pstibrany left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you, this is useful tool. I left bunch of comments suggestions if you feel like working on this code bit more, but they are not blocking... the tool already looks useful as-is.


func NewPerUserResults() PerUserResults {
return PerUserResults{
FailedBlocks: []string{},
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: this creates 0-sized slice, which is unnecessary. As it is now, PerUserResults{} is perfectly fine default value, and doesn't even need "New" function -- which isn't used anywhere.

(Furthermore, since methods are defined on *PerUserResults, this method should return pointer. But I'd suggest to remove it completely.)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Turns out I wasn't even using this constructor anyway 😄

userBucketClient := bucket.NewUserBucketClient(user, c.bkt)
err := userBucketClient.Iter(ctx, "", func(o string) error {
// get block ID from path
o = strings.TrimSuffix(o, "/")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can shorten the code here by using id, ok := block.IsBlockDir(o).

Comment on lines 105 to 106
metaKey := fmt.Sprintf("%s/%s/meta.json", user, blockID)
r, err := c.bkt.Get(ctx, metaKey)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After using userBucketClient, we can also use block.DownloadMeta() which will download and parse the meta.json file (it doesn't check for Cortex orgid label, so that's fine), to simplify the code here.


if len(changesRequired) > 0 {
if c.dryRun {
level.Debug(c.logger).Log("msg", "Block requires changes, not uploading due to dry run", "block", blockID, "changes_required", strings.Join(changesRequired, ","))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to repeat "not uploading due to dry run" message for each block? Perhaps just short "(dry-run)" would do.

logfmt, loglvl := logging.Format{}, logging.Level{}
logfmt.RegisterFlags(flag.CommandLine)
loglvl.RegisterFlags(flag.CommandLine)
flag.StringVar(&configFilename, "config", "", "Path to bucket config YAML")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: I'd suggest supporting command line options too (or perhaps as the only way of configuring). Bucket config needs just a couple of options.


var (
configFilename string
dryRun bool
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: no need for global variables, move to main() method.

* Add command line options for bucket configuration
* Use github.com/thanos-io/thanos/pkg/block to parse block directory and
fetch metadata instead of custom code
* Tweak some log lines

Co-authored-by: Peter Štibraný <pstibrany@gmail.com>
Signed-off-by: Miles Bryant <milesbryant@monzo.com>
@milesbxf
Copy link
Contributor Author

milesbxf commented Feb 5, 2021

Thank you, this is useful tool. I left bunch of comments suggestions if you feel like working on this code bit more, but they are not blocking... the tool already looks useful as-is.

Thanks for the review - I figured I'd implement a few of your suggestions whilst the PR is still open 🙏

@milesbxf
Copy link
Contributor Author

milesbxf commented Feb 5, 2021

Good job, thanks! I left a couple of nits. Can you confirm you manually tested it after changes? I haven't got the time to manually test it.

Thanks! Just to confirm, I've manually tested this after the latest batch of changes (again, only with S3)

@milesbxf
Copy link
Contributor Author

milesbxf commented Feb 5, 2021

Looks like an unrelated test failed:

--- FAIL: TestAlertmanager_ServeHTTP (0.24s)
    multitenant_test.go:90: 
        	Error Trace:	multitenant_test.go:90
        	            				testing.go:846
        	            				testing.go:876
        	            				testing.go:1048
        	Error:      	Received unexpected error:
        	            	unlinkat /tmp/alertmanager319454365: directory not empty
        	Test:       	TestAlertmanager_ServeHTTP
FAIL
coverage: 65.6% of statements in github.com/cortexproject/cortex/pkg/alertmanager
FAIL	github.com/cortexproject/cortex/pkg/alertmanager	32.268s
FAIL

I can report an issue for this if you think it's a flake - in the meantime how do I rerun CI?

@pstibrany
Copy link
Contributor

in the meantime how do I rerun CI

There is "Rerun all jobs" button in top-right corner of the workflow run. I've restarted it now, just in case you don't have permissions and don't see that button.

Copy link
Contributor

@pstibrany pstibrany left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for addressing my feedback. I've left few more (non-blocking) comments to take a look at, if you want.


if len(changesRequired) > 0 {
if c.dryRun {
level.Info(c.logger).Log("msg", "Block requires changes (dry-run)", "block", blockID.String(), "changes_required", strings.Join(changesRequired, ","))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd suggest to add "user" into all log messages here. (Or use ulogger := log.With(c.logger, "user", user) use wrapped ulogger only in entire method.)

func convertMetadata(meta metadata.Meta, expectedUser string) (metadata.Meta, []string) {
var changesRequired []string

if meta.Thanos.Labels == nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need for this check or creating of empty meta.Thanos.Labels, since it will be replaced anyway. Reading from nil map, or deleting from nil map will work as expected. (I'd suggest adding unit test for that)

logger, err := log.NewPrometheusLogger(loglvl, logfmt)
if err != nil {
fmt.Printf("failed to create logger: %v\n", err)
flag.Usage()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Printing usage hides the error message. I'd suggest removing flag.Usage() calls after print errors.

if configFilename != "" {
buf, err := ioutil.ReadFile(configFilename)
if err != nil {
level.Error(logger).Log("msg", "failed to load config file", "err", err, "filename", configFilename)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This methods uses three different ways to print errors: logger, fmt.Printf (stdout) and fmt.Fprintf(flag.Commandline.Output()) which uses stderr. I'd suggest to print errors followed by os.Exit(1) to stderr.

os.Exit(1)
}

fmt.Println("Results:")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We use logger for logging the progress in Run method. Perhaps we should use logger for logging the result too?


iterCtx := context.Background()
results, err := converter.Run(iterCtx)
if err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There may be some results already when this error happens. Should this code simply log results first, and error (if any) as last?

@milesbxf
Copy link
Contributor Author

Thanks for addressing my feedback. I've left few more (non-blocking) comments to take a look at, if you want.

Thanks @pstibrany - I've addressed some of these 🙏 I probably won't be able to spend much more time on this, so let me know if there's anything blocking the merge you'd like me to address

* Add user field to log lines during bucket iteration
* Remove flag.Usage from errors
* Remove logging from main function; Fprintf to stderr instead

Signed-off-by: Miles Bryant <milesbryant@monzo.com>
@pracucci
Copy link
Contributor

Thanks a lot @milesbxf for your hard work!

I probably won't be able to spend much more time on this, so let me know if there's anything blocking the merge you'd like me to address

Sure. It's a great tool, let's ship it! 🚀

@pracucci pracucci merged commit 532a320 into cortexproject:master Feb 10, 2021
@milesbxf milesbxf deleted the thanosconvert branch February 18, 2021 11:30
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Build a tool to migrate TSDB blocks from Thanos/Prometheus to Cortex blocks storage
3 participants