From 75bc4bc10264d2dd5d1012e71588ec0da858f919 Mon Sep 17 00:00:00 2001 From: Tiago Silva Date: Fri, 13 Sep 2024 13:37:30 +0100 Subject: [PATCH] add rate limited bucket --- lib/backend/firestore/migration.go | 27 ++++++++++++++++++++++----- 1 file changed, 22 insertions(+), 5 deletions(-) diff --git a/lib/backend/firestore/migration.go b/lib/backend/firestore/migration.go index f5fdbcd047c3a..cad791647d1ad 100644 --- a/lib/backend/firestore/migration.go +++ b/lib/backend/firestore/migration.go @@ -21,6 +21,7 @@ package firestore import ( "context" + "cloud.google.com/go/firestore" "github.com/gravitational/trace" "github.com/gravitational/teleport/lib/backend" @@ -78,21 +79,37 @@ func migrateKeyType[T any](ctx context.Context, b *Backend, newKey func([]byte) return trace.Wrap(err) } - for _, dbDoc := range docs { + bulkWriter := b.svc.BulkWriter(b.clientContext) + jobs := make([]*firestore.BulkWriterJob, len(docs)) + for i, dbDoc := range docs { newDoc, err := newRecordFromDoc(dbDoc) if err != nil { return trace.Wrap(err, "failed to convert document") } - if _, err := b.svc.Collection(b.CollectionName). - Doc(b.keyToDocumentID(newDoc.Key)). - Set(ctx, newDoc); err != nil { - return trace.Wrap(err, "failed to upsert document") + jobs[i], err = bulkWriter.Set( + b.svc.Collection(b.CollectionName). + Doc(b.keyToDocumentID(newDoc.Key)), + newDoc, + ) + if err != nil { + return trace.Wrap(err, "failed stream bulk action") } startKey = newKey(newDoc.Key) // update start key } + bulkWriter.End() + var errs []error + for _, job := range jobs { + if _, err := job.Results(); err != nil { + errs = append(errs, err) + } + } + if err := trace.NewAggregate(errs...); err != nil { + return trace.Wrap(err, "failed to write bulk actions") + } + if len(docs) < limit { break }