Skip to content

Commit

Permalink
add rate limited bucket
Browse files Browse the repository at this point in the history
  • Loading branch information
tigrato committed Sep 13, 2024
1 parent a4e45a9 commit 75bc4bc
Showing 1 changed file with 22 additions and 5 deletions.
27 changes: 22 additions & 5 deletions lib/backend/firestore/migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package firestore
import (
"context"

"cloud.google.com/go/firestore"
"github.com/gravitational/trace"

"github.com/gravitational/teleport/lib/backend"
Expand Down Expand Up @@ -78,21 +79,37 @@ func migrateKeyType[T any](ctx context.Context, b *Backend, newKey func([]byte)
return trace.Wrap(err)
}

for _, dbDoc := range docs {
bulkWriter := b.svc.BulkWriter(b.clientContext)
jobs := make([]*firestore.BulkWriterJob, len(docs))
for i, dbDoc := range docs {
newDoc, err := newRecordFromDoc(dbDoc)
if err != nil {
return trace.Wrap(err, "failed to convert document")
}

if _, err := b.svc.Collection(b.CollectionName).
Doc(b.keyToDocumentID(newDoc.Key)).
Set(ctx, newDoc); err != nil {
return trace.Wrap(err, "failed to upsert document")
jobs[i], err = bulkWriter.Set(
b.svc.Collection(b.CollectionName).
Doc(b.keyToDocumentID(newDoc.Key)),
newDoc,
)
if err != nil {
return trace.Wrap(err, "failed stream bulk action")
}

startKey = newKey(newDoc.Key) // update start key
}

bulkWriter.End()
var errs []error
for _, job := range jobs {
if _, err := job.Results(); err != nil {
errs = append(errs, err)
}
}
if err := trace.NewAggregate(errs...); err != nil {
return trace.Wrap(err, "failed to write bulk actions")
}

if len(docs) < limit {
break
}
Expand Down

0 comments on commit 75bc4bc

Please sign in to comment.