-
Notifications
You must be signed in to change notification settings - Fork 2
/
main.py
153 lines (128 loc) · 5.71 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
# main.py
import csv
import os
import sys
import threading
import time

from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
from webdriver_manager.chrome import ChromeDriverManager

from utils import check_and_click_close_popup, countdown_timer, smooth_scroll_to, human_like_scroll
def get_url_input():
    """Prompt the user for the page to scrape and return its URL.

    The user either types a complete URL (option 1) or supplies a city and a
    search keyword (option 2), from which a URL of the form
    ``https://www.justdial.com/<city>/<keyword>/`` is built.

    Returns:
        str: The URL entered by the user, or the one built from the city
        and keyword.

    Raises:
        SystemExit: If the menu choice is neither ``1`` nor ``2``.
    """
    print("Select an option:")
    print("1. Provide a URL")
    print("2. Enter City/Keyword")
    choice = input("Enter the number of your choice (1 or 2): ").strip()

    if choice == '1':
        return input("Enter the URL: ").strip()

    if choice == '2':
        # split()/join trims the ends and collapses runs of whitespace, so
        # "  New  Delhi " becomes "New-Delhi" rather than "--New--Delhi-".
        city = '-'.join(input("Enter the city name: ").split())
        keyword = '-'.join(input("Enter the search keyword: ").split())
        base_url = "https://www.justdial.com/"
        return f"{base_url}{city}/{keyword}/"

    print("Invalid choice. Exiting.")
    # sys.exit() rather than the interactive-only exit() helper, which the
    # site module may not install (e.g. python -S or frozen builds).
    sys.exit()
def get_url_from_file(filename):
    """Return the URL stored in *filename*.

    The URL is the first non-blank line of the file with surrounding
    whitespace removed; any later lines are ignored.

    Args:
        filename: Path of the text file holding the URL.

    Returns:
        str: The URL read from the file.

    Raises:
        SystemExit: If the file does not exist or contains no non-blank
            line. A message is printed before exiting.
    """
    try:
        # utf-8-sig decodes plain ASCII/UTF-8 and drops a leading byte-order
        # mark, which would otherwise become part of the URL.
        with open(filename, 'r', encoding='utf-8-sig') as file:
            url = next((line.strip() for line in file if line.strip()), '')
    except FileNotFoundError:
        print(f"The file '{filename}' does not exist. Exiting.")
        sys.exit()

    if not url:
        print(f"The file '{filename}' is empty. Exiting.")
        sys.exit()
    return url
# temp_url.txt, when present, supplies the URL to scrape; without it the
# user is prompted interactively instead.
url = (
    get_url_from_file('temp_url.txt')
    if os.path.exists('temp_url.txt')
    else get_url_input()
)
# User-agent string sent by the browser instead of Chrome's default.
user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36"


def _build_chrome_options(agent):
    """Return the Chrome ``Options`` used by this script for user agent *agent*."""
    opts = Options()
    # Command-line switches, added in the same relative order as before.
    for switch in (
        f"user-agent={agent}",
        "--start-maximized",
        "--disable-blink-features=AutomationControlled",
    ):
        opts.add_argument(switch)
    # Experimental options that drop the automation switch and extension.
    for option_name, option_value in (
        ("excludeSwitches", ["enable-automation"]),
        ("useAutomationExtension", False),
    ):
        opts.add_experimental_option(option_name, option_value)
    return opts


chrome_options = _build_chrome_options(user_agent)

# Launch Chrome through a chromedriver binary resolved by webdriver_manager.
service = Service(ChromeDriverManager().install())
driver = webdriver.Chrome(service=service, options=chrome_options)

# Make navigator.webdriver read as undefined.
# NOTE(review): this runs in the document loaded at this point, before the
# later driver.get(url) call - confirm the override is still in effect after
# navigation.
driver.execute_script("Object.defineProperty(navigator, 'webdriver', {get: () => undefined})")
# Run countdown_timer (from utils) in the background while scraping.
# NOTE(review): the argument 30 is presumably a duration in seconds - confirm
# against utils.countdown_timer.
countdown_thread = threading.Thread(target=countdown_timer, args=(30,))
countdown_thread.start()

