Skip to content

Commit 3748e68

Browse files
authored
mongo: add validation for (1) storage engine (2) oplog retention (#3168)
>You can specify the minimum number of hours to preserve an oplog entry where [mongod](https://www.mongodb.com/docs/manual/reference/program/mongod/#mongodb-binary-bin.mongod) only removes an oplog entry if both of the following criteria are met: -The oplog has reached the [maximum configured size.](https://www.mongodb.com/docs/manual/core/replica-set-oplog/#std-label-replica-set-oplog-sizing) -The oplog entry is older than the configured number of hours based on the host system clock. By default MongoDB does not set a minimum oplog retention period and automatically truncates the oplog starting with the oldest entries to maintain the configured maximum oplog size. Seems like a good idea to require a min oplog retention (instead of just relying on max oplog size). Also add a validation for storage engne, as 'wiredTiger' is a prerequisite for changestream to work. Test: tested e2e locally
1 parent 9e42dd6 commit 3748e68

File tree

3 files changed

+48
-4
lines changed

3 files changed

+48
-4
lines changed

flow/connectors/mongo/validate.go

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package connmongo
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
67

78
"github.com/PeerDB-io/peerdb/flow/generated/protos"
@@ -30,7 +31,20 @@ func (c *MongoConnector) ValidateMirrorSource(ctx context.Context, cfg *protos.F
3031

3132
_, err := shared_mongo.GetReplSetGetStatus(ctx, c.client)
3233
if err != nil {
33-
return fmt.Errorf("failed to get replica set status: %w", err)
34+
return err
35+
}
36+
37+
serverStatus, err := shared_mongo.GetServerStatus(ctx, c.client)
38+
if err != nil {
39+
return err
40+
}
41+
if serverStatus.StorageEngine.Name != "wiredTiger" {
42+
return errors.New("storage engine must be 'wiredTiger'")
3443
}
44+
if serverStatus.OplogTruncation.OplogMinRetentionHours == 0 ||
45+
serverStatus.OplogTruncation.OplogMinRetentionHours < shared_mongo.MinOplogRetentionHours {
46+
return errors.New("oplog retention must be set to >= 24 hours")
47+
}
48+
3549
return nil
3650
}

flow/e2eshared/e2eshared.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ func RunSuite[T Suite](t *testing.T, setup func(t *testing.T) T) {
3333
subtest.Parallel()
3434
suite := setup(subtest)
3535
subtest.Cleanup(func() {
36-
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
36+
ctx, cancel := context.WithTimeout(t.Context(), time.Minute)
3737
defer cancel()
3838
suite.Teardown(ctx)
3939
})
@@ -56,7 +56,7 @@ func RunSuiteNoParallel[T Suite](t *testing.T, setup func(t *testing.T) T) {
5656
t.Run(m.Name, func(subtest *testing.T) {
5757
suite := setup(subtest)
5858
subtest.Cleanup(func() {
59-
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
59+
ctx, cancel := context.WithTimeout(t.Context(), time.Minute)
6060
defer cancel()
6161
suite.Teardown(ctx)
6262
})

flow/shared/mongo/validation.go

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,10 @@ import (
88
"go.mongodb.org/mongo-driver/v2/mongo"
99
)
1010

11-
const MinSupportedVersion = "5.1.0"
11+
const (
12+
MinSupportedVersion = "5.1.0"
13+
MinOplogRetentionHours = 24
14+
)
1215

1316
type BuildInfo struct {
1417
Version string `bson:"version"`
@@ -19,6 +22,19 @@ type ReplSetGetStatus struct {
1922
MyState int `bson:"myState"`
2023
}
2124

25+
type OplogTruncation struct {
26+
OplogMinRetentionHours float64 `bson:"oplogMinRetentionHours"`
27+
}
28+
29+
type StorageEngine struct {
30+
Name string `bson:"name"`
31+
}
32+
33+
type ServerStatus struct {
34+
StorageEngine StorageEngine `bson:"storageEngine"`
35+
OplogTruncation OplogTruncation `bson:"oplogTruncation"`
36+
}
37+
2238
func GetBuildInfo(ctx context.Context, client *mongo.Client) (*BuildInfo, error) {
2339
singleResult := client.Database("admin").RunCommand(ctx, bson.D{bson.E{Key: "buildInfo", Value: 1}})
2440
if singleResult.Err() != nil {
@@ -44,3 +60,17 @@ func GetReplSetGetStatus(ctx context.Context, client *mongo.Client) (*ReplSetGet
4460
}
4561
return &status, nil
4662
}
63+
64+
func GetServerStatus(ctx context.Context, client *mongo.Client) (*ServerStatus, error) {
65+
singleResult := client.Database("admin").RunCommand(ctx, bson.D{
66+
bson.E{Key: "serverStatus", Value: 1},
67+
})
68+
if singleResult.Err() != nil {
69+
return nil, fmt.Errorf("failed to run 'serverStatus' command: %w", singleResult.Err())
70+
}
71+
var status ServerStatus
72+
if err := singleResult.Decode(&status); err != nil {
73+
return nil, fmt.Errorf("failed to decode ServerStatus: %w", err)
74+
}
75+
return &status, nil
76+
}

0 commit comments

Comments
 (0)