Skip to content

mongo: test mongo versions 6.0/7.0/8.0 + authentication #3102

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Jun 30, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 29 additions & 8 deletions .github/workflows/flow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ jobs:
strategy:
matrix:
runner: [ubuntu-latest-16-cores]
db-version: [{pg: 15, mysql: 'mysql-gtid'}, {pg: 16, mysql: 'mysql-pos'}, {pg: 17, mysql: 'maria'}]
db-version: [{pg: 15, mysql: 'mysql-gtid', mongo: '6.0'}, {pg: 16, mysql: 'mysql-pos', mongo: '7.0'}, {pg: 17, mysql: 'maria', mongo: '8.0'}]
runs-on: ${{ matrix.runner }}
timeout-minutes: 30
services:
Expand Down Expand Up @@ -131,18 +131,39 @@ jobs:

- name: Mongo
run: |
docker run -d --rm --name mongo -p 27017:27017 mongo:8.0.10 --replSet rs0 --bind_ip_all
echo "starting mongoDB..."
docker run -d --rm --name mongo -p 27017:27017 mongo:${{ matrix.db-version.mongo }} \
bash -c 'openssl rand -base64 756 > /data/mongo.key && chmod 400 /data/mongo.key && mongod --replSet rs0 --bind_ip_all --keyFile /data/mongo.key'

until docker exec mongo mongosh --eval 'db.runCommand({ ping: 1 })' &> /dev/null; do
echo "Waiting for MongoDB to be ready..."
echo "waiting for MongoDB to be ready..."
sleep 2
done

echo "initialize replica set"
docker exec mongo mongosh --eval 'rs.initiate({
_id: "rs0",
members: [
{ _id: 0, host: "localhost:27017" }
]
members: [{ _id: 0, host: "localhost:27017" }]
})'

echo "create admin user for writing data to mongo"
docker exec mongo mongosh --eval '
db = db.getSiblingDB("admin");
db.createUser({
user: "admin",
pwd: "admin",
roles: ["root"]
})'

echo "create non-admin user for reading data from changestream"
docker exec mongo mongosh -u admin -p admin --eval '
db = db.getSiblingDB("admin");
db.createUser({
user: "csuser",
pwd: "cspass",
roles: ["readAnyDatabase"]
})'

- name: MinIO TLS
run: >
mkdir -p certs &&
Expand Down Expand Up @@ -242,8 +263,8 @@ jobs:
ELASTICSEARCH_TEST_ADDRESS: http://localhost:9200
CI_PG_VERSION: ${{ matrix.db-version.pg }}
CI_MYSQL_VERSION: ${{ matrix.db-version.mysql }}
CI_MONGO_VERSION: 8.0.10
CI_MONGO_URI: mongodb://localhost:27017/?replicaSet=rs0
CI_MONGO_ADMIN_URI: mongodb://admin:admin@localhost:27017/?replicaSet=rs0&authSource=admin
CI_MONGO_URI: mongodb://csuser:cspass@localhost:27017/?replicaSet=rs0&authSource=admin
ENABLE_OTEL_METRICS: ${{ (matrix.db-version.pg == '16' || matrix.db-version.mysql == 'mysql-pos') && 'true' || 'false' }}
OTEL_EXPORTER_OTLP_METRICS_ENDPOINT: http://localhost:4317
OTEL_EXPORTER_OTLP_METRICS_PROTOCOL: grpc
Expand Down
4 changes: 0 additions & 4 deletions flow/connectors/mongo/mongo.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,6 @@ func (c *MongoConnector) ValidateMirrorSource(ctx context.Context, cfg *protos.F
return nil
}

func (c *MongoConnector) Client() *mongo.Client {
return c.client
}