try:
    driver.get(url)
    print("Opened URL:", url)

    # Pause before looking for the 'Maybe Later' popup, then dismiss it if it
    # is visible. This is best effort: any failure is reported and scraping
    # continues.
    time.sleep(5)
    try:
        maybe_later_button = WebDriverWait(driver, 10).until(
            EC.presence_of_element_located((By.CLASS_NAME, 'maybelater'))
        )
        if maybe_later_button.is_displayed():
            maybe_later_button.click()
            print("Clicked 'Maybe Later' button.")
    except Exception as e:
        print(f"Maybe Later popup not found or failed to click: {str(e)}")

    # human_like_scroll comes from utils; presumably it scrolls the page so
    # that further listings load - confirm against its implementation.
    human_like_scroll(driver)

    # Save the page source for debugging (removed again in the cleanup below).
    page_source = driver.page_source
    with open('page_source.html', 'w', encoding='utf-8') as f:
        f.write(page_source)
    print("Page source saved to 'page_source.html'.")

    # Ensure the 'Scrapped' output folder exists.
    os.makedirs('Scrapped', exist_ok=True)

    # The CSV is named after the second-to-last '/'-separated piece of the
    # URL; for ".../<city>/<keyword>/" that is the keyword.
    csv_filename = os.path.join('Scrapped', f"{url.split('/')[-2]}.csv")
    with open(csv_filename, 'w', newline='', encoding='utf-8') as csvfile:
        fieldnames = ['Name', 'Address', 'Phone']
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writeheader()

        parent_divs = driver.find_elements(By.CLASS_NAME, 'resultbox_info')
        if not parent_divs:
            print("No parent divs found. Check the class name and page source.")
        else:
            for index, parent_div in enumerate(parent_divs):
                try:
                    name_div = parent_div.find_element(By.CLASS_NAME, 'resultbox_title_anchor')
                    name = name_div.text.strip()
                    phone_div = parent_div.find_element(By.CLASS_NAME, 'callcontent')
                    phone_number = phone_div.text.strip()
                    address_div = parent_div.find_element(By.CLASS_NAME, 'resultbox_address')
                    address = address_div.text.strip()

                    # Only complete records are written; partial ones are
                    # reported on stdout and skipped.
                    if name and phone_number and address:
                        writer.writerow({'Name': name, 'Address': address, 'Phone': phone_number})
                    else:
                        print(f"Missing data in parent div {index}. Name: '{name}', Address: '{address}', Phone: '{phone_number}'")
                except Exception as e:
                    # One broken listing must not abort the whole run; record
                    # it and move on to the next result box.
                    with open('error_log.txt', 'a', encoding='utf-8') as error_file:
                        error_file.write(f"An error occurred in parent div {index}: {str(e)}\n")
    print(f"Data extraction completed and saved to '{csv_filename}'.")
except Exception as e:
    print(f"An unexpected error occurred: {str(e)}")
finally:
    # Print script completion message
    print("Script execution completed.")
    # Wait for 3 seconds
    time.sleep(3)

    # Close the browser and stop chromedriver so no processes are left
    # running after the script ends. A failure here is reported but must not
    # prevent the file cleanup below.
    try:
        driver.quit()
    except Exception as e:
        print(f"Failed to close the browser: {str(e)}")

    # Remove the page_source.html, stop.txt and error_log.txt files.
    # NOTE(review): error_log.txt is deleted here, so per-listing errors
    # written during this run do not survive it - confirm that is intended.
    if os.path.exists('page_source.html'):
        os.remove('page_source.html')
    if os.path.exists('stop.txt'):
        os.remove('stop.txt')
    if os.path.exists('error_log.txt'):
        os.remove('error_log.txt')
    print("Deleted Logs")