11import json
22import logging
3+ from time import time
34from typing import List
4- from elasticsearch import ApiError , Elasticsearch
55
66from langchain .schema import BaseChatMessageHistory
77from langchain .schema .messages import BaseMessage , _message_to_dict , messages_from_dict
@@ -20,20 +20,33 @@ class ElasticsearchChatMessageHistory(BaseChatMessageHistory):
2020
2121 def __init__ (
2222 self ,
23- client : Elasticsearch ,
23+ client ,
2424 index : str ,
2525 session_id : str ,
2626 ):
27+ try :
28+ from elasticsearch import Elasticsearch
29+ except ImportError :
30+ raise ImportError (
31+ "Could not import elasticsearch python package. "
32+ "Please install it with `pip install elasticsearch`."
33+ )
34+
2735 self .client : Elasticsearch = client
2836 self .index : str = index
2937 self .session_id : str = session_id
3038
31- if not client .indices .exists (index = index ):
39+ if client .indices .exists (index = index ):
40+ logger .debug (f"Chat history index { index } already exists, skipping creation." )
41+ else :
42+ logger .debug (f"Creating index { index } for storing chat history." )
43+
3244 client .indices .create (
3345 index = index ,
3446 mappings = {
3547 "properties" : {
3648 "session_id" : {"type" : "keyword" },
49+ "created_at" : {"type" : "date" },
3750 "history" : {"type" : "text" }
3851 }
3952 }
@@ -43,9 +56,12 @@ def __init__(
4356 def messages (self ) -> List [BaseMessage ]:
4457 """Retrieve the messages from Elasticsearch"""
4558 try :
59+ from elasticsearch import ApiError
60+
4661 result = self .client .search (
4762 index = self .index ,
48- query = {"term" : {"session_id" : self .session_id }}
63+ query = {"term" : {"session_id" : self .session_id }},
64+ sort = "created_at:asc"
4965 )
5066 except ApiError as err :
5167 logger .error (err )
@@ -60,22 +76,28 @@ def messages(self) -> List[BaseMessage]:
6076 def add_message (self , message : BaseMessage ) -> None :
6177 """Add a message to the chat session in Elasticsearch"""
6278 try :
79+ from elasticsearch import ApiError
80+
6381 self .client .index (
6482 index = self .index ,
6583 body = {
6684 "session_id" : self .session_id ,
85+ "created_at" : round (time () * 1000 ),
6786 "history" : json .dumps (_message_to_dict (message ))
68- }
87+ },
88+ refresh = True
6989 )
7090 except ApiError as err :
7191 logger .error (err )
7292
7393 def clear (self ) -> None :
7494 """Clear session memory in Elasticsearch"""
7595 try :
96+ from elasticsearch import ApiError
97+
7698 self .client .delete_by_query (
7799 index = self .index ,
78100 query = {"term" : {"session_id" : self .session_id }}
79101 )
80- except ApiError as err :
81- logger .error (err )
102+ except ApiError :
103+ logger .error ('Could not clear session memory in Elasticsearch' )
0 commit comments