Building a Corporate Information Analysis Service with an Agentic Workflow

This repository describes an agentic workflow that analyzes corporate information provided as PDFs and produces a report in the desired format. To analyze data from a specific document among many kinds of PDFs, you can 1/ add the relevant information to the document's metadata, or 2/ use contextual embedding to enrich each chunk with additional information. Here we use both: information such as the company name and the document's creation date is extracted into the metadata, and a prompt is used to summarize the content of each chunk, so that enough useful information can be pulled out of the document. The report is generated with the plan and execute pattern, which writes a table of contents and a draft, and reflection is used to provide sufficient context. Reflection extracts the main keywords from each paragraph and uses the documents retrieved through RAG. Because documents can contain images and tables as well as text, a multimodal model is used to extract enough information from figures and tables. Hierarchical chunking is used to raise RAG retrieval accuracy while still providing sufficient context. To improve readability for users, the generated report uses markdown format; the markdown document is embedded in an HTML page and a URL is created for external sharing. The report can also be converted to a format such as PDF and managed as a separate document.

The infrastructure for corporate information summarization is designed following an AWS serverless architecture pattern. This architecture responds well to changing traffic and optimizes cost, and it can be deployed conveniently with the AWS CDK.

Implementing RAG with OpenSearch

LangChain's OpenSearchVectorSearch is used to connect to Amazon OpenSearch, the knowledge store. Related documents are then retrieved using hierarchical chunking.

def get_answer_using_opensearch(chat, text, connectionId, requestId):    
    global reference_docs
    
    msg = ""
    top_k = 4
    relevant_docs = []
    
    bedrock_embedding = get_embedding()
       
    vectorstore_opensearch = OpenSearchVectorSearch(
        index_name = index_name,
        is_aoss = False,
        ef_search = 1024, # 512(default)
        m=48,
        #engine="faiss",  # default: nmslib
        embedding_function = bedrock_embedding,
        opensearch_url=opensearch_url,
        http_auth=(opensearch_account, opensearch_passwd), # http_auth=awsauth,
    )  
    
    if enalbeParentDocumentRetrival == 'true': # parent/child chunking
        relevant_documents = get_documents_from_opensearch(vectorstore_opensearch, text, top_k)
                        
        for i, document in enumerate(relevant_documents):
            parent_doc_id = document[0].metadata['parent_doc_id']
            doc_level = document[0].metadata['doc_level']
            
            excerpt, name, url = get_parent_content(parent_doc_id) # use parent document
            
            relevant_docs.append(
                Document(
                    page_content=excerpt,
                    metadata={
                        'name': name,
                        'url': url,
                        'doc_level': doc_level,
                        'from': 'vector'
                    },
                )
            )
    else: 
        relevant_documents = vectorstore_opensearch.similarity_search_with_score(
            query = text,
            k = top_k,
        )
        
        for i, document in enumerate(relevant_documents):
            name = document[0].metadata['name']
            url = document[0].metadata['url']
            content = document[0].page_content
                   
            relevant_docs.append(
                Document(
                    page_content=content,
                    metadata={
                        'name': name,
                        'url': url,
                        'from': 'vector'
                    },
                )
            )

    filtered_docs = grade_documents(text, relevant_docs) # grading
    
    filtered_docs = check_duplication(filtered_docs) # check duplication
            
    relevant_context = ""
    for i, document in enumerate(filtered_docs):
        if document.page_content:
            relevant_context = relevant_context + document.page_content + "\n\n"
        
    msg = query_using_RAG_context(connectionId, requestId, chat, relevant_context, text)
    
    reference_docs += filtered_docs
           
    return msg
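
get_documents_from_opensearch and get_parent_content above are defined elsewhere in the repository. A minimal sketch of what they might look like, assuming that the child chunks are searched first and only the best-scoring child per parent is kept, and that the parent chunk is then fetched by its document id through an opensearch-py client named os_client (both names and the field layout are assumptions):

def get_documents_from_opensearch(vectorstore_opensearch, text, top_k):
    # Search the child chunks, then keep only the highest-scoring child per parent
    # so that each parent document is used at most once.
    results = vectorstore_opensearch.similarity_search_with_score(
        query=text,
        k=top_k*2,
        search_type="script_scoring",
        pre_filter={"bool": {"filter": [{"term": {"metadata.doc_level": "child"}}]}}
    )

    relevant_documents = []
    used_parents = set()
    for doc, score in results:
        parent_doc_id = doc.metadata['parent_doc_id']
        if parent_doc_id not in used_parents:
            used_parents.add(parent_doc_id)
            relevant_documents.append((doc, score))
        if len(relevant_documents) >= top_k:
            break
    return relevant_documents

def get_parent_content(parent_doc_id):
    # Fetch the parent chunk by its OpenSearch document id and return its text and source info.
    response = os_client.get(index=index_name, id=parent_doc_id)
    source = response['_source']
    return source['text'], source['metadata']['name'], source['metadata']['url']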

The relevance of the retrieved documents is graded with an LLM as shown below. For details on grading document relevance, see the separate guide on RAG grading with an LLM.

def grade_documents(question, documents):
    print("###### grade_documents ######")
    
    filtered_docs = []
    if multi_region == 'enable':  # parallel processing
        print("start grading...")
        filtered_docs = grade_documents_using_parallel_processing(question, documents)

    else:
        # Score each doc    
        chat = get_chat()
        retrieval_grader = get_retrieval_grader(chat)
        for i, doc in enumerate(documents):            
            score = retrieval_grader.invoke({"question": question, "document": doc.page_content})
            
            grade = score.binary_score

            # Document relevant
            if grade.lower() == "yes":
                print("---GRADE: DOCUMENT RELEVANT---")
                filtered_docs.append(doc)
            # Document not relevant
            else:
                print("---GRADE: DOCUMENT NOT RELEVANT---")
                continue
    
    return filtered_docs
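
get_retrieval_grader is referenced above but not shown. A minimal sketch, assuming a structured-output chain that returns a Pydantic model with a binary_score field of "yes" or "no":

from pydantic import BaseModel, Field
from langchain_core.prompts import ChatPromptTemplate

class GradeDocuments(BaseModel):
    """Binary relevance score for a retrieved document."""
    binary_score: str = Field(description="The document is relevant to the question, 'yes' or 'no'")

def get_retrieval_grader(chat):
    system = (
        "You are a grader assessing the relevance of a retrieved document to a user question. "
        "If the document contains keywords or semantic meaning related to the question, grade it as relevant. "
        "Give a binary score 'yes' or 'no'."
    )
    grade_prompt = ChatPromptTemplate.from_messages([
        ("system", system),
        ("human", "Retrieved document:\n\n{document}\n\nUser question: {question}")
    ])
    return grade_prompt | chat.with_structured_output(GradeDocuments)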

Agentic RAG

The workflow that combines an agent with RAG is constructed as follows. The tools include not only basic capabilities such as time (get_current_time), books (get_book_list), and weather (get_weather_info), but also web search (search_by_tavily) and corporate information search (search_by_opensearch).

class State(TypedDict):
    messages: Annotated[list, add_messages]

tools = [get_current_time, get_book_list, get_weather_info, search_by_tavily, search_by_opensearch]
tool_node = ToolNode(tools)

def buildChatAgent():
    workflow = StateGraph(State)

    workflow.add_node("agent", call_model)
    workflow.add_node("action", tool_node)
    workflow.add_edge(START, "agent")
    workflow.add_conditional_edges(
        "agent",
        should_continue,
        {
            "continue": "action",
            "end": END,
        },
    )
    workflow.add_edge("action", "agent")

    return workflow.compile()
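
The should_continue router used by the conditional edge above is not shown here. A minimal sketch, assuming it simply checks whether the last model message requested any tool calls:

def should_continue(state: State) -> str:
    # Route to the tool node while the model keeps requesting tools; otherwise finish.
    last_message = state["messages"][-1]
    if last_message.tool_calls:
        return "continue"
    return "end"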

The call_model node sets the agent's name and role, and produces an appropriate answer using the previous conversation and the information obtained from the tools.

def call_model(state: State):
    print("###### call_model ######")
    
    if isKorean(state["messages"][0].content)==True:
        system = (
            "๋‹น์‹ ์˜ ์ด๋ฆ„์€ ์„œ์—ฐ์ด๊ณ , ์งˆ๋ฌธ์— ์นœ๊ทผํ•œ ๋ฐฉ์‹์œผ๋กœ ๋Œ€๋‹ตํ•˜๋„๋ก ์„ค๊ณ„๋œ ๋Œ€ํ™”ํ˜• AI์ž…๋‹ˆ๋‹ค."
            "์ƒํ™ฉ์— ๋งž๋Š” ๊ตฌ์ฒด์ ์ธ ์„ธ๋ถ€ ์ •๋ณด๋ฅผ ์ถฉ๋ถ„ํžˆ ์ œ๊ณตํ•ฉ๋‹ˆ๋‹ค."
            "๋ชจ๋ฅด๋Š” ์งˆ๋ฌธ์„ ๋ฐ›์œผ๋ฉด ์†”์งํžˆ ๋ชจ๋ฅธ๋‹ค๊ณ  ๋งํ•ฉ๋‹ˆ๋‹ค."
            "์ตœ์ข… ๋‹ต๋ณ€์—๋Š” ์กฐ์‚ฌํ•œ ๋‚ด์šฉ์„ ๋ฐ˜๋“œ์‹œ ํฌํ•จํ•ฉ๋‹ˆ๋‹ค."
        )
    else: 
        system = (            
            "You are a conversational AI designed to answer in a friendly way to a question."
            "If you don't know the answer, just say that you don't know, don't try to make up an answer."
            "You will be acting as a thoughtful advisor."    
        )
        
    prompt = ChatPromptTemplate.from_messages(
        [
            ("system", system),
            MessagesPlaceholder(variable_name="messages"),
        ]
    )
    chain = prompt | model
        
    response = chain.invoke(state["messages"])

    return {"messages": [response]}
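
For reference, the compiled graph could be invoked as in the sketch below (the question text is only an example):

from langchain_core.messages import HumanMessage

app = buildChatAgent()
result = app.invoke({"messages": [HumanMessage(content="What is the contact information for Suzano?")]})
print(result["messages"][-1].content)  # final answer after any tool calls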

Document Preprocessing

Metadata Extraction

In the table on a specific page, the "Subject company" and "Rating date" fields identify the document's subject and creation date.

The put event that occurs when a document is uploaded to Amazon S3 triggers reading the document, and the company and date are determined from the information on that specific page.

See the code below from lambda-document-manager / lambda_function.py. Here, subject_company and rating_date are extracted from the text using structured output.

def get_profile_of_doc(content: str):
    """Provide profile of document."""
    
    class Profile(BaseModel):
        subject_company: str = Field(description="The value of 'Subject company'")
        rating_date: str = Field(description="The value of 'Rating date'")
    
    subject_company = rating_date = ""
    for attempt in range(5):
        chat = get_chat()
        structured_llm = chat.with_structured_output(Profile, include_raw=True)
    
        info = structured_llm.invoke(content)
            
        if info['parsed'] is not None:
            parsed_info = info['parsed']
            subject_company = parsed_info.subject_company
            rating_date = parsed_info.rating_date                            
            break
    return subject_company, rating_date        
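
As a usage sketch (the page text below is hypothetical), the profile page's text is passed in and the two fields come back from the structured output:

# Hypothetical example of the text found on the profile page of a report.
page_text = "Subject company  Suzano SA\nRating date  3 January 2024"
subject_company, rating_date = get_profile_of_doc(page_text)
print(subject_company, rating_date)  # e.g. "Suzano SA", "3 January 2024"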

Removing Headers and Footers from Images

In lambda-document-manager / lambda_function.py, pdf_profile is consulted so that headers and footers are removed when extracting text from images. The header and footer positions should be adjusted to match the PDF.

pdf_profile = 'ocean'

def store_image_for_opensearch(key, page, subject_company, rating_date):
    image_obj = s3_client.get_object(Bucket=s3_bucket, Key=key)
                        
    image_content = image_obj['Body'].read()
    img = Image.open(BytesIO(image_content))
                        
    width, height = img.size 
    print(f"(original) width: {width}, height: {height}, size: {width*height}")
    
    pos = key.rfind('/')
    prefix = key[pos+1:pos+5]
    print('img_prefix: ', prefix)    
    if pdf_profile=='ocean' and prefix == "img_":
        area = (0, 175, width, height-175)
        img = img.crop(area)
            
        width, height = img.size 
        print(f"(croped) width: {width}, height: {height}, size: {width*height}")

Extracting Images from PDFs

When a file is uploaded to Amazon S3, it is processed with PyMuPDF as shown below. Text is chunked, while images and tables are saved as images and then interpreted with a multimodal model to extract their content as text.

def load_document(file_type, key):
    s3r = boto3.resource("s3")
    doc = s3r.Object(s3_bucket, key)
    
    files = []
    tables = []
    contents = ""
    subject_company = rating_date = ""
    if file_type == 'pdf':
        Byte_contents = doc.get()['Body'].read()

        texts = []
        nImages = []
        try: 
            # pdf reader            
            reader = PdfReader(BytesIO(Byte_contents))
            
            # extract text
            imgList = []
            for i, page in enumerate(reader.pages):
                
                if i==0 and pdf_profile == 'ocean': # profile page
                    print('skip the first page!')
                    continue
                    
                texts.append(page.extract_text())
                
                nImage = 0
                if '/Resources' in page:
                    print(f"Resources[{i}]: {page['/Resources']}")
                    if '/ProcSet' in page['/Resources']:
                        print(f"Resources/ProcSet[{i}]: {page['/Resources']['/ProcSet']}")
                    if '/XObject' in page['/Resources']:
                        print(f"Resources/XObject[{i}]: {page['/Resources']['/XObject']}")                        
                        for j, image in enumerate(page['/Resources']['/XObject']):
                            print(f"image[{j}]: {image}")                                 
                            if image in imgList:
                                print('Duplicated...')
                                continue    
                            else:
                                imgList.append(image)
                                                    
                            Im = page['/Resources']['/XObject'][image]
                            print(f"{image}[{j}]: {Im}")                            
                            nImage = nImage+1
                            
                print(f"# of images of page[{i}] = {nImage}")
                nImages.append(nImage)
                
                # extract metadata
                if pdf_profile == 'ocean' and i==1:
                    print("---> extract metadata from document")
                    pageText = page.extract_text()
                    print('pageText: ', pageText)
                    
                    subject_company, rating_date_ori = get_profile_of_doc(pageText)
                    print('subject_company: ', subject_company)
                    
                    from datetime import datetime
                    d = datetime.strptime(rating_date_ori, '%d %B %Y')
                    rating_date = str(d)[:10] 
                    print('rating_date: ', rating_date)

            contents = '\n'.join(texts)
                        
            pages = fitz.open(stream=Byte_contents, filetype='pdf')     

            # extract table data
            table_count = 0
            for i, page in enumerate(pages):
                page_tables = page.find_tables()
                
                if page_tables.tables:
                    print('page_tables.tables: ', len(page_tables.tables))

                    for tab in page_tables.tables:    
                        if tab.row_count>=2:
                            table_image = extract_table_image(page, i, table_count, tab.bbox, key, subject_company, rating_date)
                            table_count += 1
                        
                            tables.append({
                                "body": tab.to_markdown(),
                                "page": str(i),
                                "name": table_image
                            })                    
                            files.append(table_image)

            # extract page images
            if enablePageImageExraction=='true': 
                for i, page in enumerate(pages):
                    imgInfo = page.get_image_info()
                    width = height = 0
                    for j, info in enumerate(imgInfo):
                        bbox = info['bbox']
                        
                    print(f"nImages[{i}]: {nImages[i]}")  # number of XObjects
                    if nImages[i]>=4 or \
                        (nImages[i]>=1 and (width==0 and height==0)) or \
                        (nImages[i]>=1 and (width>=100 or height>=100)):
                        pixmap = page.get_pixmap(dpi=200)  # dpi=300
                        
                        # convert to png
                        img = Image.frombytes("RGB", [pixmap.width, pixmap.height], pixmap.samples)
                        pixels = BytesIO()
                        img.save(pixels, format='PNG')
                        pixels.seek(0, 0)
                                        
                        # get path from key
                        objectName = (key[key.find(s3_prefix)+len(s3_prefix)+1:len(key)])
                        folder = s3_prefix+'/captures/'+objectName+'/'
                                
                        fname = 'img_'+key.split('/')[-1].split('.')[0]+f"_{i}"
                        print('fname: ', fname)          

                        if pdf_profile == 'ocean':
                            img_meta = {
                                "ext": 'png',
                                "page": str(i),
                                "company": subject_company,
                                "date": rating_date
                            }
                        else: 
                            img_meta = {
                                "ext": 'png',
                                "page": str(i)
                            }
                        print('img_meta: ', img_meta)
                               
                        response = s3_client.put_object(
                            Bucket=s3_bucket,
                            Key=folder+fname+'.png',
                            ContentType='image/png',
                            Metadata = img_meta,
                            Body=pixels
                        )                                                        
                        files.append(folder+fname+'.png')
                                    
                contents = '\n'.join(texts)
                
            elif enableImageExtraction == 'true':
                image_files = extract_images_from_pdf(reader, key)
                for img in image_files:
                    files.append(img)
        
        except Exception:
                err_msg = traceback.format_exc()
                print('err_msg: ', err_msg)                     

If the PDF contains image files, they are stored in Amazon S3 using pypdf.

from pypdf import PdfReader   

reader = PdfReader(BytesIO(Byte_contents))
image_files = extract_images_from_pdf(reader, key)
for img in image_files:
    files.append(img)

def extract_images_from_pdf(reader, key):
    picture_count = 1
    
    extracted_image_files = []
    print('pages: ', len(reader.pages))
    for i, page in enumerate(reader.pages):        
        for image_file_object in page.images:
            img_name = image_file_object.name            
            if img_name in extracted_image_files:
                print('skip....')
                continue
            
            extracted_image_files.append(img_name)
            
            ext = img_name.split('.')[-1]            
            contentType = ""
            if ext == 'png':
                contentType = 'image/png'
            elif ext == 'jpg' or ext == 'jpeg':
                contentType = 'image/jpeg'
            elif ext == 'gif':
                contentType = 'image/gif'
            elif ext == 'bmp':
                contentType = 'image/bmp'
            elif ext == 'tiff' or ext == 'tif':
                contentType = 'image/tiff'
            elif ext == 'svg':
                contentType = 'image/svg+xml'
            elif ext == 'webp':
                contentType = 'image/webp'
            elif ext == 'ico':
                contentType = 'image/x-icon'
            elif ext == 'eps':
                contentType = 'image/eps'
            
            if contentType:                
                image_bytes = image_file_object.data

                pixels = BytesIO(image_bytes)
                pixels.seek(0, 0)
                            
                # get path from key
                objectName = (key[key.find(s3_prefix)+len(s3_prefix)+1:len(key)])
                folder = s3_prefix+'/files/'+objectName+'/'
                            
                img_key = folder+img_name
                
                response = s3_client.put_object(
                    Bucket=s3_bucket,
                    Key=img_key,
                    ContentType=contentType,
                    Body=pixels
                )                            
                picture_count += 1
                    
                extracted_image_files.append(img_key)

    return extracted_image_files

The text and tables extracted from the document are collected as Document objects and added to OpenSearch, the vector store.

def store_document_for_opensearch(file_type, key):
    contents, files, tables, subject_company, rating_date = load_document(file_type, key)
    
    if len(contents) == 0:
        print('no contents: ', key)
        return [], files
    
    print('length: ', len(contents))
    
    docs = []
    
    # text        
    docs.append(Document(
        page_content=contents,
        metadata={
            'name': key,
            'url': path+parse.quote(key),
            'subject_company': subject_company,
            'rating_date': rating_date
        }
    ))
        
    # table
    for table in tables:
        docs.append(Document(
            page_content=table['body'],
            metadata={
                'name': table['name'],
                'url': path+parse.quote(table['name']),
                'page': table['page'],
                'subject_company': subject_company,
                'rating_date': rating_date
            }
        ))  

    ids = add_to_opensearch(docs, key)
    
    return ids, files

When documents are added to OpenSearch, chunking is performed as shown below.

def add_to_opensearch(docs, key):    
    if len(docs) == 0:
        return []    
    
    objectName = (key[key.find(s3_prefix)+len(s3_prefix)+1:len(key)])
    print('objectName: ', objectName)    
    metadata_key = meta_prefix+objectName+'.metadata.json'
    print('meta file name: ', metadata_key)    
    delete_document_if_exist(metadata_key)
        
    ids = []
    if enalbeParentDocumentRetrival == 'true':
        parent_splitter = RecursiveCharacterTextSplitter(
            chunk_size=2000,
            chunk_overlap=100,
            separators=["\n\n", "\n", ".", " ", ""],
            length_function = len,
        )
        child_splitter = RecursiveCharacterTextSplitter(
            chunk_size=400,
            chunk_overlap=50,
            # separators=["\n\n", "\n", ".", " ", ""],
            length_function = len,
        )

        parent_docs = parent_splitter.split_documents(docs)
        print('len(parent_docs): ', len(parent_docs))
        
        print('parent chunk[0]: ', parent_docs[0].page_content)
        parent_docs = get_contexual_docs(docs[-1], parent_docs)
        print('parent contextual chunk[0]: ', parent_docs[0].page_content)
                
        if len(parent_docs):
            for i, doc in enumerate(parent_docs):
                doc.metadata["doc_level"] = "parent"
                    
            try:        
                parent_doc_ids = vectorstore.add_documents(parent_docs, bulk_size = 10000)
                print('parent_doc_ids: ', parent_doc_ids) 
                print('len(parent_doc_ids): ', len(parent_doc_ids))
                
                child_docs = []
                       
                for i, doc in enumerate(parent_docs):
                    _id = parent_doc_ids[i]
                    sub_docs = child_splitter.split_documents([doc])
                    for _doc in sub_docs:
                        _doc.metadata["parent_doc_id"] = _id
                        _doc.metadata["doc_level"] = "child"
                        
                    child_docs.extend(sub_docs)
                
                print('child chunk[0]: ', child_docs[0].page_content)
                child_docs = get_contexual_docs(docs[-1], child_docs)
                print('child contextual chunk[0]: ', child_docs[0].page_content)
                
                child_doc_ids = vectorstore.add_documents(child_docs, bulk_size = 10000)
                print('child_doc_ids: ', child_doc_ids) 
                print('len(child_doc_ids): ', len(child_doc_ids))
                    
                ids = parent_doc_ids+child_doc_ids
            except Exception:
                err_msg = traceback.format_exc()
                print('error message: ', err_msg)                
    else:
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=100,
            separators=["\n\n", "\n", ".", " ", ""],
            length_function = len,
        ) 
        
        documents = text_splitter.split_documents(docs)
        print('len(documents): ', len(documents))
            
        if len(documents):            
            if enableContexualRetrieval == 'true':                        
                print('chunk[0]: ', documents[0].page_content)             
                documents = get_contexual_docs(docs[-1], documents)
                print('contextual chunk[0]: ', documents[0].page_content)  
            else:
                print('documents[0]: ', documents[0])
            
        try:        
            ids = vectorstore.add_documents(documents, bulk_size = 10000)
            print('response of adding documents: ', ids)
        except Exception:
            err_msg = traceback.format_exc()
            print('error message: ', err_msg)
            #raise Exception ("Not able to add docs in opensearch")    
    
    print('len(ids): ', len(ids))
    return ids

Contextual Embedding

As with Contextual Retrieval, using contextual embedding to add a description to each chunk improves retrieval accuracy. BM25 (keyword) search can also be implemented through OpenSearch's hybrid search. For the full code, see lambda_function.py.

def get_contexual_docs(whole_doc, splitted_docs):
    contextual_template = (
        "<document>"
        "{WHOLE_DOCUMENT}"
        "</document>"
        "Here is the chunk we want to situate within the whole document."
        "<chunk>"
        "{CHUNK_CONTENT}"
        "</chunk>"
        "Please give a short succinct context to situate this chunk within the overall document for the purposes of improving search retrieval of the chunk."
        "Answer only with the succinct context and nothing else."
        "Put it in <result> tags."
    )          
    
    contextual_prompt = ChatPromptTemplate([
        ('human', contextual_template)
    ])

    docs = []
    for i, doc in enumerate(splitted_docs):        
        chat = get_contexual_retrieval_chat()
        
        contexual_chain = contextual_prompt | chat
            
        response = contexual_chain.invoke(
            {
                "WHOLE_DOCUMENT": whole_doc.page_content,
                "CHUNK_CONTENT": doc.page_content
            }
        )
        output = response.content
        contextualized_chunk = output[output.find('<result>')+8:len(output)-9]
        
        docs.append(
            Document(
                page_content=contextualized_chunk+"\n\n"+doc.page_content,
                metadata=doc.metadata
            )
        )
    return docs

Case 1: Company Ownership

The following chunk contains data about a company's shareholdings; by itself it simply lists ownership percentages.

structure as of 3 January 2024 (date of last disclosure) is as follows:
Suzano Holding SA, Brazil - 27.76%  
David Feffer - 4.04%  
Daniel Feffer - 3.63%  
Jorge Feffer - 3.60%  
Ruben Feffer - 3.54%  
Alden Fundo De Investimento Em Aรงรตes, Brazil - 1.98%  
Other investors hold the remaining 55.45%
Suzano Holding SA is majority-owned by the founding Feffer family
Ultimate Beneficial Owners
and/or Persons with Significant
ControlFilings show that the beneficial owners/persons with significant control
are members of the Feffer family, namely David Feffer, Daniel Feffer,
Jorge Feffer, and Ruben Feffer
Directors Executive Directors:  
Walter Schalka - Chief Executive Officer  
Aires Galhardo - Executive Officer - Pulp Operation  
Carlos Anรญbal de Almeida Jr - Executive Officer - Forestry, Logistics and
Procurement  
Christian Orglmeister - Executive Officer - New Businesses, Strategy, IT,
Digital and Communication

Below is the contextualized description of the chunk. It includes the company name and ownership context that are not present in the original chunk.

This chunk provides details on the ownership structure and key executives of Suzano SA, 
the company that is the subject of the overall document.
It is likely included to provide background information on the company's corporate structure and leadership.

Case 2: Company Financial Information

Below is a chunk containing a company's financial information.

Type of Compilation Consolidated Consolidated Consolidated
Currency / UnitsBRL โ€˜000 (USD 1 =
BRL 5.04)BRL โ€˜000 (USD 1 =
BRL 5.29)BRL โ€˜000 (USD 1 =
BRL 5.64)
Turnover 29,384,030 49,830,946 40,965,431
Gross results 11,082,919 25,009,658 20,349,843
Depreciation (5,294,748) (7,206,125) (6,879,132)
Operating profit (loss) 9,058,460 22,222,781 18,180,191
Interest income 1,215,644 967,010 272,556
Interest expense (3,483,674) (4,590,370) (4,221,301)
Other income (expense) 3,511,470 6,432,800 (9,347,234)
Profit (loss) before tax 12,569,930 8,832,957 (17,642,129)
Tax (2,978,271) (197,425) (6,928,009)
Net profit (loss) 9,591,659 23,394,887 8,635,532
Net profit (loss) attributable to
minorities/non-controlling
interests14,154 13,270 9,146
Net profit (loss) attributable to the
company9,575,938 23,119,235 8,751,864
Long-term assets 103,391,275 96,075,318 84,872,211
Fixed assets 57,718,542 50,656,634 38,169,703
Goodwill and other intangibles 14,877,234 15,192,971 16,034,339

Below is the contextualized description of the chunk. It provides information, including the company name, that is not in the chunk itself.

This chunk provides detailed financial information about Suzano SA, 
including its turnover, gross results, operating profit, net profit, and asset details. 
It is part of the overall assessment and rating of Suzano SA presented in the document.

Case 3: Phone Number

Below is a chunk containing a company's contact details.

|Telephone|+55 11 3503-9000|
|Email|ri@suzano.com.br|
|Company Details||
|Company Type|Publicly Listed|
|Company Status|Operating|
|Sector|Industrial|
|Place of Incorporation|Brazil|
|Region of Incorporation|Bahia|
|Date of Incorporation|17 December 1987|
|Company Registered Number|CNPJ (Tax Id. No.): 16.404.287/0001-55|

The contextualized result for this chunk is shown below. It identifies the company the contact details belong to, which is not named in the chunk itself.

This chunk provides detailed company information about Suzano SA,
including its contact details, company type, status, sector, place and date of incorporation, and registered number.
This information is part of the overall assessment and rating of Suzano SA presented in the document.

Extracting Text from Images

Images are resized so the LLM can process them, and then the text is extracted. Information such as the company name is provided so that the LLM can interpret the document content.

def store_image_for_opensearch(key, page, subject_company, rating_date):
    image_obj = s3_client.get_object(Bucket=s3_bucket, Key=key)
                        
    image_content = image_obj['Body'].read()
    img = Image.open(BytesIO(image_content))
                        
    width, height = img.size     
    pos = key.rfind('/')
    prefix = key[pos+1:pos+5]
    print('img_prefix: ', prefix)    
    if pdf_profile=='ocean' and prefix == "img_":
        area = (0, 175, width, height-175)
        img = img.crop(area)
            
        width, height = img.size 
        print(f"(croped) width: {width}, height: {height}, size: {width*height}")
                
    if width < 100 or height < 100:  # skip small size image
        return []
                
    isResized = False
    while(width*height > 5242880):
        width = int(width/2)
        height = int(height/2)
        isResized = True
        print(f"(resized) width: {width}, height: {height}, size: {width*height}")
           
    try:             
        if isResized:
            img = img.resize((width, height))
                             
        buffer = BytesIO()
        img.save(buffer, format="PNG")
        img_base64 = base64.b64encode(buffer.getvalue()).decode("utf-8")
                                                                        
        # extract text from the image
        chat = get_multimodal()
        text = extract_text(chat, img_base64, subject_company)
        extracted_text = text[text.find('<result>')+8:len(text)-9] # remove <result> tag
        
        summary = summary_image(chat, img_base64, subject_company)
        image_summary = summary[summary.find('<result>')+8:len(summary)-9] # remove <result> tag
        
        if len(extracted_text) > 30:
            contents = f"[์ด๋ฏธ์ง€ ์š”์•ฝ]\n{image_summary}\n\n[์ถ”์ถœ๋œ ํ…์ŠคํŠธ]\n{extracted_text}"
        else:
            contents = f"[์ด๋ฏธ์ง€ ์š”์•ฝ]\n{image_summary}"
        print('image contents: ', contents)

        docs = []        
        if len(contents) > 30:
            docs.append(
                Document(
                    page_content=contents,
                    metadata={
                        'name': key,
                        'url': path+parse.quote(key),
                        'page': page,
                        'subject_company': subject_company,
                        'rating_date': rating_date
                    }
                )
            )         
        print('docs size: ', len(docs))
        
        return add_to_opensearch(docs, key)
    
    except Exception:
        err_msg = traceback.format_exc()
        print('error message: ', err_msg)                
        
        return []

Report Generation

The report's table of contents and the subsequent writing process use the plan and execute pattern together with reflection.

The plan and execute pattern can reference previously written sections, which helps avoid repeated sentences and keeps the text flowing naturally. Document retrieval and generation are organized as a workflow.

def buildPlanAndExecuteOceanWorkflow():
    workflow = StateGraph(State)

    # Add nodes
    workflow.add_node("plan", plan_node)
    workflow.add_node("retrieve", retrieve_node)        
    workflow.add_node("generate", generate_node)
    workflow.add_node("revise_answers", revise_answers)  # reflection

    # Add edges
    workflow.add_edge(START, "plan")
    workflow.add_edge("plan", "retrieve")
    workflow.add_edge("retrieve", "generate")
    workflow.add_edge("generate", "revise_answers")
    workflow.add_edge("revise_answers", END)
    
    return workflow.compile()
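
The State passed between these nodes is defined elsewhere in the repository. A minimal sketch, inferred from the keys that plan_node, retrieve_node, and generate_node below read and write:

from typing import TypedDict, List
from langchain_core.documents import Document

class State(TypedDict, total=False):
    subject_company: str            # company name given by the user
    planning_steps: List[str]       # section titles of the report
    sub_queries: List[List[str]]    # keywords to retrieve for each section
    relevant_contexts: List[str]    # retrieved context per section
    references: List[Document]      # documents used as references
    drafts: List[str]               # generated section drafts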

The workflow for the reflection pattern is as follows.

def buildReflection():
    workflow = StateGraph(ReflectionState)

    # Add nodes
    workflow.add_node("reflect_node", reflect_node)
    workflow.add_node("revise_draft", revise_draft)

    # Set entry point
    workflow.set_entry_point("reflect_node")        
    workflow.add_conditional_edges(
        "revise_draft", 
        should_continue, 
        {
            "end": END, 
            "continue": "reflect_node"}
    )

    # Add edges
    workflow.add_edge("reflect_node", "revise_draft")
        
    return workflow.compile()
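
ReflectionState and the should_continue condition for this loop are defined elsewhere in the repository. A heavily simplified sketch of one way they could look, where the loop keeps reflecting and revising until a revision budget is exhausted (the field names here are illustrative, not the repository's):

from typing import TypedDict, List

class ReflectionState(TypedDict, total=False):
    draft: str                 # current draft of a section
    reflection: List[str]      # critique produced by reflect_node
    search_queries: List[str]  # additional keywords to look up for the revision
    revision_number: int
    max_revision: int

def should_continue(state: ReflectionState) -> str:
    # Stop the reflect/revise loop once the revision budget is used up.
    if state["revision_number"] > state["max_revision"]:
        return "end"
    return "continue"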

The report plan uses a predefined table of contents and keywords, as shown below.

def plan_node(state: State):
    print('###### plan_node ######')
    subject_company = state["subject_company"]    
    
    planning_steps = [
        "1. ํšŒ์‚ฌ ์†Œ๊ฐœ",
        "2. ์ฃผ์š” ์˜์—… ํ™œ๋™",
        "3. ์žฌ๋ฌด ํ˜„ํ™ฉ",
        "4. ์„ ๋Œ€ ํ˜„ํ™ฉ",
        "5. ์ข…ํ•ฉ ํ‰๊ฐ€"
    ]
    
    sub_queries = [
        [
            "establish", 
            "location", 
            "management", 
            "affiliated"
        ],
        [
            "cargo", 
            "route", 
            "owned/chartered", 
            "strategy"
        ],
        [
            "financial performance", 
            "route", 
            "financial risk",
            "payment"
        ],
        [
            "fleet"
        ],
        [
            "rating",
            "assessment" 
        ]        
    ]
        
    return {
        "subject_company": subject_company,
        "planning_steps": planning_steps,
        "sub_queries": sub_queries
    }

Retrieval is performed by the retrieve node, which queries RAG with each sub-query. To improve speed, parallel processing across multiple regions is applied.

def retrieve_node(state: State):
    print('###### retrieve_node ######')
    subject_company = state["subject_company"]    
    planning_steps = state["planning_steps"]
    print(f"subject_company: {subject_company}, planning_steps: {planning_steps}")
    
    relevant_contexts = []        
    references = []
    sub_queries = state["sub_queries"]
    
    for i, step in enumerate(planning_steps):
        print(f"{i}: {step}")
        
        contents = ""    
        if multi_region == 'enable': 
            relevant_docs = retrieve_for_parallel_processing(sub_queries[i], subject_company)
            for doc in relevant_docs:
                contents += doc.page_content
            
            references += relevant_docs
        else:
                
            for q in sub_queries[i]:
                docs = retrieve(q, subject_company)
                
                print(f"---> q: {sub_queries[i]}, docs: {docs}")                
                for doc in docs:            
                    contents += doc.page_content
                
                references += docs
                                
        relevant_contexts.append(contents)
        
    return {
        "subject_company": subject_company,
        "planning_steps": planning_steps,
        "relevant_contexts": relevant_contexts,
        "references": references
    }
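
retrieve_for_parallel_processing is not shown here. A minimal sketch, assuming a thread pool that runs retrieve (defined below) once per sub-query; the repository may implement the parallelism differently (for example across multi-region endpoints):

from concurrent.futures import ThreadPoolExecutor

def retrieve_for_parallel_processing(queries, subject_company):
    # Run one retrieval per sub-query concurrently and merge the graded results.
    relevant_docs = []
    with ThreadPoolExecutor(max_workers=len(queries)) as executor:
        futures = [executor.submit(retrieve, q, subject_company) for q in queries]
        for future in futures:
            relevant_docs += future.result()
    return relevant_docs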

The RAG retrieval is shown below. Both hierarchical and fixed-size chunking are implemented, but hierarchical chunking is used for better retrieval performance. In addition, the search is restricted to the documents whose metadata best matches the subject company.

def retrieve(query: str, subject_company: str):
    print(f'###### retrieve: {query} ######')
    global reference_docs
    
    top_k = 4
    docs = []
    
    bedrock_embedding = get_embedding()
       
    vectorstore_opensearch = OpenSearchVectorSearch(
        index_name = index_name,
        is_aoss = False,
        ef_search = 1024, # 512(default)
        m=48,
        #engine="faiss",  # default: nmslib
        embedding_function = bedrock_embedding,
        opensearch_url=opensearch_url,
        http_auth=(opensearch_account, opensearch_passwd), # http_auth=awsauth,
    )  
    
    if enalbeParentDocumentRetrival == 'true': # parent/child chunking
        relevant_documents = get_documents_from_opensearch_for_subject_company(vectorstore_opensearch, query, top_k, subject_company)
                        
        for i, document in enumerate(relevant_documents):
            parent_doc_id = document[0].metadata['parent_doc_id']
            doc_level = document[0].metadata['doc_level']

            excerpt, name, url = get_parent_content(parent_doc_id) # use parent document
            
            docs.append(
                Document(
                    page_content=excerpt,
                    metadata={
                        'name': name,
                        'url': url,
                        'doc_level': doc_level,
                        'from': 'vector'
                    },
                )
            )
    else:
        boolean_filter = {
            "bool": {
                "filter":[
                    {"match" : {"metadata.subject_company":subject_company}},
                    {"term" : {"metadata.doc_level":"child"}}
                ]
            }
        }
        relevant_documents = vectorstore_opensearch.similarity_search_with_score(
            query = query,
            k = top_k,  
            search_type="script_scoring",
            pre_filter=boolean_filter
        )
    
        for i, document in enumerate(relevant_documents):
            name = document[0].metadata['name']
            url = document[0].metadata['url']
            content = document[0].page_content
                   
            docs.append(
                Document(
                    page_content=content,
                    metadata={
                        'name': name,
                        'url': url,
                        'from': 'vector'
                    },
                )
            )
    
    filtered_docs = grade_documents(query, docs) # grading
    
    filtered_docs = check_duplication(filtered_docs) # check duplication
            
    reference_docs += filtered_docs # add to reference
    
    return filtered_docs

Each section of the report is generated using the report instructions, the report plan, the text written so far, and the reference documents, as shown below.

def generate_node(state: State):    
    print('###### generate_node ######')
    write_template = (
        "๋‹น์‹ ์€ ๊ธฐ์—…์— ๋Œ€ํ•œ ๋ณด๊ณ ์„œ๋ฅผ ์ž‘์„ฑํ•˜๋Š” ํ›Œ๋ฅญํ•œ ๊ธ€์“ฐ๊ธฐ ๋„์šฐ๋ฏธ์ž…๋‹ˆ๋‹ค."
        "์•„๋ž˜์™€ ๊ฐ™์ด ์›๋ณธ ๋ณด๊ณ ์„œ ์ง€์‹œ์‚ฌํ•ญ๊ณผ ๊ณ„ํšํ•œ ๋ณด๊ณ ์„œ ๋‹จ๊ณ„๋ฅผ ์ œ๊ณตํ•˜๊ฒ ์Šต๋‹ˆ๋‹ค."
        "๋˜ํ•œ ์ œ๊ฐ€ ์ด๋ฏธ ์ž‘์„ฑํ•œ ํ…์ŠคํŠธ๋ฅผ ์ œ๊ณตํ•ฉ๋‹ˆ๋‹ค."
        
        "๋ณด๊ณ ์„œ ์ง€์‹œ์‚ฌํ•ญ:"
        "<instruction>"
        "{instruction}"
        "</instruction>"
        
        "๋ณด๊ณ ์„œ ๋‹จ๊ณ„:"
        "<plan>"
        "{plan}"
        "</plan>"
        
        "์ด๋ฏธ ์ž‘์„ฑํ•œ ํ…์ŠคํŠธ:"
        "<text>"
        "{text}"
        "</text>"
        
        "์ฐธ๊ณ  ๋ฌธ์„œ"
        "<context>"        
        "{context}"
        "</context>"        
        
        "๋ณด๊ณ ์„œ ์ง€์‹œ ์‚ฌํ•ญ, ๋ณด๊ณ ์„œ ๋‹จ๊ณ„, ์ด๋ฏธ ์ž‘์„ฑ๋œ ํ…์ŠคํŠธ, ์ฐธ๊ณ  ๋ฌธ์„œ๋ฅผ ์ฐธ์กฐํ•˜์—ฌ ๋‹ค์Œ ๋‹จ๊ณ„์„ ๊ณ„์† ์ž‘์„ฑํ•ฉ๋‹ˆ๋‹ค."
        "๊ธฐ์—…์— ๋Œ€ํ•œ ๊ตฌ์ฒด์ ์ธ ์ •๋ณด๋Š” ๋ฐ›๋“œ์‹œ ์ฐธ๊ณ  ๋ฌธ์„œ๋ฅผ ์ด์šฉํ•ด ์ž‘์„ฑํ•˜๊ณ , ๋ชจ๋ฅด๋Š” ๋ถ€๋ถ„์€ ํฌํ•จํ•˜์ง€ ์•Š์Šต๋‹ˆ๋‹ค."
        
        "๋‹ค์Œ ๋‹จ๊ณ„:"
        "<step>"
        "{STEP}"
        "</step>"
                
        "๋ณด๊ณ ์„œ์˜ ๋‚ด์šฉ์ด ๋Š์–ด์ง€์ง€ ์•Š๊ณ  ์ž˜ ์ดํ•ด๋˜๋„๋ก ํ•˜๋‚˜์˜ ๋ฌธ๋‹จ์„ ์ถฉ๋ถ„ํžˆ ๊ธธ๊ฒŒ ์ž‘์„ฑํ•ฉ๋‹ˆ๋‹ค."
        "ํ•„์š”ํ•˜๋‹ค๋ฉด ์•ž์— ์ž‘์€ ๋ถ€์ œ๋ฅผ ์ถ”๊ฐ€ํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค."
        "์ด๋ฏธ ์ž‘์„ฑ๋œ ํ…์ŠคํŠธ๋ฅผ ๋ฐ˜๋ณตํ•˜์ง€ ๋ง๊ณ  ์ž‘์„ฑํ•œ ๋ฌธ๋‹จ๋งŒ ์ถœ๋ ฅํ•˜์„ธ์š”."                
        "Markdown ํฌ๋งท์œผ๋กœ ์„œ์‹์„ ์ž‘์„ฑํ•˜์„ธ์š”."
        "์ตœ์ข… ๊ฒฐ๊ณผ์— <result> tag๋ฅผ ๋ถ™์—ฌ์ฃผ์„ธ์š”."
    )
    
    write_prompt = ChatPromptTemplate.from_messages([
        ("human", write_template)
    ])
    
    instruction = f"{state['subject_company']} ํšŒ์‚ฌ์— ๋Œ€ํ•ด ์†Œ๊ฐœํ•ด ์ฃผ์„ธ์š”."
    planning_steps = state["planning_steps"]
    text = ""
    drafts = []
    
    for i, step in enumerate(planning_steps):
        context = state["relevant_contexts"][i]
        
        chat = get_chat()                       
        write_chain = write_prompt | chat            
        try: 
            result = write_chain.invoke({
                "instruction": instruction,
                "plan": planning_steps,
                "text": text,
                "context": context,
                "STEP": step
            })

            output = result.content
            draft = output[output.find('<result>')+8:len(output)-9] # remove <result> tag    
                
            if draft.find('#')!=-1 and draft.find('#')!=0:
                draft = draft[draft.find('#'):]
                    
            text += draft + '\n\n'
            drafts.append(draft)
                
        except Exception:
            err_msg = traceback.format_exc()
            print('error message: ', err_msg)                        
            raise Exception ("Not able to request to LLM")

    return {
        "drafts": drafts
    }

Hands-on Practice

Prerequisites

To use this solution, the following preparation is required in advance.

Installing the Infrastructure with the AWS CDK

This hands-on uses the us-west-2 region. Install the infrastructure with the AWS CDK by following the infrastructure installation guide.

Results

Basic Agentic RAG Behavior

Select "RAG" from the chat menu, enter "What is the contact information for Suzano?", and check the result.

Selecting "Agentic RAG" from the chat menu and entering "What kind of company is Suzano?" returns information obtained through RAG and web search, along with the related documents.

Behavior on Complex Questions

"RAG (OpenSearch)" ๋ฉ”๋‰ด๋กœ ์ง„์ž…ํ•ด์„œ "Suzano์™€ Delta Corp Shipping์„ ๋น„๊ตํ•ด์ฃผ์„ธ์š”."๋ผ๊ณ  ์ž…๋ ฅํ•ฉ๋‹ˆ๋‹ค. RAG๋Š” ์‚ฌ์šฉ์ž์˜ ์งˆ๋ฌธ์— 2๊ฐ€์ง€ ๊ฒ€์ƒ‰์ด ํ•„์š”ํ•œ ์‚ฌํ•ญ๋“ค์ด ์žˆ์Œ์—๋„ ์งˆ๋ฌธ์„ ๊ทธ๋Œ€๋กœ ๊ฒ€์ƒ‰ํ•ฉ๋‹ˆ๋‹ค. ๋”ฐ๋ผ์„œ, ์•„๋ž˜์™€ ๊ฐ™์ด ์ผ๋ถ€ ์ž๋ฃŒ๋งŒ ๊ฒ€์ƒ‰์ด ๋  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

Now, entering "Describe Delta Corp Shipping." shows that RAG has sufficient information about that company.

Next, move to the "Agentic RAG" menu and enter "Compare Suzano and Delta Corp Shipping." again. Plain RAG could not handle a question that requires two searches, but Agentic RAG compares the two companies properly.

The LangSmith logs show that OpenSearch was queried separately for "Suzano" and "Delta Corp Shipping" and the final answer was produced from those results. Query decomposition like this can improve RAG search results.

Writing a Company Report

Selecting "Ocean Agent" from the menu produces an answer using plan and execute. Enter only the company name, for example "Suzano SA". Clicking the preview link shares the document content as HTML, and clicking the download link downloads the file in markdown format so it can be shared via GitHub or email. The markdown document can be converted to a format such as PDF if needed.
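
The preview link relies on the mechanism described in the introduction: the markdown report is wrapped in an HTML page, uploaded to S3, and shared through a URL. A minimal sketch of how this could look, assuming the existing s3_bucket and path (URL prefix) variables and client-side rendering with marked.js:

import json
import boto3
from urllib import parse

def publish_report_html(report_markdown: str, object_key: str) -> str:
    # Wrap the markdown in a small HTML page that renders it in the browser.
    html = (
        '<html><head><meta charset="utf-8">'
        '<script src="https://cdn.jsdelivr.net/npm/marked/marked.min.js"></script></head>'
        '<body><div id="report"></div><script>'
        f'document.getElementById("report").innerHTML = marked.parse({json.dumps(report_markdown)});'
        '</script></body></html>'
    )
    s3 = boto3.client('s3')
    s3.put_object(
        Bucket=s3_bucket,
        Key=object_key,
        Body=html,
        ContentType='text/html'
    )
    return path + parse.quote(object_key)  # shareable URL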

Selecting "Ocean Agent (Reflection)" from the menu and entering "Suzano SA" produces a result with both plan and execute and reflection applied, as in the links below. This yields more detailed and richer information than plan and execute alone, but reflection requires more tokens and a longer execution time.

Example Results

Suzano SA

Oldendorff Carriers

Prony Resources New Caledonia

United Kaiun Kaisha

Delta Corp

An Trung Tin

Alpha Bulkers

Conclusion

Using OpenSearch to build RAG, we were able to analyze corporate information in PDF form and generate reports. By composing an agentic workflow, complex multi-step tasks can be implemented easily, and workflows can be tailored to specific purposes. To manage the infrastructure efficiently, OpenSearch was installed with the AWS CDK, and the system was built around serverless services, which simplify maintenance and handle changing traffic well.

Known Issues

Hallucination

The generated report should be compared with the source documents to check for hallucination. Since there are cases where the base knowledge of Anthropic Sonnet is referenced, stronger restrictions in the prompt may be needed when hallucination becomes a problem.

Limiting the Searched Documents

"Panama Shipping"์„ ๊ฒ€์ƒ‰ํ•˜๋ฉด "Delta Corp Shipping"์ด ์œ ์‚ฌํ•œ ๋ฌธ์„œ๋กœ ๊ฒ€์ƒ‰์ด ๋  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. ์ด ๊ฒฝ์šฐ์— ๋ฌธ์žฅ์˜ ์•ž๋ถ€๋ถ„์— ์š”์•ฝ์ด contextual retrieval๋กœ ์ œ๊ณต๋จ์œผ๋กœ์„œ ๋ฌธ์ œ๊ฐ€ ๋ฐœ์ƒํ•  ๊ฐ€๋Šฅ์„ฑ์ด ๋‚ฎ์•„์ง€์ง€๋งŒ, ์œ ์‚ฌํ•œ ์ด๋ฆ„์„ ๊ฐ€์ง€๋Š” ๋ฌธ์„œ๋“ค์ด ๊ฐ™์ด ๊ฒ€์ƒ‰์ด ๋˜๋ฉด ๊ฒฐ๊ณผ์— ์˜ํ–ฅ์„ ๋ฏธ์น  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. ๋งŒ์•ฝ ๋ฌธ์ œ๊ฐ€ ๋˜๋Š” ๊ฒฝ์šฐ๊ฐ€ ๋ฐœ์ƒํ•˜๋ฉด ๊ฒ€์ƒ‰์„ match์—์„œ term์œผ๋กœ ๋ณ€๊ฒฝํ•˜์—ฌ์•ผ ํ•ฉ๋‹ˆ๋‹ค. ๋งŒ์•ฝ term์œผ๋กœ ์ง€์ •ํ•˜๋ฉด ๊ฒ€์ƒ‰์‹œ ํšŒ์‚ฌ๋ช…์ด full name์œผ๋กœ ์ •ํ™•ํ•˜๊ฒŒ ์ž…๋ ฅ๋˜์–ด์•ผ ํ•ฉ๋‹ˆ๋‹ค.

    boolean_filter = {
        "bool": {
            "filter":[
                {"match" : {"metadata.subject_company":subject_company}},
                {"term" : {"metadata.doc_level":"child"}}
            ]
        }
    }          
    result = vectorstore_opensearch.similarity_search_with_score(
        query = query,
        k = top_k*2,
        search_type="script_scoring",
        pre_filter = boolean_filter
    )            
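
For reference, the term-based variant could look like the sketch below; depending on the index mapping, metadata.subject_company may additionally need a keyword (sub)field for exact matching:

    # Variant: exact keyword matching on the company name with a "term" filter.
    # The company name must then be provided exactly as the full name stored in the metadata.
    boolean_filter = {
        "bool": {
            "filter":[
                {"term" : {"metadata.subject_company":subject_company}},
                {"term" : {"metadata.doc_level":"child"}}
            ]
        }
    }
    result = vectorstore_opensearch.similarity_search_with_score(
        query = query,
        k = top_k*2,
        search_type="script_scoring",
        pre_filter = boolean_filter
    )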

Cleaning Up Resources

When the infrastructure is no longer needed, all resources can be deleted as follows.

  1. Go to the API Gateway console and delete "api-chatbot-for-ocean-agent" and "api-ocean-agent".

  2. Go to the Cloud9 console and delete everything with the command below.

cd ~/environment/ocean-agent/cdk-ocean-agent/ && cdk destroy --all
