-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathlisting_parser.py
More file actions
452 lines (380 loc) · 15.7 KB
/
listing_parser.py
File metadata and controls
452 lines (380 loc) · 15.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
"""
Listing URL parser module.
Extracts property data from listing URLs to reduce user questioning.
Supports: vtorynka.com.ua, OLX, DIM.RIA
"""
import re
import logging
import aiohttp
from dataclasses import dataclass
from typing import Optional
from urllib.parse import urlparse
logger = logging.getLogger(__name__)
@dataclass
class ParsedListing:
"""Parsed property listing data"""
url: str
source: str # vtorynka, olx, domria
title: Optional[str] = None
price: Optional[str] = None
rooms: Optional[str] = None
area: Optional[str] = None
address: Optional[str] = None
city: Optional[str] = None
district: Optional[str] = None
property_type: Optional[str] = None # apartment, house, commercial
floor: Optional[str] = None
total_floors: Optional[str] = None
description: Optional[str] = None
def to_context_string(self, language: str = "uk") -> str:
"""Format listing data as context for AI"""
if language == "uk":
lines = [f"[Дані з оголошення: {self.url}]"]
if self.title:
lines.append(f"Назва: {self.title}")
if self.property_type:
lines.append(f"Тип: {self.property_type}")
if self.city:
lines.append(f"Місто: {self.city}")
if self.district:
lines.append(f"Район: {self.district}")
if self.address:
lines.append(f"Адреса: {self.address}")
if self.rooms:
lines.append(f"Кімнат: {self.rooms}")
if self.area:
lines.append(f"Площа: {self.area}")
if self.floor and self.total_floors:
lines.append(f"Поверх: {self.floor}/{self.total_floors}")
elif self.floor:
lines.append(f"Поверх: {self.floor}")
if self.price:
lines.append(f"Ціна: {self.price}")
if self.description:
# Truncate description
desc = self.description[:200] + "..." if len(self.description) > 200 else self.description
lines.append(f"Опис: {desc}")
else:
lines = [f"[Data from listing: {self.url}]"]
if self.title:
lines.append(f"Title: {self.title}")
if self.property_type:
lines.append(f"Type: {self.property_type}")
if self.city:
lines.append(f"City: {self.city}")
if self.district:
lines.append(f"District: {self.district}")
if self.address:
lines.append(f"Address: {self.address}")
if self.rooms:
lines.append(f"Rooms: {self.rooms}")
if self.area:
lines.append(f"Area: {self.area}")
if self.floor and self.total_floors:
lines.append(f"Floor: {self.floor}/{self.total_floors}")
elif self.floor:
lines.append(f"Floor: {self.floor}")
if self.price:
lines.append(f"Price: {self.price}")
if self.description:
desc = self.description[:200] + "..." if len(self.description) > 200 else self.description
lines.append(f"Description: {desc}")
return "\n".join(lines)
# URL patterns for supported platforms
URL_PATTERNS = {
"vtorynka": r"vtorynka\.com\.ua",
"olx": r"olx\.ua",
"domria": r"dom\.ria\.com",
}
def extract_urls(text: str) -> list[str]:
"""Extract all URLs from text"""
url_pattern = r'https?://[^\s<>"{}|\\^`\[\]]+'
return re.findall(url_pattern, text)
def identify_platform(url: str) -> Optional[str]:
"""Identify which platform the URL belongs to"""
for platform, pattern in URL_PATTERNS.items():
if re.search(pattern, url, re.IGNORECASE):
return platform
return None
async def fetch_html(url: str, timeout: int = 10) -> Optional[str]:
"""Fetch HTML content from URL"""
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Accept-Language": "uk-UA,uk;q=0.9,en-US;q=0.8,en;q=0.7",
}
try:
async with aiohttp.ClientSession() as session:
async with session.get(url, headers=headers, timeout=aiohttp.ClientTimeout(total=timeout), ssl=False) as response:
if response.status == 200:
return await response.text()
logger.warning(f"Failed to fetch {url}: status {response.status}")
except Exception as e:
logger.error(f"Error fetching {url}: {e}")
return None
def extract_meta_content(html: str, property_name: str) -> Optional[str]:
"""Extract content from meta tag by property or name"""
patterns = [
rf'<meta\s+property=["\']?{property_name}["\']?\s+content=["\']([^"\']+)["\']',
rf'<meta\s+content=["\']([^"\']+)["\']\s+property=["\']?{property_name}["\']?',
rf'<meta\s+name=["\']?{property_name}["\']?\s+content=["\']([^"\']+)["\']',
]
for pattern in patterns:
match = re.search(pattern, html, re.IGNORECASE)
if match:
return match.group(1).strip()
return None
def clean_text(text: Optional[str]) -> Optional[str]:
"""Clean HTML entities and whitespace"""
if not text:
return None
text = re.sub(r' ', ' ', text)
text = re.sub(r'&', '&', text)
text = re.sub(r'<', '<', text)
text = re.sub(r'>', '>', text)
text = re.sub(r'"', '"', text)
text = re.sub(r'&#(\d+);', lambda m: chr(int(m.group(1))), text)
text = re.sub(r'\s+', ' ', text)
return text.strip()
async def parse_vtorynka(url: str, html: str) -> ParsedListing:
"""Parse vtorynka.com.ua listing"""
listing = ParsedListing(url=url, source="vtorynka")
# Title from og:title or h1
listing.title = extract_meta_content(html, "og:title")
if not listing.title:
h1_match = re.search(r'<h1[^>]*>([^<]+)</h1>', html)
if h1_match:
listing.title = clean_text(h1_match.group(1))
# Price - look for common patterns
price_patterns = [
r'(\d[\d\s]*(?:\$|USD|грн|usd))',
r'(\$\s*\d[\d\s]*)',
r'price["\']?\s*:\s*["\']?(\d[\d\s,\.]+)',
r'class=["\'][^"\']*price[^"\']*["\'][^>]*>([^<]+)',
]
for pattern in price_patterns:
match = re.search(pattern, html, re.IGNORECASE)
if match:
listing.price = clean_text(match.group(1))
break
# Rooms
rooms_patterns = [
r'(\d+)\s*(?:кімнат|комнат|room|rooms)',
r'(?:кімнат|комнат|rooms?)\s*[:\-]?\s*(\d+)',
r'(\d+)[- ]?к(?:імн|омн)',
]
for pattern in rooms_patterns:
match = re.search(pattern, html, re.IGNORECASE)
if match:
listing.rooms = match.group(1)
break
# Area
area_patterns = [
r'(\d+(?:[.,]\d+)?)\s*(?:м²|м2|m²|кв\.?\s*м)',
r'(?:площа|площадь|area)\s*[:\-]?\s*(\d+(?:[.,]\d+)?)',
]
for pattern in area_patterns:
match = re.search(pattern, html, re.IGNORECASE)
if match:
listing.area = match.group(1) + " м²"
break
# Floor
floor_match = re.search(r'(\d+)\s*/\s*(\d+)\s*(?:поверх|этаж|floor)', html, re.IGNORECASE)
if floor_match:
listing.floor = floor_match.group(1)
listing.total_floors = floor_match.group(2)
else:
floor_match = re.search(r'(?:поверх|этаж|floor)\s*[:\-]?\s*(\d+)', html, re.IGNORECASE)
if floor_match:
listing.floor = floor_match.group(1)
# Address/Location from URL or content
address_match = re.search(r'(?:адреса|адрес|address)\s*[:\-]?\s*([^<\n]+)', html, re.IGNORECASE)
if address_match:
listing.address = clean_text(address_match.group(1))
# City detection
cities = ["Київ", "Kyiv", "Одеса", "Odesa", "Львів", "Lviv", "Харків", "Kharkiv", "Дніпро", "Dnipro"]
for city in cities:
if city.lower() in html.lower():
listing.city = city
break
# Property type detection
if re.search(r'квартир|apartment|flat', html, re.IGNORECASE):
listing.property_type = "apartment"
elif re.search(r'будинок|house|дом', html, re.IGNORECASE):
listing.property_type = "house"
elif re.search(r'комерц|commercial|офіс|office', html, re.IGNORECASE):
listing.property_type = "commercial"
return listing
async def parse_olx(url: str, html: str) -> ParsedListing:
"""Parse olx.ua listing"""
listing = ParsedListing(url=url, source="olx")
# Title
listing.title = extract_meta_content(html, "og:title")
if not listing.title:
title_match = re.search(r'<h1[^>]*data-cy=["\']ad_title["\'][^>]*>([^<]+)', html)
if title_match:
listing.title = clean_text(title_match.group(1))
# Price from JSON-LD or patterns
price_match = re.search(r'"price"\s*:\s*"?(\d[\d\s,\.]*)"?', html)
if price_match:
listing.price = clean_text(price_match.group(1))
else:
price_match = re.search(r'class=["\'][^"\']*price[^"\']*["\'][^>]*>([^<]+)', html, re.IGNORECASE)
if price_match:
listing.price = clean_text(price_match.group(1))
# Extract parameters from OLX-specific patterns
# Rooms
rooms_match = re.search(r'(?:Кількість кімнат|Комнат|Rooms?)[:\s]*(\d+)', html, re.IGNORECASE)
if rooms_match:
listing.rooms = rooms_match.group(1)
# Area
area_match = re.search(r'(?:Загальна площа|Площадь|Area)[:\s]*(\d+(?:[.,]\d+)?)', html, re.IGNORECASE)
if area_match:
listing.area = area_match.group(1) + " м²"
# Floor
floor_match = re.search(r'(?:Поверх|Этаж|Floor)[:\s]*(\d+)(?:/(\d+))?', html, re.IGNORECASE)
if floor_match:
listing.floor = floor_match.group(1)
if floor_match.group(2):
listing.total_floors = floor_match.group(2)
# Location from breadcrumbs or meta
location = extract_meta_content(html, "og:locality")
if location:
listing.city = clean_text(location)
# Address from content
address_match = re.search(r'"addressLocality"\s*:\s*"([^"]+)"', html)
if address_match:
listing.city = clean_text(address_match.group(1))
address_street = re.search(r'"streetAddress"\s*:\s*"([^"]+)"', html)
if address_street:
listing.address = clean_text(address_street.group(1))
# Property type from URL or content
if "/kvartiry/" in url or "/apartments/" in url or "квартир" in html.lower():
listing.property_type = "apartment"
elif "/doma/" in url or "/houses/" in url or "будинок" in html.lower():
listing.property_type = "house"
elif "/kommercheskaya-nedvizhimost/" in url or "комерц" in html.lower():
listing.property_type = "commercial"
# Description
desc_match = re.search(r'<div[^>]*data-cy=["\']ad_description["\'][^>]*>(.{50,500})', html, re.DOTALL)
if desc_match:
desc = re.sub(r'<[^>]+>', ' ', desc_match.group(1))
listing.description = clean_text(desc)
return listing
async def parse_domria(url: str, html: str) -> ParsedListing:
"""Parse dom.ria.com listing"""
listing = ParsedListing(url=url, source="domria")
# Title
listing.title = extract_meta_content(html, "og:title")
# Price
price_match = re.search(r'"price"\s*:\s*"?(\d[\d\s,\.]*)"?', html)
if not price_match:
price_match = re.search(r'class=["\'][^"\']*price[^"\']*["\'][^>]*>\s*(\$?\s*\d[\d\s,\.]+)', html, re.IGNORECASE)
if price_match:
listing.price = clean_text(price_match.group(1))
# JSON-LD parsing for DOM.RIA
json_ld_match = re.search(r'<script type="application/ld\+json">([^<]+)</script>', html)
if json_ld_match:
import json
try:
data = json.loads(json_ld_match.group(1))
if "name" in data:
listing.title = data["name"]
if "address" in data:
addr = data["address"]
if isinstance(addr, dict):
listing.address = addr.get("streetAddress")
listing.city = addr.get("addressLocality")
else:
listing.address = str(addr)
except json.JSONDecodeError:
pass
# Parameters from page
# Rooms
rooms_match = re.search(r'(?:Кімнат|Комнат)[:\s]*(\d+)', html, re.IGNORECASE)
if rooms_match:
listing.rooms = rooms_match.group(1)
# Area
area_match = re.search(r'(?:Загальна площа|Общая площадь)[:\s]*(\d+(?:[.,]\d+)?)', html, re.IGNORECASE)
if area_match:
listing.area = area_match.group(1) + " м²"
# Floor
floor_match = re.search(r'(\d+)\s*поверх(?:у|[^а-яі])?\s*(?:з|из)\s*(\d+)', html, re.IGNORECASE)
if floor_match:
listing.floor = floor_match.group(1)
listing.total_floors = floor_match.group(2)
else:
floor_match = re.search(r'(?:Поверх|Этаж)[:\s]*(\d+)', html, re.IGNORECASE)
if floor_match:
listing.floor = floor_match.group(1)
# Property type
if "/kvartiry/" in url or "/apartments/" in url:
listing.property_type = "apartment"
elif "/doma/" in url or "/houses/" in url:
listing.property_type = "house"
elif "/commercial/" in url or "/kommercheskaya/" in url:
listing.property_type = "commercial"
# Description
desc_match = re.search(r'class=["\'][^"\']*description[^"\']*["\'][^>]*>(.{50,500})', html, re.DOTALL | re.IGNORECASE)
if desc_match:
desc = re.sub(r'<[^>]+>', ' ', desc_match.group(1))
listing.description = clean_text(desc)
return listing
async def parse_listing_url(url: str) -> Optional[ParsedListing]:
"""
Parse a property listing URL and extract data.
Args:
url: The listing URL to parse
Returns:
ParsedListing object with extracted data, or None if parsing failed
"""
platform = identify_platform(url)
if not platform:
logger.debug(f"Unknown platform for URL: {url}")
return None
html = await fetch_html(url)
if not html:
logger.warning(f"Could not fetch HTML for {url}")
return None
try:
if platform == "vtorynka":
return await parse_vtorynka(url, html)
elif platform == "olx":
return await parse_olx(url, html)
elif platform == "domria":
return await parse_domria(url, html)
except Exception as e:
logger.error(f"Error parsing {platform} listing {url}: {e}")
return None
async def parse_urls_from_message(message: str) -> list[ParsedListing]:
"""
Extract and parse all property listing URLs from a message.
Args:
message: User message text
Returns:
List of successfully parsed listings
"""
urls = extract_urls(message)
if not urls:
return []
listings = []
for url in urls:
listing = await parse_listing_url(url)
if listing:
listings.append(listing)
return listings
def format_listings_context(listings: list[ParsedListing], language: str = "uk") -> str:
"""
Format multiple listings as context for the AI.
Args:
listings: List of parsed listings
language: Language code
Returns:
Formatted context string
"""
if not listings:
return ""
parts = []
for listing in listings:
parts.append(listing.to_context_string(language))
return "\n\n".join(parts)