[Bug]: milvus-cdc createCollection & dropCollection synchronization disorder #39077

Open
1 task done
sunwsh opened this issue Jan 8, 2025 · 2 comments
Labels
kind/bug Issues or changes related a bug triage/accepted Indicates an issue or PR is ready to be actively worked on.
Comments


sunwsh commented Jan 8, 2025

Is there an existing issue for this?

  • I have searched the existing issues

Environment

- Milvus version: 2.4.14
- Deployment mode(standalone or cluster): cluster
- MQ type(rocksmq, pulsar or kafka): kafka 
- SDK version(e.g. pymilvus v2.0.0rc2): java 2.4.6
- OS(Ubuntu or CentOS): Ubuntu
- CPU/Memory: 12
- GPU: null
- Others:

Current Behavior

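  1. On the primary cluster, run dropCollection on collection A.
  2. About 4 minutes later, run createCollection on the primary cluster with the same name.
  3. Insert vectors into the new collection on the primary cluster.
  4. The milvus-cdc sync task is interrupted.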

Expected Behavior

The milvus-cdc sync task keeps running normally.

Steps To Reproduce

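1. Create multiple collections such that two or more of them share the same Kafka DML channel.
2. Insert a large number of vectors into these collections so that milvus-cdc builds up a sync backlog.
3. Drop the backlogged collection. To make the problem easier to reproduce, create each collection with ShardsNum=1.
4. Create a new collection with the same name.
5. milvus-cdc receives the etcd create-collection event first, and only afterwards receives the dropCollection message from Kafka.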
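The reordering in step 5 can be illustrated with a toy model. The sketch below is hypothetical Go, not milvus-cdc code: `Event`, `applyByName`, and `applyByID` are invented names, and the target cluster is reduced to a name-to-ID map that starts empty. It replays the arrival order from the log (etcd create for the new ID, then the backlogged Kafka drop for the old ID). Assuming the target-side drop is applied by collection name, as the `collection=m182_128_dssm_v2_0` field in the writer log suggests, the stale drop removes the newly created collection; a drop that also checks the collection ID would ignore it.

```go
package main

import "fmt"

// EventKind distinguishes the two DDL events involved in this issue.
type EventKind int

const (
	Create EventKind = iota
	Drop
)

// Event is a hypothetical, simplified record of what milvus-cdc receives:
// create events arrive via the etcd watch, drop events via the Kafka DML channel.
type Event struct {
	Kind         EventKind
	Source       string // "etcd" or "kafka", informational only
	Name         string
	CollectionID int64
}

// applyByName replays events on a toy target keyed only by collection name,
// so a drop removes whichever collection currently holds that name.
func applyByName(events []Event) map[string]int64 {
	target := map[string]int64{}
	for _, e := range events {
		switch e.Kind {
		case Create:
			target[e.Name] = e.CollectionID
		case Drop:
			delete(target, e.Name)
		}
	}
	return target
}

// applyByID honours a drop only if its collection ID matches the collection
// currently registered under that name; a stale drop for an old ID is ignored.
func applyByID(events []Event) map[string]int64 {
	target := map[string]int64{}
	for _, e := range events {
		switch e.Kind {
		case Create:
			target[e.Name] = e.CollectionID
		case Drop:
			if id, ok := target[e.Name]; ok && id == e.CollectionID {
				delete(target, e.Name)
			}
		}
	}
	return target
}

func main() {
	const name = "m182_128_dssm_v2_0"
	const oldID, newID int64 = 453896603949084629, 453896605121592775

	// Arrival order seen in the log: the etcd create for the new ID comes
	// before the backlogged Kafka drop for the old ID.
	arrival := []Event{
		{Kind: Create, Source: "etcd", Name: name, CollectionID: newID},
		{Kind: Drop, Source: "kafka", Name: name, CollectionID: oldID},
	}
	fmt.Println("drop by name:", applyByName(arrival)) // map[]
	fmt.Println("drop by ID:  ", applyByID(arrival))   // map[m182_128_dssm_v2_0:453896605121592775]
}
```

In this model the name-keyed replay ends with no collection on the target, which matches the symptom that replication of the re-created collection breaks.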

Milvus Log

// etcd: collection drop detected
[2025/01/04 09:11:03.940 +08:00] [INFO] [reader/etcd_op.go:223] ["the collection state is not created"] [key=gd16-seareco-base/meta/root-coord/database/collection-info/446470090779773910/453896603949084629] [collection_name=m182_128_dssm_v2_0] [state=CollectionDropping]
 
// Create event received for a new collection with the same name
[2025/01/04 09:15:03.959 +08:00] [INFO] [reader/etcd_op.go:223] ["the collection state is not created"] [key=gd16-seareco-base/meta/root-coord/database/collection-info/446470090779773910/453896605121592775] [collection_name=m182_128_dssm_v2_0] [state=CollectionCreating]
[2025/01/04 09:15:03.971 +08:00] [INFO] [reader/collection_reader.go:100] ["has watched to read collection"] [collection_name=m182_128_dssm_v2_0] [collection_id=453896605121592775]
[2025/01/04 09:15:03.982 +08:00] [INFO] [reader/replicate_channel_manager.go:383] ["waiting handler"] [partition_id=453896605121592781] [collection_id=453896605121592775] [collection_name=m182_128_dssm_v2_0] [partition_name=0] [collection_id=453896605121592775]
[2025/01/04 09:15:03.990 +08:00] [INFO] [reader/replicate_channel_manager.go:240] ["success to get the collection info in the target instance"] [collection_name=m182_128_dssm_v2_0]
[2025/01/04 09:15:03.990 +08:00] [INFO] [reader/replicate_channel_manager.go:738] ["add collection to channel handler"] [collection_id=453896605121592775] [collection_name=m182_128_dssm_v2_0]
[2025/01/04 09:15:03.990 +08:00] [INFO] [reader/collection_reader.go:118] ["has started to read collection"] [collection_name=m182_128_dssm_v2_0] [collection_id=453896605121592775]
[2025/01/04 09:15:04.982 +08:00] [INFO] [reader/replicate_channel_manager.go:433] ["start to add partition"] [partition_id=453896605121592781] [collection_id=453896605121592775] [collection_name=m182_128_dssm_v2_0] [partition_name=0] [num=1]
[2025/01/04 09:15:04.983 +08:00] [INFO] [reader/replicate_channel_manager.go:829] ["add partition info done"] [collection_name=m182_128_dssm_v2_0] [collection_id=453896605121592775] [partition_id=453896605121592781] [partition_name=0]
 
// The drop message was actually consumed from Kafka at 2025/01/04 09:19:11.151; this message was sent by the primary cluster
[2025/01/04 09:19:11.151 +08:00] [INFO] [reader/replicate_channel_manager.go:797] ["remove collection from handler"] [collection_id=453896603949084629]
[2025/01/04 09:19:11.151 +08:00] [INFO] [writer/channel_writer.go:124] ["receive replicate api event"] [event=DropCollection] [collection=m182_128_dssm_v2_0]
[2025/01/04 09:19:11.164 +08:00] [INFO] [writer/channel_writer.go:126] ["finish to handle replicate api event"] [event=DropCollection] [collection=m182_128_dssm_v2_0]
 
// On the primary cluster, this drop message had already been consumed at the time below, so it was already in the Kafka queue by then
[2025/01/04 09:11:08.598 +08:00] [INFO] [datanode/flow_graph_dd_node.go:149] ["Receiving DropCollection msg"] [collectionID=453896603949084629] [vChannelName=gd16-seareco-base-rootcoord-dml_2_453896603949084629v0] [beginTs=455069681096851474] [endTs=455069681096851474]

Anything else?

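Suggested changes:

  1. When the primary cluster executes dropCollection, also publish a message to the Kafka replicateMessage topic.
  2. When the primary cluster executes createCollection, also publish a message to the Kafka replicateMessage topic (so that milvus-cdc can determine the order of the two events).
  3. After receiving the dropCollection message, drop the collection on the standby cluster, and skip any later messages for that collection that arrive on the Kafka DML channel.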
@sunwsh sunwsh added kind/bug Issues or changes related a bug needs-triage Indicates an issue or PR lacks a `triage/foo` label and requires one. labels Jan 8, 2025
@yanliang567
Contributor

/assign @SimFG
/unassign

@sre-ci-robot sre-ci-robot assigned SimFG and unassigned yanliang567 Jan 8, 2025
@yanliang567 yanliang567 added triage/accepted Indicates an issue or PR is ready to be actively worked on. and removed needs-triage Indicates an issue or PR lacks a `triage/foo` label and requires one. labels Jan 8, 2025
@yanliang567 yanliang567 added this to the 2.4.21 milestone Jan 8, 2025
@xiaofan-luan
Collaborator

I believe SimFG is already working on it


4 participants