Add API payload GCP logging #805

Merged
merged 1 commit into from Oct 18, 2024
79 changes: 68 additions & 11 deletions backend/score.py
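Every endpoint touched by this diff follows the same pattern: build a payload_json_obj dict from the request fields, stamp it with a UTC logging_time via formatted_time(datetime.now(timezone.utc)), and emit it with logger.log_struct(payload_json_obj, "INFO"). The sketch below shows that pattern in isolation. It is a minimal illustration, not code from this PR: CloudLoggerStub and log_api_payload are assumptions added only so the snippet is self-contained, standing in for the project's GCP logging wrapper.

# Minimal, self-contained sketch of the payload-logging pattern added in this PR.
# formatted_time and logger.log_struct(payload, "INFO") mirror the helpers used in
# backend/score.py; CloudLoggerStub and log_api_payload are illustrative assumptions.
from datetime import datetime, timezone

def formatted_time(dt: datetime) -> str:
    # Assumed to render the UTC timestamp as a readable string.
    return dt.strftime('%Y-%m-%d %H:%M:%S %Z')

class CloudLoggerStub:
    """Stand-in for the project's Google Cloud Logging wrapper (assumed interface)."""
    def log_struct(self, payload: dict, severity: str = 'INFO') -> None:
        # A real wrapper would forward the structured payload to Cloud Logging;
        # here we just print it so the sketch runs anywhere.
        print(severity, payload)

logger = CloudLoggerStub()

def log_api_payload(api_name: str, **fields) -> None:
    # Build the structured payload the way each endpoint in score.py does:
    # the API name, the request fields, and a UTC logging_time.
    payload_json_obj = {'api_name': api_name, **fields,
                        'logging_time': formatted_time(datetime.now(timezone.utc))}
    logger.log_struct(payload_json_obj, 'INFO')

# Example roughly matching the /connect endpoint's payload (values are placeholders):
log_api_payload('connect', db_url='neo4j://example:7687', userName='neo4j', database='neo4j')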
@@ -93,6 +93,10 @@ async def create_source_knowledge_graph_url(

try:
start = time.time()
payload_json_obj = {'api_name':'url_scan', 'db_url':uri, 'userName':userName, 'database':database, 'source_url':source_url, 'aws_access_key_id':aws_access_key_id,
'model':model, 'gcs_bucket_name':gcs_bucket_name, 'gcs_bucket_folder':gcs_bucket_folder, 'source_type':source_type,
'gcs_project_id':gcs_project_id, 'wiki_query':wiki_query, 'logging_time': formatted_time(datetime.now(timezone.utc))}
logger.log_struct(payload_json_obj, "INFO")
if source_url is not None:
source = source_url
else:
@@ -174,6 +178,11 @@ async def extract_knowledge_graph_from_file(
"""
try:
start_time = time.time()
payload_json_obj = {'api_name':'extract', 'db_url':uri, 'userName':userName, 'database':database, 'source_url':source_url, 'aws_access_key_id':aws_access_key_id,
'model':model, 'gcs_bucket_name':gcs_bucket_name, 'gcs_bucket_folder':gcs_bucket_folder, 'source_type':source_type,'gcs_blob_filename':gcs_blob_filename,
'file_name':file_name, 'gcs_project_id':gcs_project_id, 'wiki_query':wiki_query,'allowedNodes':allowedNodes,'allowedRelationship':allowedRelationship,
'language':language ,'retry_condition':retry_condition,'logging_time': formatted_time(datetime.now(timezone.utc))}
logger.log_struct(payload_json_obj, "INFO")
graph = create_graph_database_connection(uri, userName, password, database)
graphDb_data_Access = graphDBdataAccess(graph)

@@ -238,6 +247,8 @@ async def get_source_list(uri:str, userName:str, password:str, database:str=None
"""
try:
start = time.time()
payload_json_obj = {'api_name':'sources_list', 'db_url':uri, 'userName':userName, 'database':database, 'logging_time': formatted_time(datetime.now(timezone.utc))}
logger.log_struct(payload_json_obj, "INFO")
decoded_password = decode_password(password)
if " " in uri:
uri = uri.replace(" ","+")
@@ -257,6 +268,8 @@ async def get_source_list(uri:str, userName:str, password:str, database:str=None
@app.post("/post_processing")
async def post_processing(uri=Form(), userName=Form(), password=Form(), database=Form(), tasks=Form(None)):
try:
payload_json_obj = {'api_name':'post_processing', 'db_url':uri, 'userName':userName, 'database':database, 'tasks':tasks, 'logging_time': formatted_time(datetime.now(timezone.utc))}
logger.log_struct(payload_json_obj, "INFO")
graph = create_graph_database_connection(uri, userName, password, database)
tasks = set(map(str.strip, json.loads(tasks)))

@@ -279,8 +292,9 @@ async def post_processing(uri=Form(), userName=Form(), password=Form(), database
model = "openai_gpt_4o"
await asyncio.to_thread(create_communities, uri, userName, password, database,model)
josn_obj = {'api_name': 'post_processing/create_communities', 'db_url': uri, 'logging_time': formatted_time(datetime.now(timezone.utc))}
logger.log_struct(josn_obj)
logging.info(f'created communities')

logger.log_struct(josn_obj)
return create_api_response('Success', message='All tasks completed successfully')

except Exception as e:
@@ -298,6 +312,9 @@ async def chat_bot(uri=Form(),model=Form(None),userName=Form(), password=Form(),
logging.info(f"QA_RAG called at {datetime.now()}")
qa_rag_start_time = time.time()
try:
payload_json_obj = {'api_name':'chat_bot', 'db_url':uri, 'userName':userName, 'database':database, 'question':question,'document_names':document_names,
'session_id':session_id, 'mode':mode, 'logging_time': formatted_time(datetime.now(timezone.utc))}
logger.log_struct(payload_json_obj, "INFO")
if mode == "graph":
graph = Neo4jGraph( url=uri,username=userName,password=password,database=database,sanitize = True, refresh_schema=True)
else:
@@ -311,7 +328,7 @@ async def chat_bot(uri=Form(),model=Form(None),userName=Form(), password=Form(),
logging.info(f"Total Response time is {total_call_time:.2f} seconds")
result["info"]["response_time"] = round(total_call_time, 2)

json_obj = {'api_name':'chat_bot','db_url':uri,'session_id':session_id, 'logging_time': formatted_time(datetime.now(timezone.utc)), 'elapsed_api_time':f'{total_call_time:.2f}'}
json_obj = {'api_name':'chat_bot','db_url':uri,'session_id':session_id,'mode':mode, 'logging_time': formatted_time(datetime.now(timezone.utc)), 'elapsed_api_time':f'{total_call_time:.2f}'}
logger.log_struct(json_obj, "INFO")

return create_api_response('Success',data=result)
@@ -328,6 +345,9 @@ async def chat_bot(uri=Form(),model=Form(None),userName=Form(), password=Form(),
async def chunk_entities(uri=Form(),userName=Form(), password=Form(), database=Form(), nodedetails=Form(None),entities=Form(),mode=Form()):
try:
start = time.time()
payload_json_obj = {'api_name':'chunk_entities', 'db_url':uri, 'userName':userName, 'database':database, 'nodedetails':nodedetails,'entities':entities,
'mode':mode, 'logging_time': formatted_time(datetime.now(timezone.utc))}
logger.log_struct(payload_json_obj, "INFO")
result = await asyncio.to_thread(get_entities_from_chunkids,uri=uri, username=userName, password=password, database=database,nodedetails=nodedetails,entities=entities,mode=mode)
end = time.time()
elapsed_time = end - start
@@ -352,7 +372,9 @@ async def graph_query(
document_names: str = Form(None),
):
try:
# print(document_names)
payload_json_obj = {'api_name':'graph_query', 'db_url':uri, 'userName':userName, 'database':database, 'document_names':document_names,
'logging_time': formatted_time(datetime.now(timezone.utc))}
logger.log_struct(payload_json_obj, "INFO")
start = time.time()
result = await asyncio.to_thread(
get_graph_results,
@@ -380,6 +402,8 @@ async def graph_query(
@app.post("/clear_chat_bot")
async def clear_chat_bot(uri=Form(),userName=Form(), password=Form(), database=Form(), session_id=Form(None)):
try:
payload_json_obj = {'api_name':'clear_chat_bot', 'db_url':uri, 'userName':userName, 'database':database, 'session_id':session_id, 'logging_time': formatted_time(datetime.now(timezone.utc))}
logger.log_struct(payload_json_obj, "INFO")
graph = create_graph_database_connection(uri, userName, password, database)
result = await asyncio.to_thread(clear_chat_history,graph=graph,session_id=session_id)
return create_api_response('Success',data=result)
@@ -396,6 +420,8 @@ async def clear_chat_bot(uri=Form(),userName=Form(), password=Form(), database=F
async def connect(uri=Form(), userName=Form(), password=Form(), database=Form()):
try:
start = time.time()
payload_json_obj = {'api_name':'connect', 'db_url':uri, 'userName':userName, 'database':database, 'logging_time': formatted_time(datetime.now(timezone.utc))}
logger.log_struct(payload_json_obj, "INFO")
graph = create_graph_database_connection(uri, userName, password, database)
result = await asyncio.to_thread(connection_check_and_get_vector_dimensions, graph, database)
end = time.time()
@@ -417,6 +443,9 @@ async def upload_large_file_into_chunks(file:UploadFile = File(...), chunkNumber
password=Form(), database=Form()):
try:
start = time.time()
payload_json_obj = {'api_name':'upload', 'db_url':uri, 'userName':userName, 'database':database, 'chunkNumber':chunkNumber,'totalChunks':totalChunks,
'original_file_name':originalname,'model':model, 'logging_time': formatted_time(datetime.now(timezone.utc))}
logger.log_struct(payload_json_obj, "INFO")
graph = create_graph_database_connection(uri, userName, password, database)
result = await asyncio.to_thread(upload_file, graph, model, file, chunkNumber, totalChunks, originalname, uri, CHUNK_DIR, MERGED_DIR)
end = time.time()
@@ -442,6 +471,8 @@ async def upload_large_file_into_chunks(file:UploadFile = File(...), chunkNumber
async def get_structured_schema(uri=Form(), userName=Form(), password=Form(), database=Form()):
try:
start = time.time()
payload_json_obj = {'api_name':'schema', 'db_url':uri, 'userName':userName, 'database':database, 'logging_time': formatted_time(datetime.now(timezone.utc))}
logger.log_struct(payload_json_obj, "INFO")
graph = create_graph_database_connection(uri, userName, password, database)
result = await asyncio.to_thread(get_labels_and_relationtypes, graph)
end = time.time()
@@ -512,6 +543,9 @@ async def delete_document_and_entities(uri=Form(),
deleteEntities=Form()):
try:
start = time.time()
payload_json_obj = {'api_name':'delete_document_and_entities', 'db_url':uri, 'userName':userName, 'database':database, 'filenames':filenames,'deleteEntities':deleteEntities,
'source_types':source_types, 'logging_time': formatted_time(datetime.now(timezone.utc))}
logger.log_struct(payload_json_obj, "INFO")
graph = create_graph_database_connection(uri, userName, password, database)
graphDb_data_Access = graphDBdataAccess(graph)
result, files_list_size = await asyncio.to_thread(graphDb_data_Access.delete_file_from_graph, filenames, source_types, deleteEntities, MERGED_DIR, uri)
@@ -568,6 +602,9 @@ async def get_document_status(file_name, url, userName, password, database):
@app.post("/cancelled_job")
async def cancelled_job(uri=Form(), userName=Form(), password=Form(), database=Form(), filenames=Form(None), source_types=Form(None)):
try:
payload_json_obj = {'api_name':'cancelled_job', 'db_url':uri, 'userName':userName, 'database':database,
'filenames':filenames,'source_types':source_types,'logging_time': formatted_time(datetime.now(timezone.utc))}
logger.log_struct(payload_json_obj, "INFO")
graph = create_graph_database_connection(uri, userName, password, database)
result = manually_cancelled_job(graph,filenames, source_types, MERGED_DIR, uri)

@@ -584,6 +621,8 @@ async def cancelled_job(uri=Form(), userName=Form(), password=Form(), database=F
@app.post("/populate_graph_schema")
async def populate_graph_schema(input_text=Form(None), model=Form(None), is_schema_description_checked=Form(None)):
try:
payload_json_obj = {'api_name':'populate_graph_schema', 'model':model, 'is_schema_description_checked':is_schema_description_checked, 'input_text':input_text, 'logging_time': formatted_time(datetime.now(timezone.utc))}
logger.log_struct(payload_json_obj, "INFO")
result = populate_graph_schema_from_text(input_text, model, is_schema_description_checked)
return create_api_response('Success',data=result)
except Exception as e:
@@ -598,6 +637,8 @@ async def populate_graph_schema(input_text=Form(None), model=Form(None), is_sche
@app.post("/get_unconnected_nodes_list")
async def get_unconnected_nodes_list(uri=Form(), userName=Form(), password=Form(), database=Form()):
try:
payload_json_obj = {'api_name':'get_unconnected_nodes_list', 'db_url':uri, 'userName':userName, 'database':database, 'logging_time': formatted_time(datetime.now(timezone.utc))}
logger.log_struct(payload_json_obj, "INFO")
start = time.time()
graph = create_graph_database_connection(uri, userName, password, database)
graphDb_data_Access = graphDBdataAccess(graph)
@@ -619,6 +660,9 @@ async def get_unconnected_nodes_list(uri=Form(), userName=Form(), password=Form(
@app.post("/delete_unconnected_nodes")
async def delete_orphan_nodes(uri=Form(), userName=Form(), password=Form(), database=Form(),unconnected_entities_list=Form()):
try:
payload_json_obj = {'api_name':'delete_unconnected_nodes', 'db_url':uri, 'userName':userName, 'database':database,
'unconnected_entities_list':unconnected_entities_list, 'logging_time': formatted_time(datetime.now(timezone.utc))}
logger.log_struct(payload_json_obj, "INFO")
start = time.time()
graph = create_graph_database_connection(uri, userName, password, database)
graphDb_data_Access = graphDBdataAccess(graph)
@@ -641,6 +685,8 @@ async def delete_orphan_nodes(uri=Form(), userName=Form(), password=Form(), data
async def get_duplicate_nodes(uri=Form(), userName=Form(), password=Form(), database=Form()):
try:
start = time.time()
payload_json_obj = {'api_name':'get_duplicate_nodes', 'db_url':uri, 'userName':userName, 'database':database, 'logging_time': formatted_time(datetime.now(timezone.utc))}
logger.log_struct(payload_json_obj, "INFO")
graph = create_graph_database_connection(uri, userName, password, database)
graphDb_data_Access = graphDBdataAccess(graph)
nodes_list, total_nodes = graphDb_data_Access.get_duplicate_nodes_list()
@@ -662,6 +708,9 @@ async def get_duplicate_nodes(uri=Form(), userName=Form(), password=Form(), data
async def merge_duplicate_nodes(uri=Form(), userName=Form(), password=Form(), database=Form(),duplicate_nodes_list=Form()):
try:
start = time.time()
payload_json_obj = {'api_name':'merge_duplicate_nodes', 'db_url':uri, 'userName':userName, 'database':database,
'duplicate_nodes_list':duplicate_nodes_list, 'logging_time': formatted_time(datetime.now(timezone.utc))}
logger.log_struct(payload_json_obj, "INFO")
graph = create_graph_database_connection(uri, userName, password, database)
graphDb_data_Access = graphDBdataAccess(graph)
result = graphDb_data_Access.merge_duplicate_nodes(duplicate_nodes_list)
@@ -682,6 +731,9 @@ async def merge_duplicate_nodes(uri=Form(), userName=Form(), password=Form(), da
@app.post("/drop_create_vector_index")
async def merge_duplicate_nodes(uri=Form(), userName=Form(), password=Form(), database=Form(), isVectorIndexExist=Form()):
try:
payload_json_obj = {'api_name':'drop_create_vector_index', 'db_url':uri, 'userName':userName, 'database':database,
'isVectorIndexExist':isVectorIndexExist, 'logging_time': formatted_time(datetime.now(timezone.utc))}
logger.log_struct(payload_json_obj, "INFO")
graph = create_graph_database_connection(uri, userName, password, database)
graphDb_data_Access = graphDBdataAccess(graph)
result = graphDb_data_Access.drop_create_vector_index(isVectorIndexExist)
@@ -698,6 +750,9 @@ async def merge_duplicate_nodes(uri=Form(), userName=Form(), password=Form(), da
@app.post("/retry_processing")
async def retry_processing(uri=Form(), userName=Form(), password=Form(), database=Form(), file_name=Form(), retry_condition=Form()):
try:
payload_json_obj = {'api_name':'retry_processing', 'db_url':uri, 'userName':userName, 'database':database, 'file_name':file_name,'retry_condition':retry_condition,
'logging_time': formatted_time(datetime.now(timezone.utc))}
logger.log_struct(payload_json_obj, "INFO")
graph = create_graph_database_connection(uri, userName, password, database)
await asyncio.to_thread(set_status_retry, graph,file_name,retry_condition)
#set_status_retry(graph,file_name,retry_condition)
@@ -714,14 +769,16 @@ async def retry_processing(uri=Form(), userName=Form(), password=Form(), databas
@app.post('/metric')
async def calculate_metric(question=Form(), context=Form(), answer=Form(), model=Form()):
try:
result = await asyncio.to_thread(get_ragas_metrics, question, context, answer, model)
if result is None or "error" in result:
return create_api_response(
'Failed',
message='Failed to calculate evaluation metrics.',
error=result.get("error", "Ragas evaluation returned null")
)
return create_api_response('Success', data=result)
payload_json_obj = {'api_name':'metric', 'context':context, 'answer':answer, 'model':model, 'logging_time': formatted_time(datetime.now(timezone.utc))}
logger.log_struct(payload_json_obj, "INFO")
result = await asyncio.to_thread(get_ragas_metrics, question, context, answer, model)
if result is None or "error" in result:
return create_api_response(
'Failed',
message='Failed to calculate evaluation metrics.',
error=result.get("error", "Ragas evaluation returned null")
)
return create_api_response('Success', data=result)
except Exception as e:
job_status = "Failed"
message = "Error while calculating evaluation metrics"