
[Question]: Attempt to Export and Import Knowledge Base Failed, Seeking Guidance #5609

Open
@reeingal

Describe your problem

I want to build a knowledge base from 5,000 books. Since parsing the documents takes a significant amount of time, I plan to deploy RAGFlow on five servers, with each server parsing 1,000 books. Then, I intend to export the knowledge bases from the other four RAGFlow instances and merge them into the last server. However, I discovered that RAGFlow currently does not support exporting and importing knowledge bases. I tried to write scripts for exporting and importing the knowledge base myself, but due to my limited understanding of RAG technology, my tests failed. I’d like to document my attempts here in the hope of receiving guidance from the community.

My document engine uses Infinity. After reviewing the interface code, I found that the RemoteTable class in Infinity’s Python API provides export_data() and import_data() methods. I thought I could use these methods to export data from Infinity and then import the exported data from the four servers into the last server. The knowledge base export script I wrote is as follows:

# export_script.py
import os
import shutil
import json
from rag.utils.infinity_conn import InfinityConnection
from rag import settings
from rag.nlp import search
from api.db.services.knowledgebase_service import KnowledgebaseService
from api.db.db_models import Knowledgebase, Document, File, File2Document

if __name__ == "__main__":
    EXPORT_DIR = "export_data"
    db_name = settings.INFINITY.get("db_name", "default_db")

    # The knowledge base ID to export, using `cf8271bcf83111ef825462a4a8761c31` as an example
    kb_id = "cf8271bcf83111ef825462a4a8761c31"
    # Initialize the export directory
    export_path = f"{EXPORT_DIR}/kb_{kb_id}"
    if os.path.exists(export_path):
        shutil.rmtree(export_path)
    os.makedirs(export_path)

    # Get the knowledge base model object
    e, kb = KnowledgebaseService.get_by_id(kb_id)
    # Export the knowledge base record information to a JSON file
    kb_info = kb.to_json()
    kb_info["create_date"] = kb_info["create_date"].isoformat()
    kb_info["update_date"] = kb_info["update_date"].isoformat()
    with open(f"{export_path}/kb.json", "w", encoding="utf-8") as fp:
        json.dump(
            kb_info,
            fp,
            indent=4,
            ensure_ascii=False,
        )

    # Export the document information in the knowledge base to a JSON file
    res = Document.select().where(
        (Document.kb_id==kb.id)
    )
    docs = [doc for doc in res.dicts()]
    doc_ids = []
    for doc in docs:
        # print(doc)
        doc["create_date"] = doc["create_date"].isoformat()
        doc["update_date"] = doc["update_date"].isoformat()
        doc["process_begin_at"] = doc["process_begin_at"].isoformat()
        doc_ids.append(doc["id"])
    with open(f"{export_path}/document.json", "w", encoding="utf-8") as fp:
        json.dump(
            docs,
            fp,
            indent=4,
            ensure_ascii=False,
        )

    # Export the file-to-document relationship to a JSON file
    res = File2Document.select().where(
        (File2Document.document_id.in_(doc_ids))
    )
    file2document = [obj for obj in res.dicts()]
    file_ids = []
    for obj in file2document:
        obj["create_date"] = obj["create_date"].isoformat()
        obj["update_date"] = obj["update_date"].isoformat()
        file_ids.append(obj["file_id"])
    with open(f"{export_path}/file2document.json", "w", encoding="utf-8") as fp:
        json.dump(
            file2document,
            fp,
            indent=4,
            ensure_ascii=False,
        )

    # Export the file information to a JSON file
    res = File.select().where(
        (File.id.in_(file_ids))
    )
    files = [obj for obj in res.dicts()]
    for obj in files:
        obj["create_date"] = obj["create_date"].isoformat()
        obj["update_date"] = obj["update_date"].isoformat()
    with open(f"{export_path}/file.json", "w", encoding="utf-8") as fp:
        json.dump(
            files,
            fp,
            indent=4,
            ensure_ascii=False,
        )

    # Export the table corresponding to the knowledge base in the Infinity database to a JSONL file
    index_name = search.index_name(kb.tenant_id)
    docStoreConn = InfinityConnection()
    inf_conn = docStoreConn.connPool.get_conn()
    inf_db = inf_conn.get_database(db_name)
    table_name = f"{index_name}_{kb_id}"
    inf_tb = inf_db.get_table(table_name)
    columns = inf_tb.show_columns()
    res = inf_tb.export_data(
        file_path=f"{export_path}/infinity.jsonl",
        export_options={
            "file_type": "jsonl",
        },
        columns=columns["name"].to_list()
    )

The script exports the knowledge base record, the document records, the file-to-document relationships, and the file records from the MySQL database into kb.json, document.json, file2document.json, and file.json, respectively. It also exports the corresponding table from the Infinity database into an infinity.jsonl file.
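
To move each export to the merge server, I bundle the export directory into a single archive (a small helper of my own; tarfile is just what I happened to use, nothing RAGFlow-specific). The map.json that the import script below expects is not produced by the export script; I add it by hand on the merge server with the destination kb_id and tenant_id.

# package_export.py (my own helper, not part of RAGFlow)
import tarfile

kb_id = "cf8271bcf83111ef825462a4a8761c31"  # same example knowledge base ID as above
export_path = f"export_data/kb_{kb_id}"

# Produces kb_<id>.tar.gz containing kb.json, document.json, file2document.json,
# file.json, and infinity.jsonl; map.json is added later on the merge server.
with tarfile.open(f"kb_{kb_id}.tar.gz", "w:gz") as tar:
    tar.add(export_path, arcname=f"kb_{kb_id}")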

After copying the export directories from the four servers to the last server, I executed the following import script:

# import_script.py

import os
import shutil
import json
from datetime import datetime
from rag.utils.infinity_conn import InfinityConnection
from rag import settings
from rag.nlp import search
from api.db.services.knowledgebase_service import KnowledgebaseService
from api.db.db_models import Knowledgebase, Document, File, File2Document

if __name__ == "__main__":
    IMPORT_DIR = "import_data/kb_cf8271bcf83111ef825462a4a8761c31"
    db_name = settings.INFINITY.get("db_name", "default_db")
    """
    The map.json file format is as follows:
    ```json
    {
		"kb_id": "ce4af17cf8b111ef81666e9432d37477",
		"tenant_id": "fa1cb7c0f82c11efbc9c62a4a8761c31"
	}
    ```
    * `kb_id` - The ID of the knowledge base to import into
    * `tenant_id` - The ID of the owner of the knowledge base to import into
	"""
    with open(f"{IMPORT_DIR}/map.json", "rb") as fp:
        import_map = json.load(fp)
    
    dst_kb_id = import_map["kb_id"]
    dst_tenant_id = import_map["tenant_id"]

    # Import the kb.json record into the knowledgebase table
    with open(f"{IMPORT_DIR}/kb.json", "rb") as fp:
        src_kb_info = json.load(fp)
        src_kb_id = src_kb_info["id"]

    if src_kb_id == dst_kb_id:
        kb_id = src_kb_id
        kb = Knowledgebase(
            id=kb_id,
            create_time=src_kb_info["create_time"],
            create_date=datetime.fromisoformat(
                src_kb_info["create_date"]
            ),
            update_time=src_kb_info["update_time"],
            update_date=datetime.fromisoformat(
                src_kb_info["update_date"]
            ),
            avatar=src_kb_info["avatar"],
            tenant_id=dst_tenant_id,
            name=src_kb_info["name"],
            language=src_kb_info["language"],
            description=src_kb_info["description"],
            embd_id=src_kb_info["embd_id"],
            permission=src_kb_info["permission"],
            created_by=dst_tenant_id,
            doc_num=src_kb_info["doc_num"],
            token_num=src_kb_info["token_num"],
            chunk_num=src_kb_info["chunk_num"],
            similarity_threshold=src_kb_info["similarity_threshold"],
            vector_similarity_weight=src_kb_info["vector_similarity_weight"],
            parser_id=src_kb_info["parser_id"],
            parser_config=json.dumps(
                src_kb_info["parser_config"],
                ensure_ascii=False,
            ),
            pagerank=src_kb_info["pagerank"],
            status=src_kb_info["status"],
        )
        kb.save(force_insert=True)
    else:
        kb: Knowledgebase = Knowledgebase.get_by_id(dst_kb_id)
        kb.doc_num += src_kb_info["doc_num"]
        kb.token_num += src_kb_info["token_num"]
        kb.chunk_num += src_kb_info["chunk_num"]
        kb.save()

    # Import the document.json records into the document table
    with open(f"{IMPORT_DIR}/document.json","rb") as fp:
        docs = json.load(fp)
    for doc in docs:
        document = Document(
            id=doc["id"],
            create_time=doc["create_time"],
            create_date=datetime.fromisoformat(doc["create_date"]),
            update_time=doc["update_time"],
            update_date=datetime.fromisoformat(doc["update_date"]),
            thumbnail=doc["thumbnail"],
            kb_id=kb.id,
            parser_id=doc["parser_id"],
            parser_config=json.dumps(doc["parser_config"]),
            source_type=doc["source_type"],
            type=doc["type"],
            created_by=dst_tenant_id,
            name=doc["name"],
            location=doc["location"],
            size=doc["size"],
            token_num=doc["token_num"],
            chunk_num=doc["chunk_num"],
            progress=doc["progress"],
            progress_msg=doc["progress_msg"],
            process_begin_at=datetime.fromisoformat(doc["process_begin_at"]),
            process_duation=doc["process_duation"],
            meta_fields=json.dumps(doc["meta_fields"]),
            run=doc["run"],
            status=doc["status"],
        )
        document.save(force_insert=True)

    # Import the file.json records into the file table
    with open(f"{IMPORT_DIR}/file.json", "rb") as fp:
        files = json.load(fp)

    # Use the destination tenant's root folder ("/") as the parent for the imported files
    root_file = File.get(
        (File.name == "/") & (File.tenant_id == dst_tenant_id)
    )
    for info in files:
        obj = File(
            id=info["id"],
            create_time=info["create_time"],
            create_date=datetime.fromisoformat(info["create_date"]),
            update_time=info["update_time"],
            update_date=datetime.fromisoformat(info["update_date"]),
            parent_id=root_file.id,
            tenant_id=dst_tenant_id,
            created_by=dst_tenant_id,
            name=info["name"],
            location=info["location"],
            size=info["size"],
            type=info["type"],
            source_type=info["source_type"],
        )
        obj.save(force_insert=True)

    # Import the file2document.json records into the file2document table
    with open(f"{IMPORT_DIR}/file2document.json", "rb") as fp:
        file2docs = json.load(fp)

    for info in file2docs:
        obj = File2Document(
            id=info["id"],
            create_time=info["create_time"],
            create_date=datetime.fromisoformat(info["create_date"]),
            update_time=info["update_time"],
            update_date=datetime.fromisoformat(info["update_date"]),
            file_id=info["file_id"],
            document_id=info["document_id"],
        )
        obj.save(force_insert=True)

    # Import the infinity.jsonl records into the Infinity database
    # Rewrite the `kb_id` field value in infinity.jsonl to the target knowledge base ID before importing
    if src_kb_id != kb.id:
        temp_path = f"{IMPORT_DIR}/infinity.jsonl.tmp"
        # Note: this assumes the exported JSONL writes the field as "kb_id":"<id>" with no space after the colon
        target_str = f"\"kb_id\":\"{src_kb_id}\""
        replace_str = f"\"kb_id\":\"{kb.id}\""
        with open(f"{IMPORT_DIR}/infinity.jsonl", "r", encoding="utf-8") as fin, \
            open(temp_path, "w", encoding="utf-8") as fout:
            for line in fin:
                fout.write(line.replace(target_str, replace_str))
    else:
        temp_path = f"{IMPORT_DIR}/infinity.jsonl"

    index_name = search.index_name(kb.tenant_id)
    docStoreConn = InfinityConnection()
    inf_conn = docStoreConn.connPool.get_conn()
    inf_db = inf_conn.get_database(db_name)
    table_name = f"{index_name}_{kb.id}"
    inf_tb = inf_db.get_table(table_name)
    inf_tb.import_data(
        file_path=temp_path,
        import_options={
            "file_type": "jsonl"
        }
    )

The script imports the records from kb.json, document.json, file.json, and file2document.json into the knowledgebase, document, file, and file2document tables in the MySQL database, respectively. After replacing the kb_id field value in the infinity.jsonl file with the target knowledge base ID, it imports the records from infinity.jsonl into the corresponding table in the Infinity database.
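
After the import, I run a rough sanity check on the MySQL side before opening the web UI (my own helper; it only covers the relational tables, not the data in Infinity):

# verify_import.py (my own helper, not part of RAGFlow)
from peewee import fn
from api.db.db_models import Knowledgebase, Document

# Destination knowledge base ID, the same value as kb_id in map.json
kb_id = "ce4af17cf8b111ef81666e9432d37477"

kb = Knowledgebase.get_by_id(kb_id)
doc_count = Document.select().where(Document.kb_id == kb_id).count()
chunk_sum = (
    Document.select(fn.SUM(Document.chunk_num))
    .where(Document.kb_id == kb_id)
    .scalar()
) or 0

# The counters on the knowledgebase row should match the documents actually present
print(f"kb.doc_num={kb.doc_num}, documents in table={doc_count}")
print(f"kb.chunk_num={kb.chunk_num}, sum of document chunk_num={chunk_sum}")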

However, after executing my import script, when I access the knowledge base in the RAGFlow web interface, the Infinity container crashes and restarts, and I cannot view the document information or chunk information.
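
One thing I have not ruled out is a schema mismatch between the exported rows and the destination table (for example, a different embedding model producing a different vector column). A rough check I plan to run on the merge server, reusing the same connection code as in the scripts above (the directory and IDs are placeholders):

# check_schema.py (my own diagnostic sketch, not part of RAGFlow)
import json
from rag.utils.infinity_conn import InfinityConnection
from rag import settings
from rag.nlp import search

IMPORT_DIR = "import_data/kb_cf8271bcf83111ef825462a4a8761c31"
dst_kb_id = "ce4af17cf8b111ef81666e9432d37477"
dst_tenant_id = "fa1cb7c0f82c11efbc9c62a4a8761c31"

db_name = settings.INFINITY.get("db_name", "default_db")
inf_conn = InfinityConnection().connPool.get_conn()
inf_db = inf_conn.get_database(db_name)
table_name = f"{search.index_name(dst_tenant_id)}_{dst_kb_id}"
inf_tb = inf_db.get_table(table_name)

# Columns defined on the destination table
table_columns = set(inf_tb.show_columns()["name"].to_list())

# Keys present in the first exported row
with open(f"{IMPORT_DIR}/infinity.jsonl", "r", encoding="utf-8") as fp:
    first_row = json.loads(fp.readline())
row_keys = set(first_row.keys())

print("table columns missing from the export:", table_columns - row_keys)
print("exported keys unknown to the table:", row_keys - table_columns)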

I suspect the reason for the failure is that I don’t understand the business logic of RAGFlow and the Infinity database, and directly exporting and importing data in this way led to data inconsistencies. For my current needs, is there a feasible method or approach? I hope to receive some guidance from the community.
