test: add StartReadCollectionForKafka unit test (#138)
* test: add StartReadCollectionForKafka unit test

* fix: unit test related to kafka
Ricky-chen1 authored Oct 21, 2024
1 parent 2ec39d0 commit 949cb3b
Showing 3 changed files with 205 additions and 9 deletions.
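
The core of the test change is a new downstream parameter on the test-only constructor, so the same helper can build channel managers for either replication target. A minimal sketch of the resulting call shape, assuming it runs inside a *testing.T test function in this package with the test file's existing imports (config, api, msgstream) and a mocked factoryCreator as in the diff below; the values mirror the tests themselves:

	// Sketch only: the trailing argument selects the replication target.
	// "milvus" preserves the previously hard-coded behavior; "kafka" drives the new path.
	manager, err := NewReplicateChannelManagerWithFactory(
		config.MQConfig{Pulsar: config.PulsarConfig{Address: "pulsar://localhost:6650"}},
		factoryCreator, nil,
		config.ReaderConfig{
			MessageBufferSize: 10,
			Retry:             config.RetrySettings{RetryTimes: 1, InitBackOff: 1, MaxBackOff: 1},
		},
		&api.DefaultMetaOp{},
		func(s string, pack *msgstream.MsgPack) {}, // msgPackCallback; a no-op here
		"kafka", // downstream: "milvus" or "kafka"
	)
	if err != nil {
		t.Fatal(err)
	}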
6 changes: 3 additions & 3 deletions core/reader/replicate_channel_manager.go
@@ -178,7 +178,7 @@ func (r *replicateChannelManager) startReadCollectionForKafka(ctx context.Contex
 	}
 
 	// send api event when the collection is not replicated and ctx is not done
-	if err := r.sendCreateCollectionvent(ctx, info, sourceDBInfo); err != nil {
+	if err := r.sendCreateCollectionEvent(ctx, info, sourceDBInfo); err != nil {
 		return nil, err
 	}
 
@@ -246,7 +246,7 @@ func (r *replicateChannelManager) startReadCollectionForMilvus(ctx context.Conte
 			zap.String("collection_name", info.Schema.Name))
 		return nil, nil
 	}
-	err = r.sendCreateCollectionvent(ctx, info, sourceDBInfo)
+	err = r.sendCreateCollectionEvent(ctx, info, sourceDBInfo)
 	if err != nil {
 		return nil, err
 	}
@@ -270,7 +270,7 @@ func (r *replicateChannelManager) startReadCollectionForMilvus(ctx context.Conte
 	return targetInfo, nil
 }
 
-func (r *replicateChannelManager) sendCreateCollectionvent(ctx context.Context, info *pb.CollectionInfo, sourceDBInfo *model.DatabaseInfo) error {
+func (r *replicateChannelManager) sendCreateCollectionEvent(ctx context.Context, info *pb.CollectionInfo, sourceDBInfo *model.DatabaseInfo) error {
 	select {
 	case <-ctx.Done():
 		log.Warn("context is done in the start read collection")
205 changes: 199 additions & 6 deletions core/reader/replicate_channel_manager_test.go
@@ -52,6 +52,7 @@ func NewReplicateChannelManagerWithFactory(mqConfig config.MQConfig,
 	readConfig config.ReaderConfig,
 	metaOp api.MetaOp,
 	msgPackCallback func(string, *msgstream.MsgPack),
+	downstream string,
 ) (api.ChannelManager, error) {
 	streamDispatchClient, err := GetMsgDispatcherClient(factoryCreator, mqConfig, false)
 	if err != nil {
@@ -84,11 +85,11 @@ func NewReplicateChannelManagerWithFactory(mqConfig config.MQConfig,
 		msgPackCallback: msgPackCallback,
 		addCollectionLock: &deadlock.RWMutex{},
 		addCollectionCnt: new(int),
-		downstream: "milvus",
+		downstream: downstream,
 	}, nil
 }
 
-func TestNewReplicateChannelManager(t *testing.T) {
+func TestNewReplicateChannelManagerForMilvus(t *testing.T) {
 	t.Run("empty config", func(t *testing.T) {
 		_, err := NewReplicateChannelManagerWithFactory(config.MQConfig{}, NewDefaultFactoryCreator(), nil, config.ReaderConfig{
 			MessageBufferSize: 10,
@@ -98,7 +99,8 @@ func TestNewReplicateChannelManager(t *testing.T) {
 				MaxBackOff: 1,
 			},
 		}, &api.DefaultMetaOp{}, func(s string, pack *msgstream.MsgPack) {
-		})
+		},
+			"milvus")
 		assert.Error(t, err)
 	})
 
@@ -118,7 +120,42 @@ func TestNewReplicateChannelManager(t *testing.T) {
 				MaxBackOff: 1,
 			},
 		}, &api.DefaultMetaOp{}, func(s string, pack *msgstream.MsgPack) {
-		})
+		}, "milvus")
 		assert.NoError(t, err)
 	})
 }
+
+func TestNewReplicateChannelManagerForKafka(t *testing.T) {
+	t.Run("empty config", func(t *testing.T) {
+		_, err := NewReplicateChannelManagerWithFactory(config.MQConfig{}, NewDefaultFactoryCreator(), nil, config.ReaderConfig{
+			MessageBufferSize: 10,
+			Retry: config.RetrySettings{
+				RetryTimes: 1,
+				InitBackOff: 1,
+				MaxBackOff: 1,
+			},
+		}, &api.DefaultMetaOp{}, func(s string, pack *msgstream.MsgPack) {
+		}, "kafka")
+		assert.Error(t, err)
+	})
+
+	t.Run("success", func(t *testing.T) {
+		factoryCreator := mocks.NewFactoryCreator(t)
+		factory := msgstream.NewMockMqFactory()
+		factoryCreator.EXPECT().NewPmsFactory(mock.Anything).Return(factory)
+		_, err := NewReplicateChannelManagerWithFactory(config.MQConfig{
+			Pulsar: config.PulsarConfig{
+				Address: "pulsar://localhost:6650",
+			},
+		}, factoryCreator, nil, config.ReaderConfig{
+			MessageBufferSize: 10,
+			Retry: config.RetrySettings{
+				RetryTimes: 1,
+				InitBackOff: 1,
+				MaxBackOff: 1,
+			},
+		}, &api.DefaultMetaOp{}, func(s string, pack *msgstream.MsgPack) {
+		}, "kafka")
+		assert.NoError(t, err)
+	})
+}
@@ -167,7 +204,7 @@ func TestChannelUtils(t *testing.T) {
 	})
 }
 
-func TestStartReadCollection(t *testing.T) {
+func TestStartReadCollectionForMilvus(t *testing.T) {
 	util.InitMilvusPkgParam()
 
 	factoryCreator := mocks.NewFactoryCreator(t)
@@ -188,7 +225,7 @@ func TestStartReadCollection(t *testing.T) {
 			MaxBackOff: 1,
 		},
 	}, &api.DefaultMetaOp{}, func(s string, pack *msgstream.MsgPack) {
-	})
+	}, "milvus")
 	assert.NoError(t, err)
 	manager.SetCtx(context.Background())
 
@@ -305,6 +342,162 @@ func TestStartReadCollection(t *testing.T) {
 				VirtualChannelNames: []string{"collection-partition-p1_v0"},
 			}, nil)
 			assert.NoError(t, err)
+			channel := <-realManager.GetChannelChan()
+			assert.Equal(t, "collection-partition-p2", channel)
+		}
+
+		// partition not found
+		{
+			realManager.retryOptions = []retry.Option{
+				retry.Attempts(1),
+			}
+			err := realManager.AddPartition(context.Background(), &model.DatabaseInfo{}, &pb.CollectionInfo{
+				ID: 41,
+				Schema: &schemapb.CollectionSchema{
+					Name: "test",
+				},
+			}, &pb.PartitionInfo{})
+			assert.Error(t, err)
+		}
+
+		// add partition
+		{
+			err := realManager.AddPartition(context.Background(), &model.DatabaseInfo{}, &pb.CollectionInfo{
+				ID: 31001,
+				Schema: &schemapb.CollectionSchema{
+					Name: "test",
+				},
+			}, &pb.PartitionInfo{
+				PartitionName: "_default2",
+			})
+			assert.NoError(t, err)
+			time.Sleep(100 * time.Millisecond)
+
+			event := <-realManager.GetEventChan()
+			assert.Equal(t, api.ReplicateCreatePartition, event.EventType)
+		}
+
+		// stop read collection
+		{
+			err := realManager.StopReadCollection(context.Background(), &pb.CollectionInfo{
+				ID: 31001,
+				StartPositions: []*commonpb.KeyDataPair{
+					{
+						Key: "collection_partition_p1",
+					},
+				},
+			})
+			assert.NoError(t, err)
+		}
+	})
+}
+
+func TestStartReadCollectionForKafka(t *testing.T) {
+	util.InitMilvusPkgParam()
+
+	factoryCreator := mocks.NewFactoryCreator(t)
+	factory := msgstream.NewMockFactory(t)
+	factoryCreator.EXPECT().NewPmsFactory(mock.Anything).Return(factory)
+
+	manager, err := NewReplicateChannelManagerWithFactory(config.MQConfig{
+		Pulsar: config.PulsarConfig{
+			Address: "pulsar://localhost:6650",
+		},
+	}, factoryCreator, nil, config.ReaderConfig{
+		MessageBufferSize: 10,
+		Retry: config.RetrySettings{
+			RetryTimes: 1,
+			InitBackOff: 1,
+			MaxBackOff: 1,
+		},
+	}, &api.DefaultMetaOp{}, func(s string, pack *msgstream.MsgPack) {
+	}, "kafka")
+	assert.NoError(t, err)
+	manager.SetCtx(context.Background())
+
+	t.Run("context cancel", func(t *testing.T) {
+		ctx, cancelFunc := context.WithCancel(context.Background())
+		cancelFunc()
+		err = manager.StartReadCollection(ctx, &model.DatabaseInfo{}, &pb.CollectionInfo{}, nil)
+		assert.Error(t, err)
+	})
+
+	realManager := manager.(*replicateChannelManager)
+	stream := msgstream.NewMockMsgStream(t)
+	streamChan := make(chan *msgstream.MsgPack)
+
+	factory.EXPECT().NewMsgStream(mock.Anything).Return(stream, nil).Maybe()
+	stream.EXPECT().AsConsumer(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe()
+	stream.EXPECT().Chan().Return(streamChan).Maybe()
+	stream.EXPECT().Close().Return().Maybe()
+
+	t.Run("read channel", func(t *testing.T) {
+		{
+			// start read
+			handler, err := realManager.startReadChannel(&model.SourceCollectionInfo{
+				PChannel: "kafka_test_read_channel",
+				VChannel: "kafka_test_read_channel_v0",
+				CollectionID: 11001,
+				ShardNum: 1,
+			}, &model.TargetCollectionInfo{
+				CollectionID: 21001,
+				CollectionName: "read_channel",
+				PartitionInfo: map[string]int64{
+					"_default": 1101,
+				},
+				PChannel: "kafka_ttest_read_channel",
+				VChannel: "kafka_ttest_read_channel_v0",
+				BarrierChan: util.NewOnceWriteChan(make(chan<- uint64)),
+				PartitionBarrierChan: map[int64]*util.OnceWriteChan[uint64]{
+					1101: util.NewOnceWriteChan(make(chan<- uint64)),
+				},
+			})
+			assert.NoError(t, err)
+			handler.startReadChannel()
+			assert.Equal(t, "kafka_ttest_read_channel", <-realManager.GetChannelChan())
+
+			_, err = realManager.startReadChannel(&model.SourceCollectionInfo{
+				PChannel: "kafka_test_read_channel_2",
+				VChannel: "kafka_test_read_channel_2_v0",
+				CollectionID: 11002,
+			}, &model.TargetCollectionInfo{
+				CollectionName: "kafka_read_channel_2",
+				PChannel: "kafka_ttest_read_channel_2",
+				VChannel: "kafka_ttest_read_channel_2_v0",
+			})
+			assert.NoError(t, err)
+		}
+		{
+			assert.NotNil(t, realManager.GetMsgChan("kafka_ttest_read_channel"))
+			assert.Nil(t, realManager.GetMsgChan("no_exist_channel"))
+		}
+		{
+			// stop read
+			realManager.stopReadChannel("no_exist_channel", 11001)
+			realManager.stopReadChannel("kafka_test_read_channel", 11001)
+			realManager.stopReadChannel("kafka_test_read_channel", 11002)
+		}
+	})
+
+	t.Run("collection and partition", func(t *testing.T) {
+		// start read collection
+		{
+			err := realManager.StartReadCollection(context.Background(), &model.DatabaseInfo{}, &pb.CollectionInfo{
+				ID: 31001,
+				Schema: &schemapb.CollectionSchema{
+					Name: "test",
+				},
+				StartPositions: []*commonpb.KeyDataPair{
+					{
+						Key: "collection_partition_p1",
+					},
+				},
+				PhysicalChannelNames: []string{"collection-partition-p1"},
+				VirtualChannelNames: []string{"collection-partition-p1_v0"},
+			}, nil)
+			assert.NoError(t, err)
+			event := <-realManager.GetEventChan()
+			assert.Equal(t, api.ReplicateCreateCollection, event.EventType)
+		}
+
+		// partition not found
3 changes: 3 additions & 0 deletions server/model/request/base.go
@@ -59,9 +59,12 @@ func GetTask(taskInfo *meta.TaskInfo) Task {
 	taskInfo.MilvusConnectParam.Username = ""
 	taskInfo.MilvusConnectParam.Password = ""
 	taskInfo.MilvusConnectParam.Token = ""
+	taskInfo.KafkaConnectParam.SASL.Username = ""
+	taskInfo.KafkaConnectParam.SASL.Password = ""
 	return Task{
 		TaskID: taskInfo.TaskID,
 		MilvusConnectParam: taskInfo.MilvusConnectParam,
+		KafkaConnectParam: taskInfo.KafkaConnectParam,
 		CollectionInfos: taskInfo.CollectionInfos,
 		State: taskInfo.State.String(),
 		LastPauseReason: taskInfo.Reason,
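
On the server side, GetTask now blanks the Kafka SASL credentials the same way it already blanks the Milvus username, password, and token, so the Task returned over the API carries no secrets. A minimal sketch of that invariant, assuming only the field paths shown in the diff above; the scaffolding around the GetTask call is illustrative, not code from the repository:

	// Sketch only: build a task with credentials set, then confirm GetTask
	// scrubs them before embedding the connect params in the response.
	info := &meta.TaskInfo{}
	info.KafkaConnectParam.SASL.Username = "user"   // hypothetical value
	info.KafkaConnectParam.SASL.Password = "secret" // hypothetical value
	task := GetTask(info)
	if task.KafkaConnectParam.SASL.Username != "" || task.KafkaConnectParam.SASL.Password != "" {
		panic("credentials leaked into the API response")
	}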
