Support replicating a database via the create request
Signed-off-by: SimFG <bang.fu@zilliz.com>
SimFG committed Oct 16, 2024
1 parent 2313b18 commit ffe9d3c
Showing 15 changed files with 486 additions and 88 deletions.
138 changes: 138 additions & 0 deletions .github/workflows/ci.yaml
@@ -581,3 +581,141 @@ jobs:
            scripts/k8s_logs
            tests/deployment/upstream/logs
            server/server.log
  milvus-cdc-function-test-with-db:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v2

      - uses: actions/setup-go@v4
        with:
          go-version: '1.21'
          cache-dependency-path: server/go.sum
          cache: true

      - name: set up python
        uses: actions/setup-python@v2
        with:
          python-version: '3.8'
          cache: 'pip'

      - name: Build CDC
        timeout-minutes: 15
        working-directory: server
        shell: bash
        run: |
          make build
          ls -l
      - name: Creating kind cluster
        uses: helm/kind-action@v1.2.0

      - name: Print cluster information
        run: |
          kubectl config view
          kubectl cluster-info
          kubectl get nodes
          kubectl get pods -o wide -n kube-system
          helm version
          kubectl version
      - name: Deploy Source Milvus
        timeout-minutes: 15
        shell: bash
        working-directory: tests/deployment/upstream/
        run: |
          docker compose up -d
          bash ../../../scripts/check_healthy.sh
          docker compose ps -a
      - name: Deploy Downstream Milvus
        timeout-minutes: 15
        shell: bash
        working-directory: tests/deployment/downstream
        run: |
          helm repo add milvus https://zilliztech.github.io/milvus-helm
          helm repo update
          helm install --wait --timeout 720s cdc-downstream milvus/milvus -f standalone-values-auth.yaml
          kubectl get pods
          kubectl port-forward service/cdc-downstream-milvus 19500:19530 >/dev/null 2>&1 &
          sleep 20s
          nc -vz 127.0.0.1 19500
      - name: Deploy Milvus CDC
        timeout-minutes: 15
        working-directory: server
        shell: bash
        run: |
          cp ../deployment/docker/cdc.yaml configs/cdc.yaml
          ../bin/cdc > server.log 2>&1 &
          sleep 20s
      - name: Create CDC task
        timeout-minutes: 15
        run: |
          curl --location '127.0.0.1:8444/cdc' \
          --header 'Content-Type: application/json' \
          --data '{
            "request_type": "create",
            "request_data": {
              "milvus_connect_param": {
                "uri": "http://127.0.0.1:19500",
                "token": "root:Milvus",
                "connect_timeout": 120
              },
              "collection_infos": [
                {
                  "name": "*"
                }
              ],
              "database_info": {
                "name": "foo"
              }
            }
          }'
      - name: Run test
        timeout-minutes: 15
        shell: bash
        working-directory: tests
        run: |
          pip install -r requirements.txt --trusted-host https://test.pypi.org
          pytest testcases/test_cdc_database.py --upstream_host 127.0.0.1 --upstream_port 19530 --downstream_host 127.0.0.1 --downstream_port 19500
      - name: List CDC task
        if: ${{ always() }}
        timeout-minutes: 15
        working-directory: server
        shell: bash
        run: |
          cat server.log | tail -n 100
          curl --location '127.0.0.1:8444/cdc' \
          --header 'Content-Type: application/json' \
          --data '{
            "request_type": "list"
          }'
      - name: Export upstream milvus logs
        if: ${{ always() }}
        timeout-minutes: 5
        working-directory: tests/deployment/upstream
        run: |
          docker compose ps -a
          docker stats --no-stream
          bash ../../../scripts/export_log_docker.sh
      - name: Export downstream milvus logs
        if: ${{ always() }}
        timeout-minutes: 5
        working-directory: scripts
        run: |
          kubectl get pods || true
          bash export_log_k8s.sh default cdc-downstream k8s_logs
      - name: Upload logs
        if: ${{ always() }}
        uses: actions/upload-artifact@v4
        with:
          name: func-test-logs-with-db
          path: |
            scripts/k8s_logs
            tests/deployment/upstream/logs
            server/server.log
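
For reference, the database-scoped create request that the workflow sends with curl above could equally be issued from Go. The following minimal sketch (not part of this commit) marshals the same payload, including the new database_info field, and posts it to the CDC server's /cdc endpoint on port 8444, as configured in the workflow step.

package main

import (
    "bytes"
    "encoding/json"
    "fmt"
    "net/http"
)

func main() {
    // Same payload as the "Create CDC task" step above: replicate every
    // collection ("*") that lives in the "foo" database of the source Milvus.
    body := map[string]any{
        "request_type": "create",
        "request_data": map[string]any{
            "milvus_connect_param": map[string]any{
                "uri":             "http://127.0.0.1:19500",
                "token":           "root:Milvus",
                "connect_timeout": 120,
            },
            "collection_infos": []map[string]string{{"name": "*"}},
            "database_info":    map[string]string{"name": "foo"},
        },
    }
    payload, err := json.Marshal(body)
    if err != nil {
        panic(err)
    }

    resp, err := http.Post("http://127.0.0.1:8444/cdc", "application/json", bytes.NewReader(payload))
    if err != nil {
        panic(err)
    }
    defer resp.Body.Close()
    fmt.Println("create task response status:", resp.Status)
}
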
8 changes: 4 additions & 4 deletions core/api/replicate_manager.go
@@ -35,9 +35,9 @@ type ChannelManager interface {
AddDroppedCollection(ids []int64)
AddDroppedPartition(ids []int64)

StartReadCollection(ctx context.Context, info *pb.CollectionInfo, seekPositions []*msgpb.MsgPosition) error
StartReadCollection(ctx context.Context, db *model.DatabaseInfo, info *pb.CollectionInfo, seekPositions []*msgpb.MsgPosition) error
StopReadCollection(ctx context.Context, info *pb.CollectionInfo) error
AddPartition(ctx context.Context, collectionInfo *pb.CollectionInfo, partitionInfo *pb.PartitionInfo) error
AddPartition(ctx context.Context, dbInfo *model.DatabaseInfo, collectionInfo *pb.CollectionInfo, partitionInfo *pb.PartitionInfo) error

GetChannelChan() <-chan string
GetMsgChan(pChannel string) <-chan *ReplicateMsg
@@ -103,7 +103,7 @@ func (d *DefaultChannelManager) AddDroppedPartition(ids []int64) {
log.Warn("AddDroppedPartition is not implemented, please check it")
}

func (d *DefaultChannelManager) StartReadCollection(ctx context.Context, info *pb.CollectionInfo, seekPositions []*msgpb.MsgPosition) error {
func (d *DefaultChannelManager) StartReadCollection(ctx context.Context, db *model.DatabaseInfo, info *pb.CollectionInfo, seekPositions []*msgpb.MsgPosition) error {
log.Warn("StartReadCollection is not implemented, please check it")
return nil
}
@@ -113,7 +113,7 @@ func (d *DefaultChannelManager) StopReadCollection(ctx context.Context, info *pb
return nil
}

func (d *DefaultChannelManager) AddPartition(ctx context.Context, collectionInfo *pb.CollectionInfo, partitionInfo *pb.PartitionInfo) error {
func (d *DefaultChannelManager) AddPartition(ctx context.Context, dbInfo *model.DatabaseInfo, collectionInfo *pb.CollectionInfo, partitionInfo *pb.PartitionInfo) error {
log.Warn("AddPartition is not implemented, please check it")
return nil
}
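
Both ChannelManager methods gain a *model.DatabaseInfo parameter, so callers now thread the database context through alongside the collection. A minimal caller sketch (not part of this commit) is shown below; it assumes DatabaseInfo exposes a Name field, which is not visible in this diff.

package example

import (
    "context"
    "fmt"

    "github.com/zilliztech/milvus-cdc/core/api"
    "github.com/zilliztech/milvus-cdc/core/model"
    "github.com/zilliztech/milvus-cdc/core/pb"
)

// startRead illustrates the widened StartReadCollection signature. The nil
// seek positions are purely for illustration; real callers pass the saved
// msgpb.MsgPosition values, as collection_reader.go does below.
func startRead(ctx context.Context, mgr api.ChannelManager, db *model.DatabaseInfo, info *pb.CollectionInfo) error {
    if err := mgr.StartReadCollection(ctx, db, info, nil); err != nil {
        // db.Name is an assumed field, used here only to label the error.
        return fmt.Errorf("start reading collection %d in database %q: %w", info.ID, db.Name, err)
    }
    return nil
}
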
4 changes: 2 additions & 2 deletions core/api/replicate_manager_test.go
@@ -49,7 +49,7 @@ func TestDefaultChannelManager_AddPartition(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
d := &DefaultChannelManager{}
if err := d.AddPartition(tt.args.ctx, tt.args.collectionInfo, tt.args.partitionInfo); (err != nil) != tt.wantErr {
if err := d.AddPartition(tt.args.ctx, nil, tt.args.collectionInfo, tt.args.partitionInfo); (err != nil) != tt.wantErr {
t.Errorf("AddPartition() error = %v, wantErr %v", err, tt.wantErr)
}
})
@@ -141,7 +141,7 @@ func TestDefaultChannelManager_StartReadCollection(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
d := &DefaultChannelManager{}
if err := d.StartReadCollection(tt.args.ctx, tt.args.info, tt.args.seekPositions); (err != nil) != tt.wantErr {
if err := d.StartReadCollection(tt.args.ctx, nil, tt.args.info, tt.args.seekPositions); (err != nil) != tt.wantErr {
t.Errorf("StartReadCollection() error = %v, wantErr %v", err, tt.wantErr)
}
})
44 changes: 24 additions & 20 deletions core/mocks/channel_manager.go

Some generated files are not rendered by default.

26 changes: 16 additions & 10 deletions core/reader/collection_reader.go
@@ -35,6 +35,7 @@ import (
"github.com/zilliztech/milvus-cdc/core/api"
"github.com/zilliztech/milvus-cdc/core/config"
"github.com/zilliztech/milvus-cdc/core/log"
"github.com/zilliztech/milvus-cdc/core/model"
"github.com/zilliztech/milvus-cdc/core/pb"
"github.com/zilliztech/milvus-cdc/core/util"
)
@@ -48,7 +49,7 @@ type CollectionInfo struct {
positions map[string]*commonpb.KeyDataPair
}

type ShouldReadFunc func(*pb.CollectionInfo) bool
type ShouldReadFunc func(*model.DatabaseInfo, *pb.CollectionInfo) bool

var _ api.Reader = (*CollectionReader)(nil)

@@ -112,7 +113,8 @@ func (reader *CollectionReader) StartRead(ctx context.Context) {
}

collectionLog.Info("has watched to read collection")
if !reader.shouldReadFunc(info) {
dbInfo := reader.metaOp.GetDatabaseInfoForCollection(ctx, info.ID)
if !reader.shouldReadFunc(&dbInfo, info) {
collectionLog.Info("the collection should not be read")
return false
}
@@ -124,7 +126,7 @@ func (reader *CollectionReader) StartRead(ctx context.Context) {
Timestamp: info.CreateTime,
})
}
if err := reader.channelManager.StartReadCollection(ctx, info, startPositions); err != nil {
if err := reader.channelManager.StartReadCollection(ctx, &dbInfo, info, startPositions); err != nil {
collectionLog.Warn("fail to start to replicate the collection data in the watch process", zap.Any("info", info), zap.Error(err))
reader.sendError(err)
}
@@ -168,12 +170,13 @@ func (reader *CollectionReader) StartRead(ctx context.Context) {
Name: collectionName,
},
}
if !reader.shouldReadFunc(tmpCollectionInfo) {
dbInfo := reader.metaOp.GetDatabaseInfoForCollection(ctx, tmpCollectionInfo.ID)
if !reader.shouldReadFunc(&dbInfo, tmpCollectionInfo) {
partitionLog.Info("the partition should not be read", zap.String("name", collectionName))
return true
}

err := reader.channelManager.AddPartition(ctx, tmpCollectionInfo, info)
err := reader.channelManager.AddPartition(ctx, &dbInfo, tmpCollectionInfo, info)
if err != nil {
partitionLog.Warn("fail to add partition", zap.String("collection_name", collectionName), zap.Any("partition", info), zap.Error(err))
reader.sendError(err)
@@ -187,7 +190,8 @@ func (reader *CollectionReader) StartRead(ctx context.Context) {
readerLog := log.With(zap.String("task_id", reader.id))

existedCollectionInfos, err := reader.metaOp.GetAllCollection(ctx, func(info *pb.CollectionInfo) bool {
return !reader.shouldReadFunc(info)
// return !reader.shouldReadFunc(info)
return false
})
if err != nil {
readerLog.Warn("get all collection failed", zap.Error(err))
@@ -230,7 +234,8 @@ func (reader *CollectionReader) StartRead(ctx context.Context) {
readerLog.Info("skip to start to read collection", zap.String("name", info.Schema.Name), zap.Int64("collection_id", info.ID))
continue
}
if !reader.shouldReadFunc(info) {
dbInfo := reader.metaOp.GetDatabaseInfoForCollection(ctx, info.ID)
if !reader.shouldReadFunc(&dbInfo, info) {
readerLog.Info("the collection is not in the watch list", zap.String("name", info.Schema.Name), zap.Int64("collection_id", info.ID))
continue
}
@@ -252,7 +257,7 @@ func (reader *CollectionReader) StartRead(ctx context.Context) {
zap.String("name", info.Schema.Name),
zap.Int64("collection_id", info.ID),
zap.String("state", info.State.String()))
if err := reader.channelManager.StartReadCollection(ctx, info, seekPositions); err != nil {
if err := reader.channelManager.StartReadCollection(ctx, &dbInfo, info, seekPositions); err != nil {
readerLog.Warn("fail to start to replicate the collection data", zap.Any("collection", info), zap.Error(err))
reader.sendError(err)
}
@@ -288,7 +293,8 @@ func (reader *CollectionReader) StartRead(ctx context.Context) {
Name: collectionName,
},
}
if !reader.shouldReadFunc(tmpCollectionInfo) {
dbInfo := reader.metaOp.GetDatabaseInfoForCollection(ctx, tmpCollectionInfo.ID)
if !reader.shouldReadFunc(&dbInfo, tmpCollectionInfo) {
readerLog.Info("the collection is not in the watch list", zap.String("collection_name", collectionName), zap.String("partition_name", info.PartitionName))
return true
}
@@ -297,7 +303,7 @@ func (reader *CollectionReader) StartRead(ctx context.Context) {
zap.Int64("partition_id", info.PartitionID),
zap.String("collection_name", collectionName),
zap.Int64("collection_id", info.CollectionId))
err := reader.channelManager.AddPartition(ctx, tmpCollectionInfo, info)
err := reader.channelManager.AddPartition(ctx, &dbInfo, tmpCollectionInfo, info)
if err != nil {
readerLog.Warn("fail to add partition", zap.String("collection_name", collectionName), zap.String("partition_name", info.PartitionName), zap.Error(err))
reader.sendError(err)
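
With ShouldReadFunc now receiving a *model.DatabaseInfo as its first argument, a task can filter by database as well as by collection. A possible predicate is sketched below (not part of this commit); it assumes DatabaseInfo exposes a Name field and mirrors the "*" wildcard used by the CI request above.

package example

import (
    "github.com/zilliztech/milvus-cdc/core/model"
    "github.com/zilliztech/milvus-cdc/core/pb"
)

// newDBShouldRead builds a predicate with the new ShouldReadFunc shape,
// func(*model.DatabaseInfo, *pb.CollectionInfo) bool, that only accepts
// collections living in targetDB. DatabaseInfo.Name is an assumption for
// illustration; the real model fields are defined outside this diff.
func newDBShouldRead(targetDB string, allowed map[string]bool) func(*model.DatabaseInfo, *pb.CollectionInfo) bool {
    return func(db *model.DatabaseInfo, info *pb.CollectionInfo) bool {
        if db == nil || db.Name != targetDB {
            return false
        }
        // An empty allow-list or a "*" entry means every collection in the database.
        if len(allowed) == 0 || allowed["*"] {
            return true
        }
        return allowed[info.Schema.Name]
    }
}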