# Self_Improving_Search.py
# Forked from TheBlewish/Web-LLM-Assistant-Llamacpp-Ollama
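"""Self-improving web search for a local LLM.

Formulates a concise DuckDuckGo query from the user's question, selects and
scrapes the most relevant robots.txt-permitted result pages, and asks the LLM
to synthesize a final answer, retrying with refined queries when an attempt
fails. (This docstring summarizes the module below; it is descriptive, not
part of the upstream source.)
"""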
import time
import re
import os
from typing import List, Dict, Tuple, Union, Optional
from llama_cpp import Llama
from duckduckgo_search import DDGS
from colorama import Fore, Style
import logging
import sys
from io import StringIO
from web_scraper import get_web_content, can_fetch
from llm_config import get_llm_config
from llm_response_parser import UltimateLLMResponseParser
from urllib.parse import urlparse
# Set up logging
log_directory = 'logs'
if not os.path.exists(log_directory):
    os.makedirs(log_directory)
# Configure logger
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
log_file = os.path.join(log_directory, 'llama_output.log')
file_handler = logging.FileHandler(log_file)
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
file_handler.setFormatter(formatter)
logger.handlers = []
logger.addHandler(file_handler)
logger.propagate = False
# Suppress other loggers
for name in ['root', 'duckduckgo_search', 'requests', 'urllib3']:
    logging.getLogger(name).setLevel(logging.WARNING)
    logging.getLogger(name).handlers = []
    logging.getLogger(name).propagate = False
class OutputRedirector:
    """Temporarily redirects stdout/stderr to a buffer so noisy library output can be captured and logged."""

    def __init__(self, stream=None):
        self.stream = stream or StringIO()
        self.original_stdout = sys.stdout
        self.original_stderr = sys.stderr

    def __enter__(self):
        sys.stdout = self.stream
        sys.stderr = self.stream
        return self.stream

    def __exit__(self, exc_type, exc_val, exc_tb):
        sys.stdout = self.original_stdout
        sys.stderr = self.original_stderr
class EnhancedSelfImprovingSearch:
    """Iteratively formulates search queries, fetches and scrapes results, and answers with a local LLM."""

    def __init__(self, llm: Llama, parser: UltimateLLMResponseParser, max_attempts: int = 5):
        self.llm = llm
        self.parser = parser
        self.max_attempts = max_attempts
        self.llm_config = get_llm_config()

    @staticmethod
    def initialize_llm():
        llm_config = get_llm_config()
        llm_config['verbose'] = False
        with OutputRedirector() as output:
            llm = Llama(**llm_config)
        initialization_output = output.getvalue()
        logger.info(f"LLM Initialization Output:\n{initialization_output}")
        return llm

    def print_thinking(self):
        print(Fore.MAGENTA + "🧠 Thinking..." + Style.RESET_ALL)

    def print_searching(self):
        print(Fore.MAGENTA + "📝 Searching..." + Style.RESET_ALL)
    def search_and_improve(self, user_query: str) -> str:
        attempt = 0
        while attempt < self.max_attempts:
            print(f"\n{Fore.CYAN}Search attempt {attempt + 1}:{Style.RESET_ALL}")
            self.print_searching()

            try:
                formulated_query, time_range = self.formulate_query(user_query, attempt)
                print(f"{Fore.YELLOW}Original query: {user_query}{Style.RESET_ALL}")
                print(f"{Fore.YELLOW}Formulated query: {formulated_query}{Style.RESET_ALL}")
                print(f"{Fore.YELLOW}Time range: {time_range}{Style.RESET_ALL}")

                if not formulated_query:
                    print(f"{Fore.RED}Error: Empty search query. Retrying...{Style.RESET_ALL}")
                    attempt += 1
                    continue

                search_results = self.perform_search(formulated_query, time_range)
                if not search_results:
                    print(f"{Fore.RED}No results found. Retrying with a different query...{Style.RESET_ALL}")
                    attempt += 1
                    continue

                self.display_search_results(search_results)

                selected_urls = self.select_relevant_pages(search_results, user_query)
                if not selected_urls:
                    print(f"{Fore.RED}No relevant URLs found. Retrying...{Style.RESET_ALL}")
                    attempt += 1
                    continue

                print(Fore.MAGENTA + "⚙️ Scraping selected pages..." + Style.RESET_ALL)
                scraped_content = self.scrape_content(selected_urls)
                if not scraped_content:
                    print(f"{Fore.RED}Failed to scrape content. Retrying...{Style.RESET_ALL}")
                    attempt += 1
                    continue

                self.display_scraped_content(scraped_content)
                self.print_thinking()
                return self.generate_final_answer(user_query, scraped_content)

            except Exception as e:
                print(f"{Fore.RED}An error occurred during search attempt. Check the log file for details.{Style.RESET_ALL}")
                logger.error(f"An error occurred during search: {str(e)}", exc_info=True)
                attempt += 1
        # All attempts failed: fall back to a best-effort answer rather than
        # asking generate_final_answer to work from empty scraped content.
        return self.synthesize_final_answer(user_query)
    def formulate_query(self, user_query: str, attempt: int) -> Tuple[str, str]:
        user_query_short = user_query[:200]
        prompt = f"""
Based on the following user question, formulate a concise and effective search query:
"{user_query_short}"

Your task:
1. Create a search query of 2-5 words that will yield relevant results.
2. Determine if a specific time range is needed for the search.

Time range options:
- 'd': Limit results to the past day. Use for very recent events or rapidly changing information.
- 'w': Limit results to the past week. Use for recent events or topics with frequent updates.
- 'm': Limit results to the past month. Use for relatively recent information or ongoing events.
- 'y': Limit results to the past year. Use for annual events or information that changes yearly.
- 'none': No time limit. Use for historical information or topics not tied to a specific time frame.

Respond in the following format:
Search query: [Your 2-5 word query]
Time range: [d/w/m/y/none]

Do not provide any additional information or explanation.
"""
        max_retries = 3
        for retry in range(max_retries):
            with OutputRedirector() as output:
                response = self.llm(prompt, max_tokens=50, stop=None)
            llm_output = output.getvalue()
            logger.info(f"LLM Output in formulate_query:\n{llm_output}")
            response_text = response['choices'][0]['text'].strip()
            query, time_range = self.parse_query_response(response_text)
            if query and time_range:
                return query, time_range
        return self.fallback_query(user_query), "none"

    def parse_query_response(self, response: str) -> Tuple[str, str]:
        query = ""
        time_range = "none"
        for line in response.split('\n'):
            if ":" in line:
                key, value = line.split(":", 1)
                key = key.strip().lower()
                value = value.strip()
                if "query" in key:
                    query = self.clean_query(value)
                elif "time" in key or "range" in key:
                    time_range = self.validate_time_range(value)
        return query, time_range

    def clean_query(self, query: str) -> str:
        query = re.sub(r'["\'\[\]]', '', query)
        query = re.sub(r'\s+', ' ', query)
        return query.strip()[:100]

    def validate_time_range(self, time_range: str) -> str:
        valid_ranges = ['d', 'w', 'm', 'y', 'none']
        time_range = time_range.lower()
        return time_range if time_range in valid_ranges else 'none'

    def fallback_query(self, user_query: str) -> str:
        words = user_query.split()
        return " ".join(words[:5])
    def perform_search(self, query: str, time_range: str) -> List[Dict]:
        if not query:
            return []

        with DDGS() as ddgs:
            try:
                with OutputRedirector() as output:
                    if time_range and time_range != 'none':
                        results = list(ddgs.text(query, timelimit=time_range, max_results=10))
                    else:
                        results = list(ddgs.text(query, max_results=10))
                ddg_output = output.getvalue()
                logger.info(f"DDG Output in perform_search:\n{ddg_output}")
                print(f"{Fore.GREEN}Search query sent to DuckDuckGo: {query}{Style.RESET_ALL}")
                print(f"{Fore.GREEN}Time range sent to DuckDuckGo: {time_range}{Style.RESET_ALL}")
                print(f"{Fore.GREEN}Number of results: {len(results)}{Style.RESET_ALL}")
                return [{'number': i + 1, **result} for i, result in enumerate(results)]
            except Exception as e:
                print(f"{Fore.RED}Search error: {str(e)}{Style.RESET_ALL}")
                return []

    def display_search_results(self, results: List[Dict]):
        print(f"\n{Fore.CYAN}Search Results:{Style.RESET_ALL}")
        for result in results:
            print(f"{Fore.GREEN}Result {result['number']}:{Style.RESET_ALL}")
            print(f"Title: {result.get('title', 'N/A')}")
            print(f"Snippet: {result.get('body', 'N/A')[:200]}...")
            print(f"URL: {result.get('href', 'N/A')}\n")
    def select_relevant_pages(self, search_results: List[Dict], user_query: str) -> List[str]:
        prompt = f"""
Given the following search results for the user's question: "{user_query}"
Select the 2 most relevant results to scrape and analyze. Explain your reasoning for each selection.

Search Results:
{self.format_results(search_results)}

Instructions:
1. You MUST select exactly 2 result numbers from the search results.
2. Choose the results that are most likely to contain comprehensive and relevant information to answer the user's question.
3. Provide a brief reason for each selection.

You MUST respond using EXACTLY this format and nothing else:

Selected Results: [Two numbers corresponding to the selected results]
Reasoning: [Your reasoning for the selections]
"""
        max_retries = 3
        for retry in range(max_retries):
            with OutputRedirector() as output:
                response = self.llm(prompt, max_tokens=200, stop=None)
            llm_output = output.getvalue()
            logger.info(f"LLM Output in select_relevant_pages:\n{llm_output}")
            response_text = response['choices'][0]['text'].strip()
            parsed_response = self.parse_page_selection_response(response_text)
            if parsed_response and self.validate_page_selection_response(parsed_response, len(search_results)):
                selected_urls = [result['href'] for result in search_results if result['number'] in parsed_response['selected_results']]
                allowed_urls = [url for url in selected_urls if can_fetch(url)]
                if allowed_urls:
                    return allowed_urls
                else:
                    print(f"{Fore.YELLOW}Warning: All selected URLs are disallowed by robots.txt. Retrying selection.{Style.RESET_ALL}")
            else:
                print(f"{Fore.YELLOW}Warning: Invalid page selection. Retrying.{Style.RESET_ALL}")

        print(f"{Fore.YELLOW}Warning: All attempts to select relevant pages failed. Falling back to top allowed results.{Style.RESET_ALL}")
        allowed_urls = [result['href'] for result in search_results if can_fetch(result['href'])][:2]
        return allowed_urls
    def parse_page_selection_response(self, response: str) -> Optional[Dict[str, Union[List[int], str]]]:
        # Returns None when the response is missing either required field.
        lines = response.split('\n')
        parsed = {}
        for line in lines:
            if line.startswith('Selected Results:'):
                parsed['selected_results'] = [int(num.strip()) for num in re.findall(r'\d+', line)]
            elif line.startswith('Reasoning:'):
                parsed['reasoning'] = line.split(':', 1)[1].strip()
        return parsed if 'selected_results' in parsed and 'reasoning' in parsed else None

    def validate_page_selection_response(self, parsed_response: Dict[str, Union[List[int], str]], num_results: int) -> bool:
        if len(parsed_response['selected_results']) != 2:
            return False
        if any(num < 1 or num > num_results for num in parsed_response['selected_results']):
            return False
        return True
    def format_results(self, results: List[Dict]) -> str:
        formatted_results = []
        for result in results:
            formatted_result = f"{result['number']}. Title: {result.get('title', 'N/A')}\n"
            formatted_result += f"   Snippet: {result.get('body', 'N/A')[:200]}...\n"
            formatted_result += f"   URL: {result.get('href', 'N/A')}\n"
            formatted_results.append(formatted_result)
        return "\n".join(formatted_results)

    def scrape_content(self, urls: List[str]) -> Dict[str, str]:
        scraped_content = {}
        for url in urls:
            if can_fetch(url):
                content = get_web_content([url])
                if content:
                    scraped_content.update(content)
                    print(Fore.YELLOW + f"Successfully scraped: {url}" + Style.RESET_ALL)
                else:
                    print(f"{Fore.YELLOW}Warning: Failed to scrape content from {url}{Style.RESET_ALL}")
            else:
                print(f"{Fore.YELLOW}Warning: Robots.txt disallows scraping of {url}{Style.RESET_ALL}")
        print(Fore.YELLOW + f"Scraped content received for {len(scraped_content)} URLs" + Style.RESET_ALL)
        return scraped_content

    def display_scraped_content(self, scraped_content: Dict[str, str]):
        print(f"\n{Fore.CYAN}Scraped Content:{Style.RESET_ALL}")
        for url, content in scraped_content.items():
            print(f"{Fore.GREEN}URL: {url}{Style.RESET_ALL}")
            print(f"Content: {content[:4000]}...\n")
    def generate_final_answer(self, user_query: str, scraped_content: Dict[str, str]) -> str:
        user_query_short = user_query[:200]
        prompt = f"""
You are an AI assistant. Provide a comprehensive and detailed answer to the following question using ONLY the information provided in the scraped content. Do not include any references or mention any sources. Answer directly and thoroughly.

Question: "{user_query_short}"

Scraped Content:
{self.format_scraped_content(scraped_content)}

Important Instructions:
1. Do not use phrases like "Based on the absence of selected results" or similar.
2. If the scraped content does not contain enough information to answer the question, say so explicitly and explain what information is missing.
3. Provide as much relevant detail as possible from the scraped content.

Answer:
"""
        max_retries = 3
        for attempt in range(max_retries):
            with OutputRedirector() as output:
                response = self.llm(prompt, max_tokens=1024, stop=None)
            llm_output = output.getvalue()
            logger.info(f"LLM Output in generate_final_answer:\n{llm_output}")
            response_text = response['choices'][0]['text'].strip()
            if response_text:
                logger.info(f"LLM Response:\n{response_text}")
                return response_text

        error_message = "I apologize, but I couldn't generate a satisfactory answer based on the available information."
        logger.warning(f"Failed to generate a response after {max_retries} attempts. Returning error message.")
        return error_message

    def format_scraped_content(self, scraped_content: Dict[str, str]) -> str:
        formatted_content = []
        for url, content in scraped_content.items():
            content = re.sub(r'\s+', ' ', content)
            formatted_content.append(f"Content from {url}:\n{content}\n")
        return "\n".join(formatted_content)
    def synthesize_final_answer(self, user_query: str) -> str:
        prompt = f"""
After multiple search attempts, we couldn't find a fully satisfactory answer to the user's question: "{user_query}"

Please provide the best possible answer you can, acknowledging any limitations or uncertainties.
If appropriate, suggest ways the user might refine their question or where they might find more information.

Respond in a clear, concise, and informative manner.
"""
        try:
            with OutputRedirector() as output:
                response = self.llm(prompt, max_tokens=self.llm_config['max_tokens'], stop=self.llm_config['stop'])
            llm_output = output.getvalue()
            logger.info(f"LLM Output in synthesize_final_answer:\n{llm_output}")
            final_answer = response['choices'][0]['text'].strip()
            return final_answer
        except Exception as e:
            logger.error(f"Error in synthesize_final_answer: {str(e)}", exc_info=True)
            return "I apologize, but after multiple attempts, I wasn't able to find a satisfactory answer to your question. Please try rephrasing your question or breaking it down into smaller, more specific queries."
# End of EnhancedSelfImprovingSearch class
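

# Minimal usage sketch (illustrative, not part of the upstream module):
# initialize the model via the helper above, then run one search-and-improve
# cycle. Assumes llm_config.py points at a valid model and that
# UltimateLLMResponseParser can be constructed with no arguments.
if __name__ == "__main__":
    llm = EnhancedSelfImprovingSearch.initialize_llm()
    searcher = EnhancedSelfImprovingSearch(llm=llm, parser=UltimateLLMResponseParser())
    # Hypothetical example query; any question string works here.
    answer = searcher.search_and_improve("What were the key announcements at the last CES?")
    print(answer)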