Description
Describe your problem
I want to build a knowledge base from 5,000 books. Since parsing the documents takes a significant amount of time, I plan to deploy RAGFlow on five servers, with each server parsing 1,000 books. Then, I intend to export the knowledge bases from the other four RAGFlow instances and merge them into the last server. However, I discovered that RAGFlow currently does not support exporting and importing knowledge bases. I tried to write scripts for exporting and importing the knowledge base myself, but due to my limited understanding of RAG technology, my tests failed. I’d like to document my attempts here in the hope of receiving guidance from the community.
My document engine uses Infinity. After reviewing the interface code, I found that the `RemoteTable` class in Infinity's Python API provides `export_data()` and `import_data()` methods. I thought I could use these methods to export the data from Infinity on each server and then import the exported data from the four servers into the last server. The knowledge base export script I wrote is as follows:
```python
# export_script.py
import os
import shutil
import json

from rag.utils.infinity_conn import InfinityConnection
from rag import settings
from rag.nlp import search
from api.db.services.knowledgebase_service import KnowledgebaseService
from api.db.db_models import Knowledgebase, Document, File, File2Document

if __name__ == "__main__":
    EXPORT_DIR = "export_data"
    db_name = settings.INFINITY.get("db_name", "default_db")

    # The knowledge base ID to export, using `cf8271bcf83111ef825462a4a8761c31` as an example
    kb_id = "cf8271bcf83111ef825462a4a8761c31"

    # Initialize the export directory
    export_path = f"{EXPORT_DIR}/kb_{kb_id}"
    if os.path.exists(export_path):
        shutil.rmtree(export_path)
    os.makedirs(export_path)

    # Get the knowledge base model object
    e, kb = KnowledgebaseService.get_by_id(kb_id)

    # Export the knowledge base record to a JSON file
    kb_info = kb.to_json()
    kb_info["create_date"] = kb_info["create_date"].isoformat()
    kb_info["update_date"] = kb_info["update_date"].isoformat()
    with open(f"{export_path}/kb.json", "w", encoding="utf-8") as fp:
        json.dump(
            kb_info,
            fp,
            indent=4,
            ensure_ascii=False,
        )

    # Export the document records of the knowledge base to a JSON file
    res = Document.select().where(
        (Document.kb_id == kb.id)
    )
    docs = [doc for doc in res.dicts()]
    doc_ids = []
    for doc in docs:
        doc["create_date"] = doc["create_date"].isoformat()
        doc["update_date"] = doc["update_date"].isoformat()
        doc["process_begin_at"] = doc["process_begin_at"].isoformat()
        doc_ids.append(doc["id"])
    with open(f"{export_path}/document.json", "w", encoding="utf-8") as fp:
        json.dump(
            docs,
            fp,
            indent=4,
            ensure_ascii=False,
        )

    # Export the file-to-document relationships to a JSON file
    res = File2Document.select().where(
        (File2Document.document_id.in_(doc_ids))
    )
    file2document = [obj for obj in res.dicts()]
    file_ids = []
    for obj in file2document:
        obj["create_date"] = obj["create_date"].isoformat()
        obj["update_date"] = obj["update_date"].isoformat()
        file_ids.append(obj["file_id"])
    with open(f"{export_path}/file2document.json", "w", encoding="utf-8") as fp:
        json.dump(
            file2document,
            fp,
            indent=4,
            ensure_ascii=False,
        )

    # Export the file records to a JSON file
    res = File.select().where(
        (File.id.in_(file_ids))
    )
    files = [obj for obj in res.dicts()]
    for obj in files:
        obj["create_date"] = obj["create_date"].isoformat()
        obj["update_date"] = obj["update_date"].isoformat()
    with open(f"{export_path}/file.json", "w", encoding="utf-8") as fp:
        json.dump(
            files,
            fp,
            indent=4,
            ensure_ascii=False,
        )

    # Export the knowledge base's table in the Infinity database to a JSONL file
    index_name = search.index_name(kb.tenant_id)
    docStoreConn = InfinityConnection()
    inf_conn = docStoreConn.connPool.get_conn()
    inf_db = inf_conn.get_database(db_name)
    table_name = f"{index_name}_{kb_id}"
    inf_tb = inf_db.get_table(table_name)
    columns = inf_tb.show_columns()
    res = inf_tb.export_data(
        file_path=f"{export_path}/infinity.jsonl",
        export_options={
            "file_type": "jsonl",
        },
        columns=columns["name"].to_list(),
    )
```
The script exports the knowledge base, document, file-to-document, and file records from the MySQL database into `kb.json`, `document.json`, `file2document.json`, and `file.json`, respectively. It also exports the corresponding table from the Infinity database into an `infinity.jsonl` file.
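For reference, each source server ends up with an export directory laid out like this (names taken from the script above):

```
export_data/
└── kb_cf8271bcf83111ef825462a4a8761c31/
    ├── kb.json             # knowledgebase record
    ├── document.json       # document records
    ├── file2document.json  # file-to-document relationships
    ├── file.json           # file records
    └── infinity.jsonl      # rows exported from the Infinity table
```

The `map.json` file read by the import script below is not produced by the export; I create it by hand in each copied directory before importing.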
After copying the export directories from the four servers to the last server, I executed the following import script:
````python
# import_script.py
import os
import shutil
import json
from datetime import datetime

from rag.utils.infinity_conn import InfinityConnection
from rag import settings
from rag.nlp import search
from api.db.services.knowledgebase_service import KnowledgebaseService
from api.db.db_models import Knowledgebase, Document, File, File2Document

if __name__ == "__main__":
    IMPORT_DIR = "import_data/kb_cf8271bcf83111ef825462a4a8761c31"
    db_name = settings.INFINITY.get("db_name", "default_db")

    """
    The map.json file format is as follows:
    ```json
    {
        "kb_id": "ce4af17cf8b111ef81666e9432d37477",
        "tenant_id": "fa1cb7c0f82c11efbc9c62a4a8761c31"
    }
    ```
    * `kb_id` - The ID of the knowledge base to import into
    * `tenant_id` - The ID of the owner of the knowledge base to import into
    """
    with open(f"{IMPORT_DIR}/map.json", "rb") as fp:
        import_map = json.load(fp)
    dst_kb_id = import_map["kb_id"]
    dst_tenant_id = import_map["tenant_id"]

    # Import the kb.json record into the knowledgebase table
    with open(f"{IMPORT_DIR}/kb.json", "rb") as fp:
        src_kb_info = json.load(fp)
    src_kb_id = src_kb_info["id"]
    if src_kb_id == dst_kb_id:
        kb_id = src_kb_id
        kb = Knowledgebase(
            id=kb_id,
            create_time=src_kb_info["create_time"],
            create_date=datetime.fromisoformat(src_kb_info["create_date"]),
            update_time=src_kb_info["update_time"],
            update_date=datetime.fromisoformat(src_kb_info["update_date"]),
            avatar=src_kb_info["avatar"],
            tenant_id=dst_tenant_id,
            name=src_kb_info["name"],
            language=src_kb_info["language"],
            description=src_kb_info["description"],
            embd_id=src_kb_info["embd_id"],
            permission=src_kb_info["permission"],
            created_by=dst_tenant_id,
            doc_num=src_kb_info["doc_num"],
            token_num=src_kb_info["token_num"],
            chunk_num=src_kb_info["chunk_num"],
            similarity_threshold=src_kb_info["similarity_threshold"],
            vector_similarity_weight=src_kb_info["vector_similarity_weight"],
            parser_id=src_kb_info["parser_id"],
            parser_config=json.dumps(
                src_kb_info["parser_config"],
                ensure_ascii=False,
            ),
            pagerank=src_kb_info["pagerank"],
            status=src_kb_info["status"],
        )
        kb.save(force_insert=True)
    else:
        kb: Knowledgebase = Knowledgebase.get_by_id(dst_kb_id)
        kb.doc_num += src_kb_info["doc_num"]
        kb.token_num += src_kb_info["token_num"]
        kb.chunk_num += src_kb_info["chunk_num"]
        kb.save()

    # Import the document.json records into the document table
    with open(f"{IMPORT_DIR}/document.json", "rb") as fp:
        docs = json.load(fp)
    for doc in docs:
        document = Document(
            id=doc["id"],
            create_time=doc["create_time"],
            create_date=datetime.fromisoformat(doc["create_date"]),
            update_time=doc["update_time"],
            update_date=datetime.fromisoformat(doc["update_date"]),
            thumbnail=doc["thumbnail"],
            kb_id=kb.id,
            parser_id=doc["parser_id"],
            parser_config=json.dumps(doc["parser_config"]),
            source_type=doc["source_type"],
            type=doc["type"],
            created_by=dst_tenant_id,
            name=doc["name"],
            location=doc["location"],
            size=doc["size"],
            token_num=doc["token_num"],
            chunk_num=doc["chunk_num"],
            progress=doc["progress"],
            progress_msg=doc["progress_msg"],
            process_begin_at=datetime.fromisoformat(doc["process_begin_at"]),
            process_duation=doc["process_duation"],
            meta_fields=json.dumps(doc["meta_fields"]),
            run=doc["run"],
            status=doc["status"],
        )
        document.save(force_insert=True)

    # Import the file.json records into the file table
    with open(f"{IMPORT_DIR}/file.json", "rb") as fp:
        files = json.load(fp)
    root_file = File.get(name="/")
    for info in files:
        obj = File(
            id=info["id"],
            create_time=info["create_time"],
            create_date=datetime.fromisoformat(info["create_date"]),
            update_time=info["update_time"],
            update_date=datetime.fromisoformat(info["update_date"]),
            parent_id=root_file.id,
            tenant_id=dst_tenant_id,
            created_by=dst_tenant_id,
            name=info["name"],
            location=info["location"],
            size=info["size"],
            type=info["type"],
            source_type=info["source_type"],
        )
        obj.save(force_insert=True)

    # Import the file2document.json records into the file2document table
    with open(f"{IMPORT_DIR}/file2document.json", "rb") as fp:
        file2docs = json.load(fp)
    for info in file2docs:
        obj = File2Document(
            id=info["id"],
            create_time=info["create_time"],
            create_date=datetime.fromisoformat(info["create_date"]),
            update_time=info["update_time"],
            update_date=datetime.fromisoformat(info["update_date"]),
            file_id=info["file_id"],
            document_id=info["document_id"],
        )
        obj.save(force_insert=True)

    # Import the infinity.jsonl records into the Infinity database.
    # If the knowledge base ID changes, rewrite the `kb_id` field values in
    # infinity.jsonl to the target knowledge base ID before importing.
    if src_kb_id != kb.id:
        temp_path = f"{IMPORT_DIR}/infinity.jsonl.tmp"
        target_str = f"\"kb_id\":\"{src_kb_id}\""
        replace_str = f"\"kb_id\":\"{kb.id}\""
        with open(f"{IMPORT_DIR}/infinity.jsonl", "r", encoding="utf-8") as fin, \
                open(temp_path, "w", encoding="utf-8") as fout:
            for line in fin:
                fout.write(line.replace(target_str, replace_str))
    else:
        temp_path = f"{IMPORT_DIR}/infinity.jsonl"

    index_name = search.index_name(kb.tenant_id)
    docStoreConn = InfinityConnection()
    inf_conn = docStoreConn.connPool.get_conn()
    inf_db = inf_conn.get_database(db_name)
    table_name = f"{index_name}_{kb.id}"
    inf_tb = inf_db.get_table(table_name)
    inf_tb.import_data(
        file_path=temp_path,
        import_options={
            "file_type": "jsonl",
        },
    )
````
The script imports the records from `kb.json`, `document.json`, `file.json`, and `file2document.json` into the knowledgebase, document, file, and file2document tables of the MySQL database, respectively. It then replaces the `kb_id` field values in `infinity.jsonl` with the target knowledge base ID (when the IDs differ) and imports the records from `infinity.jsonl` into the corresponding table in the Infinity database.
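For reference, the export can be roughly cross-checked against the MySQL bookkeeping before importing. The sketch below is a minimal check I put together (`check_export.py` is a hypothetical helper name, and it assumes that each line of `infinity.jsonl` corresponds to one chunk row, which I have not confirmed); it compares the JSONL line count with the `chunk_num` counters on a source server:

```python
# check_export.py -- minimal consistency sketch, run on a source server.
# Assumption (unconfirmed): each line of infinity.jsonl is one chunk row,
# so its line count should roughly match the chunk_num counters in MySQL.
from peewee import fn
from api.db.db_models import Knowledgebase, Document

kb_id = "cf8271bcf83111ef825462a4a8761c31"  # same example ID as above
export_path = f"export_data/kb_{kb_id}"

# Count the rows exported from the Infinity table.
with open(f"{export_path}/infinity.jsonl", "r", encoding="utf-8") as fp:
    jsonl_rows = sum(1 for line in fp if line.strip())

# Compare against the MySQL-side counters.
kb = Knowledgebase.get_by_id(kb_id)
doc_chunks = (Document
              .select(fn.SUM(Document.chunk_num))
              .where(Document.kb_id == kb_id)
              .scalar() or 0)

print(f"rows in infinity.jsonl  : {jsonl_rows}")
print(f"kb.chunk_num            : {kb.chunk_num}")
print(f"sum(document.chunk_num) : {doc_chunks}")
```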
However, after executing the import script, the Infinity container crashes and restarts whenever I access the knowledge base in the RAGFlow web interface, and I cannot view any document or chunk information.
I suspect the failure stems from my limited understanding of RAGFlow's business logic and of the Infinity database: exporting and importing the raw data this way probably left the data in an inconsistent state. Is there a feasible method or approach for my use case? I hope to receive some guidance from the community.