
[Bug]: A large number of delete operations have accumulated in Milvus. #38708

Open
@become-nice

Description

Is there an existing issue for this?

  • I have searched the existing issues

Environment

- Milvus version: 2.2.10
- Deployment mode(standalone or cluster): standalone
- MQ type(rocksmq, pulsar or kafka):    
- SDK version(e.g. pymilvus v2.0.0rc2): 2.2.12
- OS(Ubuntu or CentOS): debian10
- CPU/Memory: 
- GPU: 
- Others:

Current Behavior

When I performed a large number of batch delete and write operations, I found that compaction could not keep up and a large number of deltalogs accumulated. Checking the logs, I saw that operations from half an hour earlier were still being applied. While I continued issuing large numbers of delete and insert operations, the instance quickly became unable to accept inserts.

[dep334-milvus-cxg-400-standalone-858b88f-dcfsm] {"log":"[2024/12/24 14:51:08.792 +08:00] [INFO] [datanode/flush_task.go:134] [\"running flush insert task\"] [\"segment ID\"=454803129838442013] [flushed=false] [dropped=false] [position=\"channel_name:\\\"cxg-400-rootcoord-dml_9_454803129824392717v0\\\" msgID:\\\"1\\\\225\\\\335i\\\\t\\\\311O\\\\006\\\" msgGroup:\\\"cxg-400-dataNode-248-cbg-40088-rootcoord-dml_9_454803129824392717v0\\\" timestamp:454825398530211840 \"] [PosTime=2024/12/24 14:20:04.110 +08:00]\n","stream":"stdout","time":"2024-12-24T06:51:08.792346734Z"}

In the log above, the flush task's channel checkpoint (PosTime=2024/12/24 14:20:04) lags the log timestamp (14:51:08) by about half an hour. Why can't Milvus restrict the user's insert and delete operations when compaction cannot keep up? Or has this already been optimized in version 2.4?
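
For reference, here is a minimal sketch (my own illustration, not part of the original report) of manually triggering compaction from the client and polling its state with pymilvus; it assumes the hello_milvus_jetis collection created by the reproduction script further down and the same connection parameters.

# Hypothetical helper: manually trigger compaction and poll it from the client.
from pymilvus import connections, Collection

connections.connect("default", host="yyyyyy", port="19530", user="root", password="xxxx")
coll = Collection("hello_milvus_jetis")

coll.compact()                        # ask the server to schedule a compaction
print(coll.get_compaction_state())    # inspect how that compaction is progressing
coll.wait_for_compaction_completed()  # block until the triggered compaction finishes

Even with manual triggering, whether compaction can absorb this delete rate in 2.2.10 is exactly what this issue is asking about.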

Expected Behavior

When compaction cannot keep up, restrict users from inserting and deleting data to avoid the cluster becoming unavailable.
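
A client-side workaround I can think of (an assumption on my part, not something Milvus enforces) is to throttle the delete loop itself, e.g. sleeping between batches so compaction has a chance to catch up; the pause value below is arbitrary.

import time
from pymilvus import connections, Collection

connections.connect("default", host="yyyyyy", port="19530", user="root", password="xxxx")
hello_milvus = Collection("hello_milvus_jetis")

DELETE_BATCH_PAUSE_S = 0.5  # arbitrary back-off, tune for the workload

for j in range(0, 5000):
    # Same primary-key pattern as the reproduction script below
    id_list = ["aaaaaaaaaaaaabbbaaaaaaacccccddddeeeeeeeeeeeeeeffffffff" + str(i + j * 1000 + 10000000)
               for i in range(1000)]
    expr = "pk in [" + ", ".join('"{}"'.format(pk) for pk in id_list) + "]"
    hello_milvus.delete(expr)
    time.sleep(DELETE_BATCH_PAUSE_S)  # give compaction time to keep pace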

Steps To Reproduce

run this script:

import time
from datetime import datetime

import numpy as np
from pymilvus import (
    connections,
    utility,
    FieldSchema, CollectionSchema, DataType,
    Collection,
)

fmt = "\n=== {:30} ===\n"
search_latency_fmt = "search latency = {:.4f}s"
num_entities, dim = 10000, 512

print(fmt.format("start connecting to Milvus"))
connections.connect("default", host="yyyyyy", port="19530", user="root", password="xxxx")

has = utility.has_collection("hello_milvus_jetis")
print(f"Does collection hello_milvus_jetis exist in Milvus: {has}")

if has:
    print(fmt.format("Drop collection `hello_milvus_jetis`"))
    utility.drop_collection("hello_milvus_jetis")

fields = [
    FieldSchema(name="pk", dtype=DataType.VARCHAR, is_primary=True, auto_id=False, max_length=100),
    FieldSchema(name="name", dtype=DataType.VARCHAR, is_primary=False, auto_id=False, max_length=100, is_partition_key=True),
    FieldSchema(name="random", dtype=DataType.DOUBLE),
    FieldSchema(name="embeddings", dtype=DataType.FLOAT_VECTOR, dim=dim)
]

schema = CollectionSchema(fields, "hello_milvus_jetis is the simplest demo to introduce the APIs")

print(fmt.format("Create collection `hello_milvus_jetis`"))
hello_milvus = Collection("hello_milvus_jetis", schema, consistency_level="Strong")

rng = np.random.default_rng(seed=19530)

print(fmt.format("Start Creating index IVF_FLAT"))
index = {
    "index_type": "IVF_FLAT",
    "metric_type": "L2",
    "params": {"nlist": 128},
}

hello_milvus.create_index("embeddings", index)

print(fmt.format("Start loading"))
hello_milvus.load()

print(fmt.format("Start inserting entities"))

# Phase 1: bulk-insert 500 batches of 10,000 entities each
for j in range(0, 500):
    current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    print(f"Insert batch {j} at {current_time}")
    
    entities = [
        [str("aaaaaaaaaaaaabbbaaaaaaacccccddddeeeeeeeeeeeeeeffffffff" + str(i + j * num_entities + 10000000)) for i in range(num_entities)],
        [str(i) for i in range(num_entities)],
        rng.random(num_entities).tolist(),
        rng.random((num_entities, dim)).tolist(),
    ]

    hello_milvus.insert(entities)

# Phase 2: delete 1,000 entities per batch, then immediately re-insert them
for j in range(0, 5000):
    current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    # Print the current time and which delete batch is being issued
    print(f"Delete {j} at {current_time}")

    id_list = [str("aaaaaaaaaaaaabbbaaaaaaacccccddddeeeeeeeeeeeeeeffffffff" + str(i + j * 1000 + 10000000)) for i in range(1000)]
    quoted_ids = ['"{}"'.format(id) for id in id_list]
    expr = "pk in [" + ", ".join(quoted_ids) + "]"
    hello_milvus.delete(expr)
    # hello_milvus.flush()

    entities = [
        [str("aaaaaaaaaaaaabbbaaaaaaacccccddddeeeeeeeeeeeeeeffffffff" + str(i + j * 1000 + + 10000000)) for i in range(1000)],
        [str(i + 100) for i in range(1000)],
        rng.random(1000).tolist(),
        rng.random((1000, dim)).tolist(),
    ]

    hello_milvus.insert(entities)


# Phase 3: repeatedly insert and then delete the same 1,000 primary keys
for j in range(0, 10000):
    current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    entities = [
        [str("aaaaaaaaaaaaabbbaaaaaaacccccddddeeeeeeeeeeeeeeffffffff" + str(i)) for i in range(1000)],
        [str(i + 100) for i in range(1000)],
        rng.random(1000).tolist(),
        rng.random((1000, dim)).tolist(),
    ]

    hello_milvus.insert(entities)

    # Print the current time and which delete batch is being issued
    print(f"Delete {j} at {current_time}")

    id_list = [str("aaaaaaaaaaaaabbbaaaaaaacccccddddeeeeeeeeeeeeeeffffffff" + str(i)) for i in range(1000)]
    quoted_ids = ['"{}"'.format(id) for id in id_list]
    expr = "pk in [" + ", ".join(quoted_ids) + "]"
    hello_milvus.delete(expr)
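
While the script runs, segment growth can be watched from a second process; the loop below is only a sketch (the collection must be loaded for utility.get_query_segment_info to return segments, and the num_rows field follows the pymilvus QuerySegmentInfo message).

import time
from pymilvus import connections, utility

connections.connect("default", host="yyyyyy", port="19530", user="root", password="xxxx")

# Hypothetical monitoring loop: print the number of query segments and total rows
# every 30 seconds while the reproduction script is hammering deletes and inserts.
while True:
    segments = utility.get_query_segment_info("hello_milvus_jetis")
    total_rows = sum(seg.num_rows for seg in segments)
    print(f"{time.strftime('%H:%M:%S')} segments={len(segments)} total_rows={total_rows}")
    time.sleep(30)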

Milvus Log

No response

Anything else?

No response

Metadata

Labels

kind/bug, triage/needs-information
