Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 37 additions & 26 deletions estela_scrapy/extensions.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

# Job status values. COMPLETED_STATUS and ERROR_STATUS are passed as the
# `status` argument of update_job() when the spider closes.
# NOTE(review): RUNNING_STATUS is presumably reported while the job is still
# active -- its use is not visible in this excerpt; confirm.
RUNNING_STATUS = "RUNNING"
COMPLETED_STATUS = "COMPLETED"
ERROR_STATUS = "ERROR"

# Performance optimization constants
TIME_CACHE_UPDATE_INTERVAL = 100 # Update time calculation every N items
Expand Down Expand Up @@ -355,37 +356,47 @@ def spider_closed(self, spider, reason):
if self.task.running:
self.task.stop()

metrics, elapsed_time = self._calculate_metrics(spider, status=reason)

stats = self.stats.get_stats()
stats.update({"elapsed_time_seconds": int(elapsed_time)})
stats.update(metrics)

try:
self.redis_conn.delete(self.stats_key)
except Exception:
pass

update_job(
self.job_url,
self.auth_token,
status=COMPLETED_STATUS,
lifespan=int(stats.get("elapsed_time_seconds", 0)),
total_bytes=stats.get("downloader/response_bytes", 0),
item_count=stats.get("item_scraped_count", 0),
request_count=stats.get("downloader/request_count", 0),
proxy_usage_data={
"proxy_name": stats.get("downloader/proxy_name", ""),
"bytes": stats.get("downloader/proxies/response_bytes", 0),
},
)

parsed_stats = json.dumps(stats, default=json_serializer)
data = {
"jid": os.getenv("ESTELA_SPIDER_JOB"),
"payload": json.loads(parsed_stats),
}
producer.send("job_stats", data)
# Finalize the job: collect the closing stats, publish them on the
# "job_stats" topic, and always report the final status through update_job().
#
# `stats` and `job_status` are bound BEFORE the `try` on purpose. If metric
# collection itself raises, the `finally` clause must still be able to read
# `stats` (falling back to the zero/empty defaults) and report ERROR_STATUS;
# otherwise it would die on an unbound name and the job would never be marked
# as failed.
stats = {}
job_status = ERROR_STATUS
try:
    metrics, elapsed_time = self._calculate_metrics(spider, status=reason)
    stats = self.stats.get_stats()
    stats.update({"elapsed_time_seconds": int(elapsed_time)})
    stats.update(metrics)

    # Round-trip through JSON so the payload only holds JSON-native values;
    # anything json cannot encode natively is handed to json_serializer.
    parsed_stats = json.dumps(stats, default=json_serializer)
    data = {
        "jid": os.getenv("ESTELA_SPIDER_JOB"),
        "payload": json.loads(parsed_stats),
    }
    producer.send("job_stats", data)

    # Only reached when every step above succeeded.
    job_status = COMPLETED_STATUS
except Exception as e:
    # job_status keeps its ERROR_STATUS default from above.
    print(f"Error during spider_closed: {e}")
finally:
    # Best effort: a failure to report the status is logged, not raised, so
    # it cannot mask an exception propagating out of the block above.
    try:
        update_job(
            self.job_url,
            self.auth_token,
            status=job_status,
            lifespan=int(stats.get("elapsed_time_seconds", 0)),
            total_bytes=stats.get("downloader/response_bytes", 0),
            item_count=stats.get("item_scraped_count", 0),
            request_count=stats.get("downloader/request_count", 0),
            proxy_usage_data={
                "proxy_name": stats.get("downloader/proxy_name", ""),
                "bytes": stats.get("downloader/proxies/response_bytes", 0),
            },
        )
    except Exception as e:
        print(f"CRITICAL ERROR: Could not update job status: {e}")

def store_stats(self, spider):
metrics, elapsed_time = self._calculate_metrics(
Expand Down