func (c *MongoConnector) GetTableSchema(
ctx context.Context,
_ map[string]string,
Expand Down
12 changes: 10 additions & 2 deletions flow/e2e/mongo/mongo.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"testing"

"go.mongodb.org/mongo-driver/v2/bson"
"go.mongodb.org/mongo-driver/v2/mongo"

"github.com/PeerDB-io/peerdb/flow/connectors"
connmongo "github.com/PeerDB-io/peerdb/flow/connectors/mongo"
Expand All @@ -17,6 +18,9 @@ import (
type MongoSource struct {
conn *connmongo.MongoConnector
config *protos.MongoConfig

// more privileged admin client for writing to mongo for tests
adminClient *mongo.Client
}

func (s *MongoSource) GeneratePeer(t *testing.T) *protos.Peer {
Expand All @@ -34,20 +38,24 @@ func (s *MongoSource) GeneratePeer(t *testing.T) *protos.Peer {

func (s *MongoSource) Teardown(t *testing.T, ctx context.Context, suffix string) {
t.Helper()
db := s.conn.Client().Database(GetTestDatabase(suffix))
db := s.adminClient.Database(GetTestDatabase(suffix))
_ = db.Drop(t.Context())
}

func (s *MongoSource) Connector() connectors.Connector {
return s.conn
}

func (s *MongoSource) AdminClient() *mongo.Client {
return s.adminClient
}

func (s *MongoSource) Exec(ctx context.Context, sql string) error {
return errors.ErrUnsupported
}

func (s *MongoSource) GetRows(ctx context.Context, suffix, table, cols string) (*model.QRecordBatch, error) {
collection := s.conn.Client().Database(GetTestDatabase(suffix)).Collection(table)
collection := s.adminClient.Database(GetTestDatabase(suffix)).Collection(table)
cursor, err := collection.Find(ctx, bson.D{})
if err != nil {
return nil, err
Expand Down
29 changes: 19 additions & 10 deletions flow/e2e/mongo/mongo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ import (

"github.com/stretchr/testify/require"
"go.mongodb.org/mongo-driver/v2/bson"
"go.mongodb.org/mongo-driver/v2/mongo"
"go.mongodb.org/mongo-driver/v2/mongo/options"
"go.mongodb.org/mongo-driver/v2/mongo/readpref"

connmongo "github.com/PeerDB-io/peerdb/flow/connectors/mongo"
"github.com/PeerDB-io/peerdb/flow/e2e"
Expand Down Expand Up @@ -40,8 +42,8 @@ func SetupMongoClickhouseSuite(t *testing.T) MongoClickhouseSuite {
func SetupMongo(t *testing.T, suffix string) (*MongoSource, error) {
t.Helper()

mongoVersion := os.Getenv("CI_MONGO_VERSION")
require.NotEmpty(t, mongoVersion, "missing CI_MONGO_VERSION env var")
mongoAdminUri := os.Getenv("CI_MONGO_ADMIN_URI")
require.NotEmpty(t, mongoAdminUri, "missing CI_MONGO_ADMIN_URI env var")

mongoUri := os.Getenv("CI_MONGO_URI")
require.NotEmpty(t, mongoUri, "missing CI_MONGO_URI env var")
Expand All @@ -51,11 +53,18 @@ func SetupMongo(t *testing.T, suffix string) (*MongoSource, error) {
mongoConn, err := connmongo.NewMongoConnector(t.Context(), mongoConfig)
require.NoError(t, err, "failed to setup mongo connector")

adminClient, err := mongo.Connect(options.Client().
SetAppName("Mongo admin client").
SetCompressors([]string{"zstd", "snappy"}).
SetReadPreference(readpref.Primary()).
ApplyURI(mongoAdminUri))
require.NoError(t, err, "failed to setup mongo admin client")

testDb := GetTestDatabase(suffix)
db := mongoConn.Client().Database(testDb)
db := adminClient.Database(testDb)
_ = db.Drop(t.Context())

return &MongoSource{conn: mongoConn, config: mongoConfig}, err
return &MongoSource{conn: mongoConn, config: mongoConfig, adminClient: adminClient}, err
}

func (s MongoClickhouseSuite) Test_Simple_Flow() {
Expand All @@ -72,8 +81,8 @@ func (s MongoClickhouseSuite) Test_Simple_Flow() {
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s)
flowConnConfig.DoInitialSnapshot = true

client := s.Source().Connector().(*connmongo.MongoConnector).Client()
collection := client.Database(srcDatabase).Collection(srcTable)
adminClient := s.Source().(*MongoSource).AdminClient()
collection := adminClient.Database(srcDatabase).Collection(srcTable)
// insert 10 rows into the source table for initial load
for i := range 10 {
testKey := fmt.Sprintf("init_key_%d", i)
Expand Down Expand Up @@ -118,8 +127,8 @@ func (s MongoClickhouseSuite) Test_Inconsistent_Schema() {
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s)
flowConnConfig.DoInitialSnapshot = true

client := s.Source().Connector().(*connmongo.MongoConnector).Client()
collection := client.Database(srcDatabase).Collection(srcTable)
adminClient := s.Source().(*MongoSource).AdminClient()
collection := adminClient.Database(srcDatabase).Collection(srcTable)

// adding/removing fields should work
docs := []bson.D{
Expand Down Expand Up @@ -170,8 +179,8 @@ func (s MongoClickhouseSuite) Test_Update_Replace_Delete_Events() {
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s)
flowConnConfig.DoInitialSnapshot = true

client := s.Source().Connector().(*connmongo.MongoConnector).Client()
collection := client.Database(srcDatabase).Collection(srcTable)
adminClient := s.Source().(*MongoSource).AdminClient()
collection := adminClient.Database(srcDatabase).Collection(srcTable)

insertRes, err := collection.InsertOne(t.Context(), bson.D{bson.E{Key: "key", Value: 1}}, options.InsertOne())
require.NoError(t, err)
Expand Down