[Bug]: A large number of delete operations have accumulated in Milvus. #38708
Open
Description
Is there an existing issue for this?
- I have searched the existing issues
Environment
- Milvus version: 2.2.10
- Deployment mode(standalone or cluster): standalone
- MQ type(rocksmq, pulsar or kafka):
- SDK version(e.g. pymilvus v2.0.0rc2): 2.2.12
- OS(Ubuntu or CentOS): debian10
- CPU/Memory:
- GPU:
- Others:
Current Behavior
When I performed a large number of batch delete and write operations, I found that the compaction performance could not keep up. A large number of deltalogs were accumulated. After checking the logs, I found that the operations from half an hour ago were still being applied. When I was still performing a large number of deletion and insertion operations, the instance would quickly become uninsertable.
[dep334-milvus-cxg-400-standalone-858b88f-dcfsm] {"log":"[2024/12/24 14:51:08.792 +08:00] [INFO] [datanode/flush_task.go:134] [\"running flush insert task\"] [\"segment ID\"=454803129838442013] [flushed=false] [dropped=false] [position=\"channel_name:\\\"cxg-400-rootcoord-dml_9_454803129824392717v0\\\" msgID:\\\"1\\\\225\\\\335i\\\\t\\\\311O\\\\006\\\" msgGroup:\\\"cxg-400-dataNode-248-cbg-40088-rootcoord-dml_9_454803129824392717v0\\\" timestamp:454825398530211840 \"] [PosTime=2024/12/24 14:20:04.110 +08:00]\n","stream":"stdout","time":"2024-12-24T06:51:08.792346734Z"}
Why can't we restrict the user's insertion and deletion operations when the compaction performance cannot keep up? Or has the optimization been made in version 2.4?
Expected Behavior
When compaction cannot keep up, restrict users from inserting and deleting data to avoid the cluster becoming unavailable.
Steps To Reproduce
run this script:
import time
import numpy as np
from pymilvus import (
connections,
utility,
FieldSchema, CollectionSchema, DataType,
Collection,
)
fmt = "\n=== {:30} ===\n"
search_latency_fmt = "search latency = {:.4f}s"
num_entities, dim = 10000, 512
print(fmt.format("start connecting to Milvus"))
connections.connect("default", host="yyyyyy", port="19530", user="root", password="xxxx")
has = utility.has_collection("hello_milvus_jetis")
print(f"Does collection hello_milvus_jetis exist in Milvus: {has}")
if has:
print(fmt.format("Drop collection `hello_milvus_jetis`"))
utility.drop_collection("hello_milvus_jetis")
fields = [
FieldSchema(name="pk", dtype=DataType.VARCHAR, is_primary=True, auto_id=False, max_length=100),
FieldSchema(name="name", dtype=DataType.VARCHAR, is_primary=False, auto_id=False, max_length=100, is_partition_key=True),
FieldSchema(name="random", dtype=DataType.DOUBLE),
FieldSchema(name="embeddings", dtype=DataType.FLOAT_VECTOR, dim=dim)
]
schema = CollectionSchema(fields, "hello_milvus_jetis is the simplest demo to introduce the APIs")
print(fmt.format("Create collection `hello_milvus_jetis`"))
hello_milvus = Collection("hello_milvus_jetis", schema, consistency_level="Strong")
rng = np.random.default_rng(seed=19530)
print(fmt.format("Start Creating index IVF_FLAT"))
index = {
"index_type": "IVF_FLAT",
"metric_type": "L2",
"params": {"nlist": 128},
}
hello_milvus.create_index("embeddings", index)
from datetime import datetime
print(fmt.format("Start loading"))
hello_milvus.load()
print(fmt.format("Start inserting entities"))
for j in range(0, 500):
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(f"Insert batch {j} at {current_time}")
entities = [
[str("aaaaaaaaaaaaabbbaaaaaaacccccddddeeeeeeeeeeeeeeffffffff" + str(i + j * num_entities + 10000000)) for i in range(num_entities)],
[str(i) for i in range(num_entities)],
rng.random(num_entities).tolist(),
rng.random((num_entities, dim)).tolist(),
]
hello_milvus.insert(entities)
for j in range(0, 5000) :
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# 打印信息,包括当前时间和删除对象的信息
print(f"Delete {j} at {current_time}")
id_list = [str("aaaaaaaaaaaaabbbaaaaaaacccccddddeeeeeeeeeeeeeeffffffff" + str(i + j * 1000 + + 10000000)) for i in range(1000)]
quoted_ids = ['"{}"'.format(id) for id in id_list]
expr = "pk in [" + ", ".join(quoted_ids) + "]"
hello_milvus.delete(expr)
# hello_milvus.flush()
entities = [
[str("aaaaaaaaaaaaabbbaaaaaaacccccddddeeeeeeeeeeeeeeffffffff" + str(i + j * 1000 + + 10000000)) for i in range(1000)],
[str(i + 100) for i in range(1000)],
rng.random(1000).tolist(),
rng.random((1000, dim)).tolist(),
]
hello_milvus.insert(entities)
for j in range(0, 10000) :
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
entities = [
[str("aaaaaaaaaaaaabbbaaaaaaacccccddddeeeeeeeeeeeeeeffffffff" + str(i)) for i in range(1000)],
[str(i + 100) for i in range(1000)],
rng.random(1000).tolist(),
rng.random((1000, dim)).tolist(),
]
hello_milvus.insert(entities)
# 打印信息,包括当前时间和删除对象的信息
print(f"Delete {j} at {current_time}")
id_list = [str("aaaaaaaaaaaaabbbaaaaaaacccccddddeeeeeeeeeeeeeeffffffff" + str(i)) for i in range(1000)]
quoted_ids = ['"{}"'.format(id) for id in id_list]
expr = "pk in [" + ", ".join(quoted_ids) + "]"
hello_milvus.delete(expr)
Milvus Log
No response
Anything else?
No response