-
-
Notifications
You must be signed in to change notification settings - Fork 18
Expand file tree
/
Copy pathupgrading_queue.py
More file actions
executable file
·1081 lines (937 loc) · 56.8 KB
/
upgrading_queue.py
File metadata and controls
executable file
·1081 lines (937 loc) · 56.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
import logging
from typing import Dict, Any, Optional
from datetime import datetime, timedelta, timezone
from database.database_writing import add_to_collected_notifications
from queues.scraping_queue import ScrapingQueue
from queues.adding_queue import AddingQueue
from utilities.settings import get_setting
from utilities.plex_functions import remove_file_from_plex
from database.not_wanted_magnets import is_magnet_not_wanted, is_url_not_wanted
import os
import pickle
from pathlib import Path
from database.database_writing import update_media_item
from database.core import get_db_connection
from difflib import SequenceMatcher
from debrid.common import extract_hash_from_magnet, extract_hash_from_file
from database.torrent_tracking import record_torrent_addition, update_torrent_tracking, get_torrent_history
from PTT import parse_title
import re
from scraper.functions.ptt_parser import parse_with_ptt
class UpgradingQueue:
def __init__(self):
    """Initialise in-memory bookkeeping and load the persisted upgrade data.

    The three pickle files are looked up under the directory named by the
    ``USER_DB_CONTENT`` environment variable (``/user/db_content`` when the
    variable is unset).
    """
    # Runtime bookkeeping; the dictionaries are keyed by item id.
    self.items = []
    self.upgrade_times = {}
    self.last_scrape_times = {}
    self.upgrades_found = {}
    self.scraping_queue = ScrapingQueue()

    # Paths must be set before the loaders run: each loader reads its own
    # ``*_file`` attribute.
    storage_root = Path(os.environ.get('USER_DB_CONTENT', '/user/db_content'))
    self.upgrades_file = storage_root / "upgrades.pkl"
    self.failed_upgrades_file = storage_root / "failed_upgrades.pkl"
    self.upgrade_states_file = storage_root / "upgrade_states.pkl"

    self.upgrades_data = self.load_upgrades_data()
    self.failed_upgrades = self.load_failed_upgrades()
    self.upgrade_states = self.load_upgrade_states()

    self.currently_processing_item_id: Optional[str] = None
    # Date of the last daily delayed-upgrade pass (None until the first run).
    self._last_delayed_upgrade_run_date = None
def load_upgrades_data(self):
    """Load the persisted upgrade data from ``self.upgrades_file``.

    Returns:
        The unpickled object stored in the file, or an empty dict when the
        file is missing, empty, unreadable or corrupted. Errors are logged,
        never raised.
    """
    try:
        if not self.upgrades_file.exists():
            return {}
        if self.upgrades_file.stat().st_size == 0:
            logging.info("Upgrades file is empty, initializing new data")
            return {}
        with open(self.upgrades_file, 'rb') as f:
            try:
                # NOTE(security): pickle executes code on load; this is only
                # acceptable because the file is written by this class.
                return pickle.load(f)
            except (EOFError, pickle.UnpicklingError,
                    AttributeError, ImportError, IndexError) as e:
                # pickle documents all of these as possible results of
                # damaged data; every one of them must trigger the backup,
                # otherwise the next save silently overwrites the file.
                logging.error(f"Error loading upgrades data, file may be corrupted: {str(e)}")
                # Backup the corrupted file
                backup_path = str(self.upgrades_file) + '.bak'
                try:
                    import shutil
                    shutil.copy2(self.upgrades_file, backup_path)
                    logging.info(f"Backed up corrupted upgrades file to {backup_path}")
                except Exception as backup_err:
                    logging.error(f"Failed to backup corrupted file: {str(backup_err)}")
                return {}
    except Exception as e:
        logging.error(f"Unexpected error loading upgrades data: {str(e)}")
        return {}
def save_upgrades_data(self):
    """Persist ``self.upgrades_data`` to ``self.upgrades_file`` as a pickle.

    The data is serialised first and written to a sibling ``.tmp`` file that
    is then moved over the target, so a pickling error or a crash mid-write
    cannot leave a truncated file behind.

    Raises:
        Any exception raised by pickling or by the file operations.
    """
    # Serialise before touching the disk: a failure here leaves the
    # previously saved file untouched.
    payload = pickle.dumps(self.upgrades_data)
    tmp_path = f"{self.upgrades_file}.tmp"
    try:
        with open(tmp_path, 'wb') as f:
            f.write(payload)
        os.replace(tmp_path, self.upgrades_file)
    finally:
        # Only present if the write or the replace failed part-way.
        try:
            os.remove(tmp_path)
        except OSError:
            pass
def load_failed_upgrades(self):
    """Load the persisted failed-upgrade records from ``self.failed_upgrades_file``.

    Returns:
        The unpickled object stored in the file, or an empty dict when the
        file is missing, empty, unreadable or corrupted. Errors are logged,
        never raised.
    """
    try:
        if not self.failed_upgrades_file.exists():
            return {}
        if self.failed_upgrades_file.stat().st_size == 0:
            logging.info("Failed upgrades file is empty, initializing new data")
            return {}
        with open(self.failed_upgrades_file, 'rb') as f:
            try:
                # NOTE(security): pickle executes code on load; this is only
                # acceptable because the file is written by this class.
                return pickle.load(f)
            except (EOFError, pickle.UnpicklingError,
                    AttributeError, ImportError, IndexError) as e:
                # pickle documents all of these as possible results of
                # damaged data; every one of them must trigger the backup,
                # otherwise the next save silently overwrites the file.
                logging.error(f"Error loading failed upgrades data, file may be corrupted: {str(e)}")
                # Backup the corrupted file
                backup_path = str(self.failed_upgrades_file) + '.bak'
                try:
                    import shutil
                    shutil.copy2(self.failed_upgrades_file, backup_path)
                    logging.info(f"Backed up corrupted failed upgrades file to {backup_path}")
                except Exception as backup_err:
                    logging.error(f"Failed to backup corrupted file: {str(backup_err)}")
                return {}
    except Exception as e:
        logging.error(f"Unexpected error loading failed upgrades data: {str(e)}")
        return {}
def save_failed_upgrades(self):
    """Persist ``self.failed_upgrades`` to ``self.failed_upgrades_file`` as a pickle.

    The data is serialised first and written to a sibling ``.tmp`` file that
    is then moved over the target, so a pickling error or a crash mid-write
    cannot leave a truncated file behind.

    Raises:
        Any exception raised by pickling or by the file operations.
    """
    # Serialise before touching the disk: a failure here leaves the
    # previously saved file untouched.
    payload = pickle.dumps(self.failed_upgrades)
    tmp_path = f"{self.failed_upgrades_file}.tmp"
    try:
        with open(tmp_path, 'wb') as f:
            f.write(payload)
        os.replace(tmp_path, self.failed_upgrades_file)
    finally:
        # Only present if the write or the replace failed part-way.
        try:
            os.remove(tmp_path)
        except OSError:
            pass
def load_upgrade_states(self):
    """Load the persisted item state snapshots from ``self.upgrade_states_file``.

    Returns:
        The unpickled object stored in the file, or an empty dict when the
        file is missing, empty, unreadable or corrupted. Errors are logged,
        never raised.
    """
    try:
        if not self.upgrade_states_file.exists():
            return {}
        if self.upgrade_states_file.stat().st_size == 0:
            logging.info("Upgrade states file is empty, initializing new data")
            return {}
        with open(self.upgrade_states_file, 'rb') as f:
            try:
                # NOTE(security): pickle executes code on load; this is only
                # acceptable because the file is written by this class.
                return pickle.load(f)
            except (EOFError, pickle.UnpicklingError,
                    AttributeError, ImportError, IndexError) as e:
                # pickle documents all of these as possible results of
                # damaged data; every one of them must trigger the backup,
                # otherwise the next save silently overwrites the file.
                logging.error(f"Error loading upgrade states data, file may be corrupted: {str(e)}")
                # Backup the corrupted file
                backup_path = str(self.upgrade_states_file) + '.bak'
                try:
                    import shutil
                    shutil.copy2(self.upgrade_states_file, backup_path)
                    logging.info(f"Backed up corrupted upgrade states file to {backup_path}")
                except Exception as backup_err:
                    logging.error(f"Failed to backup corrupted file: {str(backup_err)}")
                return {}
    except Exception as e:
        logging.error(f"Unexpected error loading upgrade states data: {str(e)}")
        return {}
def save_upgrade_states(self):
    """Persist ``self.upgrade_states`` to ``self.upgrade_states_file`` as a pickle.

    Best effort: I/O and pickling errors are logged and swallowed, so callers
    cannot tell from the return value whether the save succeeded. The data is
    serialised first and written to a sibling ``.tmp`` file that is then moved
    over the target, so a failed save leaves the previous file intact instead
    of truncated.
    """
    tmp_path = f"{self.upgrade_states_file}.tmp"
    try:
        # Serialise before touching the disk so a pickling error cannot
        # clobber the last good snapshot file.
        payload = pickle.dumps(self.upgrade_states)
        with open(tmp_path, 'wb') as f:
            f.write(payload)
        os.replace(tmp_path, self.upgrade_states_file)
    except (IOError, pickle.PicklingError, EOFError) as e:
        logging.error(f"Failed to save upgrade states to {self.upgrade_states_file}: {str(e)}", exc_info=True)
    finally:
        # Only present if the write or the replace failed part-way.
        try:
            os.remove(tmp_path)
        except OSError:
            pass
def save_item_state(self, item: Dict[str, Any]):
    """Record a timestamped snapshot of ``item`` and persist the snapshot store.

    The snapshot is a shallow copy of the item dict, appended to the list kept
    under the item's id in ``self.upgrade_states``.
    """
    item_id = item['id']
    history = self.upgrade_states.setdefault(item_id, [])

    # Copy so that later mutation of the live item dict does not alter the
    # stored snapshot's top-level fields.
    snapshot = item.copy()
    history.append({'timestamp': datetime.now(), 'state': snapshot})

    self.save_upgrade_states()
    # Logged after the save call returns; save_upgrade_states reports its own
    # failures separately.
    logging.info(f"Saved state snapshot for item {item_id} (Total states stored for item: {len(self.upgrade_states[item_id])})")
def get_last_stable_state(self, item_id: str) -> Optional[Dict[str, Any]]:
    """Return the most recently saved state dict for ``item_id``.

    Returns ``None`` when the item has no snapshot list or the list is empty.
    """
    history = self.upgrade_states.get(item_id)
    if history:
        newest_entry = history[-1]
        return newest_entry['state']
    return None
def restore_item_state(self, item: Dict[str, Any]) -> bool:
    """Write the item's most recent saved snapshot back to ``media_items``.

    Every key of the snapshot dict is used as a column name in a single
    ``UPDATE`` (values are bound as parameters). After a successful commit the
    used snapshot is removed from the history and the history is persisted.

    Args:
        item: Item dict; only ``item['id']`` is read.

    Returns:
        True when the row was updated and committed; False when there is no
        usable snapshot or the database operation failed (the failure is
        logged).
    """
    item_id = item['id']

    if item_id not in self.upgrade_states or not self.upgrade_states[item_id]:
        logging.warning(f"No previous state found for item {item_id} in upgrade_states dictionary.")
        return False

    # Entry is a dict holding 'timestamp' and the actual 'state' dict.
    last_state_entry = self.upgrade_states[item_id][-1]
    last_state = last_state_entry.get('state')
    if not last_state:
        logging.error(f"Found state entry for item {item_id}, but the 'state' key is missing or empty. Cannot restore.")
        return False

    # Bound before the try so the except/finally clauses can safely test it
    # even when acquiring the connection itself fails.
    conn = None
    try:
        conn = get_db_connection()
        conn.execute('BEGIN TRANSACTION')

        # Column names come from the snapshot keys (interpolated into the
        # statement); values are passed as bound parameters.
        placeholders = ', '.join(f'{k} = ?' for k in last_state.keys())
        values = list(last_state.values())
        query = f'''
            UPDATE media_items
            SET {placeholders}
            WHERE id = ?
        '''
        values.append(item_id)

        conn.execute(query, values)
        conn.commit()

        # Drop the used snapshot only after the DB commit succeeded.
        if self.upgrade_states.get(item_id):
            try:
                self.upgrade_states[item_id].pop()
                logging.info(f"Popped used state for item {item_id}. Remaining states: {len(self.upgrade_states[item_id])}")
                self.save_upgrade_states()
            except IndexError:
                logging.warning(f"Attempted to pop state for item {item_id}, but the list was already empty (concurrent modification?).")

        logging.info(f"Successfully restored previous state for item {item_id} from snapshot taken at {last_state_entry.get('timestamp')}")
        return True
    except Exception as e:
        if conn is not None:
            conn.rollback()
        logging.error(f"Failed to restore previous state for item {item_id} using snapshot from {last_state_entry.get('timestamp') if last_state_entry else 'N/A'}: {str(e)}")
        return False
    finally:
        if conn is not None:
            conn.close()
def add_failed_upgrade(self, item_id: str, result_info: Dict[str, Any]):
    """Append a failed-upgrade record for ``item_id`` and persist the records.

    The record keeps the result's ``title`` and ``magnet`` (``None`` when
    absent), the current time, and the fixed reason ``'no_progress'``.
    """
    attempts = self.failed_upgrades.setdefault(item_id, [])
    record = {
        'title': result_info.get('title'),
        'magnet': result_info.get('magnet'),
        'timestamp': datetime.now(),
        'reason': 'no_progress'
    }
    attempts.append(record)
    self.save_failed_upgrades()
def revert_failed_upgrade(self, item: Dict[str, Any]):
    """Point the item's DB row back at the file it was being upgraded from.

    Reads ``upgrading_from`` / ``upgrading_from_torrent_id`` from ``item``.
    When ``upgrading_from`` is empty nothing is written and a warning is
    logged. Database errors are rolled back and logged, not raised.
    """
    logging.info(f"Reverting failed upgrade for item {self.generate_identifier(item)}")

    upgrading_from = item.get('upgrading_from')
    upgrading_from_torrent_id = item.get('upgrading_from_torrent_id')

    # Guard: without a recorded previous file there is nothing to revert to.
    if not upgrading_from:
        logging.warning(f"No previous version found for item {self.generate_identifier(item)}")
        return

    conn = get_db_connection()
    try:
        conn.execute('BEGIN TRANSACTION')
        params = (
            upgrading_from,
            upgrading_from_torrent_id,
            datetime.now(),
            item['id']
        )
        # The row stays in the 'Upgrading' state; only the file/torrent
        # pointers are restored and the upgrading_from_* markers cleared.
        conn.execute('''
            UPDATE media_items
            SET filled_by_file = ?,
                filled_by_torrent_id = ?,
                upgrading_from = NULL,
                upgrading_from_torrent_id = NULL,
                state = 'Upgrading',
                last_updated = ?
            WHERE id = ?
        ''', params)
        conn.commit()
        logging.info(f"Successfully reverted upgrade for item {self.generate_identifier(item)}")
    except Exception as e:
        conn.rollback()
        logging.error(f"Failed to revert upgrade: {str(e)}")
    finally:
        conn.close()
def update(self):
    """Reload ``self.items`` from the database and register timing info for new items.

    Items are fetched with ``get_all_media_items(state="Upgrading")``. Items
    that already have an ``upgrade_times`` entry keep it unchanged.
    """
    from database import get_all_media_items

    rows = get_all_media_items(state="Upgrading")
    self.items = [dict(row) for row in rows]

    for item in self.items:
        if item['id'] in self.upgrade_times:
            continue
        collected_at = item.get('original_collected_at', datetime.now())
        # datetime values are formatted; anything else is shown via str().
        if isinstance(collected_at, datetime):
            added_label = collected_at.strftime('%Y-%m-%d %H:%M:%S')
        else:
            added_label = str(collected_at)
        self.upgrade_times[item['id']] = {
            'start_time': datetime.now(),
            'time_added': added_label
        }
def get_contents(self):
    """Return copies of the queued items annotated for display.

    Each copy gains ``time_added`` (from ``upgrade_times``, or the current
    time when the item has no entry), ``upgrades_found`` and
    ``upgrade_history`` (from ``upgrades_data``, defaulting to 0 and ``[]``).
    The dicts in ``self.items`` are not modified.
    """
    def annotate(item):
        view = item.copy()
        timing = self.upgrade_times.get(item['id'])
        if timing:
            view['time_added'] = timing['time_added']
        else:
            view['time_added'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        # Upgrade history information for this item, if any was recorded.
        view['upgrades_found'] = self.upgrades_data.get(item['id'], {}).get('count', 0)
        view['upgrade_history'] = self.upgrades_data.get(item['id'], {}).get('history', [])
        return view

    return [annotate(item) for item in self.items]
def add_item(self, item: Dict[str, Any]):
    """Add ``item`` to the queue and initialise its per-item bookkeeping.

    Sets the item's ``upgrade_times``, ``last_scrape_times`` and
    ``upgrades_found`` entries. An ``upgrades_data`` entry is created (and
    persisted) only when the item does not already have one.
    """
    key = item['id']
    self.items.append(item)

    collected_at = item.get('original_collected_at', datetime.now())
    logging.info(f"collected_at: {collected_at}")
    # datetime values are formatted; anything else is shown via str().
    if isinstance(collected_at, datetime):
        added_label = collected_at.strftime('%Y-%m-%d %H:%M:%S')
    else:
        added_label = str(collected_at)

    self.upgrade_times[key] = {
        'start_time': datetime.now(),
        'time_added': added_label
    }
    self.last_scrape_times[key] = datetime.now()
    self.upgrades_found[key] = 0  # no upgrades found yet

    # Existing history for this id is kept as-is.
    if key not in self.upgrades_data:
        self.upgrades_data[key] = {'count': 0, 'history': []}
        self.save_upgrades_data()
def remove_item(self, item: Dict[str, Any]):
    """Remove ``item`` (matched by id) from the queue and drop its bookkeeping.

    The ``upgrades_data`` file is rewritten only when the item actually had an
    entry there.
    """
    self.items = [queued for queued in self.items if queued['id'] != item['id']]

    key = item['id']
    for registry in (self.upgrade_times, self.last_scrape_times, self.upgrades_found):
        registry.pop(key, None)

    if key in self.upgrades_data:
        del self.upgrades_data[key]
        self.save_upgrades_data()
def clean_up_upgrade_times(self):
    """Drop bookkeeping for ids that are no longer in ``self.items``.

    Prunes ``upgrade_times``, ``last_scrape_times``, ``upgrades_found`` and
    ``upgrades_data``, then persists ``upgrades_data``.
    """
    # Build the id set once instead of rebuilding a list for every key.
    active_ids = {item['id'] for item in self.items}

    for item_id in list(self.upgrade_times):
        if item_id not in active_ids:
            del self.upgrade_times[item_id]
            self.last_scrape_times.pop(item_id, None)
            logging.debug(f"Cleaned up upgrade time for item ID: {item_id}")

    # last_scrape_times can hold ids that have no upgrade_times entry any
    # more (process() records the scrape time after a scrape that may have
    # removed the item), so prune it on its own as well.
    for item_id in list(self.last_scrape_times):
        if item_id not in active_ids:
            del self.last_scrape_times[item_id]

    for item_id in list(self.upgrades_found):
        if item_id not in active_ids:
            del self.upgrades_found[item_id]

    for item_id in list(self.upgrades_data):
        if item_id not in active_ids:
            del self.upgrades_data[item_id]
    self.save_upgrades_data()
def process(self, queue_manager=None):
    """Run one processing pass over the Upgrading queue.

    Steps:
      1. At most once per calendar day, and only when the
         ``Scraping.delayed_upgrade_scrape_days`` setting is > 0, run the
         delayed-upgrade scrape.
      2. For every queued item with an ``upgrade_times`` entry, run the hourly
         scrape when it is due; if the item is still queued afterwards and
         has been collected for longer than
         ``Debug.upgrade_queue_duration_hours`` (default 24), move it to the
         ``Collected`` state.
      3. Prune bookkeeping for items no longer queued.

    Errors while handling a single item are logged and do not stop the pass.

    Args:
        queue_manager: Passed through to ``hourly_scrape``.
    """
    current_time = datetime.now()

    # Run delayed upgrade scrape once per day based on setting Scraping.delayed_upgrade_scrape_days
    try:
        days_setting = get_setting('Scraping', 'delayed_upgrade_scrape_days', '0')
        delayed_days = int(days_setting) if str(days_setting).strip() else 0
    except Exception:
        delayed_days = 0

    if delayed_days > 0:
        today = current_time.date()
        if self._last_delayed_upgrade_run_date != today:
            try:
                self._run_daily_delayed_upgrade_scrape(delayed_days)
            finally:
                # Mark the day as done even if the pass raised, so it is not
                # retried on every call.
                self._last_delayed_upgrade_run_date = today

    for item in self.items[:]:  # iterate a copy: items may be removed below
        try:
            item_id = item['id']
            upgrade_info = self.upgrade_times.get(item_id)

            if upgrade_info:
                collected_at = datetime.fromisoformat(item['original_collected_at']) if isinstance(item['original_collected_at'], str) else item['original_collected_at']

                # Skip if original_collected_at is None
                if collected_at is None:
                    logging.warning(f"Item {item_id} has None for original_collected_at, skipping upgrade queue processing")
                    continue

                time_in_queue = current_time - collected_at

                logging.info(f"Item {item_id} has been in the Upgrading queue for {time_in_queue}.")

                # Configured duration in hours; 24 when blank or invalid.
                # The setting may be stored as a number rather than a string,
                # so numeric values are accepted directly instead of falling
                # into the AttributeError fallback.
                try:
                    setting_value = get_setting('Debug', 'upgrade_queue_duration_hours', '24')
                    if isinstance(setting_value, (int, float)):
                        queue_duration_hours = int(setting_value)
                    else:
                        queue_duration_hours = int(setting_value) if setting_value.strip() else 24
                except (ValueError, AttributeError, OverflowError):
                    queue_duration_hours = 24

                max_duration = timedelta(hours=queue_duration_hours)

                # Perform the hourly scrape if due
                if self.should_perform_hourly_scrape(item_id, current_time):
                    logging.info(f"Performing hourly scrape for item {item_id} which has been in queue for {time_in_queue}.")
                    self.hourly_scrape(item, queue_manager)  # This might remove the item if upgraded
                    self.last_scrape_times[item_id] = current_time

                    # After the scrape, the timeout is only applied if the
                    # item is still queued.
                    if any(i['id'] == item_id for i in self.items):
                        if time_in_queue > max_duration:
                            logging.info(f"Item {item_id} timed out after scrape attempt (in queue > {queue_duration_hours} hours).")
                            # Remove the item due to timeout
                            self.remove_item(item)
                            from database import update_media_item_state
                            update_media_item_state(item_id, state="Collected")
                            logging.info(f"Moved item {item_id} to Collected state due to timeout.")
                    else:
                        # Item was removed during the scrape (upgraded)
                        logging.info(f"Item {item_id} was removed during hourly scrape (likely upgraded). Skipping timeout check.")
                else:
                    logging.debug(f"Skipping scrape for item {item_id} - not time yet.")

        except Exception as e:
            logging.error(f"Error processing item {item.get('id', 'unknown')}: {str(e)}")
            logging.exception("Traceback:")

    # Clean up upgrade times for items no longer in the queue
    self.clean_up_upgrade_times()
def _run_daily_delayed_upgrade_scrape(self, delayed_days: int):
"""Perform a one-time daily upgrade scrape for items released exactly delayed_days ago.
Eligibility is controlled by media_items.delayed_upgrade_eligible flag. Each item is
scraped at most once by this routine; after scraping we disable the flag.
"""
try:
from database.database_writing import (
get_delayed_upgrade_eligible_items,
update_delayed_upgrade_eligibility,
)
except Exception as e:
logging.error(f"Unable to import delayed-upgrade DB helpers: {e}")
return
try:
candidates = get_delayed_upgrade_eligible_items(delayed_days) or []
except Exception as e:
logging.error(f"Failed to load delayed-upgrade candidates: {e}")
return
if not candidates:
logging.info("No delayed-upgrade candidates found today")
return
logging.info(f"Delayed-upgrade daily pass: {len(candidates)} candidate(s) at {delayed_days} days since release")
for item in candidates:
try:
item_id = item.get('id')
if not item_id:
continue
# Mark as consumed before attempting (ensures single run)
update_delayed_upgrade_eligibility(item_id, False)
# Perform single scrape attempt
self.hourly_scrape(item, queue_manager=None)
except Exception as e:
logging.error(f"Delayed-upgrade scrape failed for item {item.get('id')}: {e}")
def should_perform_hourly_scrape(self, item_id: str, current_time: datetime) -> bool:
    """Tell whether ``item_id`` is due for a scrape at ``current_time``.

    Returns True when the item has no recorded scrape time or when at least
    one hour has passed since the recorded one; False otherwise. The decision
    is logged.
    """
    last_scrape_time = self.last_scrape_times.get(item_id)
    if last_scrape_time is None:
        logging.info(f"Item {item_id} has never been scraped before, running first scrape")
        return True

    time_since_last_scrape = current_time - last_scrape_time
    if time_since_last_scrape < timedelta(hours=1):
        logging.info(f"Skipping scrape for item {item_id} - Only {time_since_last_scrape} since last scrape, waiting for 1 hour")
        return False

    logging.info(f"Running scrape for item {item_id} - Last scrape was {time_since_last_scrape} ago")
    return True
def log_upgrade(self, item: Dict[str, Any], adding_queue: AddingQueue):
    """Record a successful upgrade in ``upgrades.log`` and in ``upgrades_data``.

    The log file lives in the directory named by ``USER_DB_CONTENT``
    (``/user/db_content`` when unset). The log line names the new file as
    reported by ``adding_queue.get_new_item_values(item)``; the history entry
    stores ``item['filled_by_file']`` and ``item['upgrading_from']``.

    Args:
        item: Item dict; reads ``id``, ``filled_by_file`` and ``upgrading_from``.
        adding_queue: Queue object providing ``get_new_item_values``.
    """
    # Get db_content directory from environment variable with fallback
    db_content_dir = os.environ.get('USER_DB_CONTENT', '/user/db_content')
    log_file = os.path.join(db_content_dir, "upgrades.log")
    item_identifier = self.generate_identifier(item)
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    new_file = adding_queue.get_new_item_values(item)
    log_entry = f"{timestamp} - Upgraded: {item_identifier} - New File: {new_file['filled_by_file']} - Original File: {item['upgrading_from']}\n"

    # makedirs('') raises, so only create the directory when there is one.
    log_dir = os.path.dirname(log_file)
    if log_dir:
        os.makedirs(log_dir, exist_ok=True)

    # Append mode creates the file when it is missing, so no separate
    # create step is needed. UTF-8 is explicit because file names may
    # contain non-ASCII characters.
    with open(log_file, 'a', encoding='utf-8') as f:
        f.write(log_entry)

    # Update upgrades_data
    if item['id'] not in self.upgrades_data:
        self.upgrades_data[item['id']] = {'count': 0, 'history': []}

    self.upgrades_data[item['id']]['count'] += 1
    self.upgrades_data[item['id']]['history'].append({
        'datetime': datetime.now(),
        'new_file': item['filled_by_file'],
        'original_file': item['upgrading_from']
    })

    self.save_upgrades_data()
def log_failed_upgrade(self, item: Dict[str, Any], target_title: str, reason: str):
    """Append a failed-upgrade line to ``upgrades.log``.

    The log file lives in the directory named by ``USER_DB_CONTENT``
    (``/user/db_content`` when unset).

    Args:
        item: Item dict, passed to ``generate_identifier`` for the log line.
        target_title: Title of the release the upgrade was targeting.
        reason: Short description of why the upgrade failed.
    """
    db_content_dir = os.environ.get('USER_DB_CONTENT', '/user/db_content')
    log_file = os.path.join(db_content_dir, "upgrades.log")
    item_identifier = self.generate_identifier(item)
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    log_entry = f"{timestamp} - Failed Upgrade: {item_identifier} - Target: {target_title} - Reason: {reason}\n"

    # makedirs('') raises, so only create the directory when there is one.
    log_dir = os.path.dirname(log_file)
    if log_dir:
        os.makedirs(log_dir, exist_ok=True)

    # Append mode creates the file when it is missing, so no separate
    # create step is needed. UTF-8 is explicit because release titles may
    # contain non-ASCII characters.
    with open(log_file, 'a', encoding='utf-8') as f:
        f.write(log_entry)
def get_currently_processing_item_id(self) -> Optional[str]:
    """Return the value of ``self.currently_processing_item_id`` (may be ``None``)."""
    active_item_id = self.currently_processing_item_id
    return active_item_id
def calculate_current_item_score(self, item: Dict[str, Any], version_settings: Dict[str, Any]) -> float:
    """Score the item's currently held release with the scraper's ranking function.

    A synthetic result dict is built from the item's stored title
    (``original_scraped_torrent_title``, else ``filled_by_file``) and passed to
    ``rank_result_key`` as the only result. The score is then read from that
    dict's ``score_breakdown['total_score']``.
    NOTE(review): this presumes ``rank_result_key`` stores ``score_breakdown``
    on the result it is given - confirm against the ranking module.

    Args:
        item: Media item dict.
        version_settings: Scoring settings for the item's version; when empty
            they are loaded from the config under
            ``Scraping -> versions -> <item version>``.

    Returns:
        The total score, or 0.0 when there is no title, the title cannot be
        parsed, or scoring fails.
    """
    from scraper.functions.rank_results import rank_result_key
    from scraper.functions.file_processing import parse_torrent_info
    from utilities.settings import get_setting

    # Title used for scoring: scraped torrent title first, file name second.
    current_title = item.get('original_scraped_torrent_title') or item.get('filled_by_file', '')
    if not current_title:
        logging.warning(f"Cannot calculate score for item {item['id']} - no title available")
        return 0.0

    try:
        parsed_info = parse_torrent_info(current_title)
        if not parsed_info or parsed_info.get('parsing_error'):
            logging.warning(f"Failed to parse current item title '{current_title}' for scoring")
            return 0.0
    except Exception as e:
        logging.error(f"Error parsing current item title '{current_title}': {e}")
        return 0.0

    # Stand-in for a scrape result describing the current item; magnet and
    # seeders are placeholders.
    current_result = {
        'title': current_title,
        'size': item.get('size', 0.0),
        'source': 'Current Item',
        'magnet': '',
        'seeders': 0,
        'parsed_info': parsed_info,
        'additional_metadata': {
            'filename': item.get('filled_by_file', '')
        }
    }

    # Fall back to the configured settings of the item's version.
    if not version_settings:
        version_name = item.get('version', 'default')
        try:
            from queues.config_manager import load_config
            all_versions = load_config().get('Scraping', {}).get('versions', {})
            version_settings = all_versions.get(version_name, {})
        except Exception as e:
            logging.warning(f"Could not load version settings for '{version_name}': {e}")
            version_settings = {}

    wanted_title = item.get('title', '')
    wanted_year = item.get('year')
    wanted_season = item.get('season_number')
    wanted_episode = item.get('episode_number')
    content_type = item.get('type', 'movie')
    is_multi = False  # the current item is scored as a single release

    preferred_language = get_setting('Scraping', 'preferred_language', None)
    translated_title = None

    # Season episode counts are only looked up for episodes with a season.
    show_season_episode_counts = None
    if content_type == 'episode' and wanted_season:
        try:
            from metadata.metadata import get_show_season_episode_counts
            show_season_episode_counts = get_show_season_episode_counts(item.get('tmdb_id'))
        except Exception as e:
            logging.debug(f"Could not get season episode counts for scoring: {e}")

    try:
        rank_result_key(
            current_result,
            [current_result],
            wanted_title,
            wanted_year,
            wanted_season,
            wanted_episode,
            is_multi,
            content_type,
            version_settings,
            preferred_language=preferred_language,
            translated_title=translated_title,
            show_season_episode_counts=show_season_episode_counts
        )
        total_score = current_result.get('score_breakdown', {}).get('total_score', 0.0)
        logging.info(f"Calculated score for current item '{current_title}': {total_score:.2f}")
        return total_score
    except Exception as e:
        logging.error(f"Error calculating score for current item '{current_title}': {e}")
        return 0.0
def hourly_scrape(self, item: Dict[str, Any], queue_manager=None):
item_identifier = self.generate_identifier(item)
logging.info(f"Starting hourly scrape for {item_identifier}")
self.currently_processing_item_id = item['id']
try:
# --- Max Upgrading Score Check ---
max_upgrading_score = 0.0
try:
max_upgrading_score = float(get_setting('Debug', 'max_upgrading_score', 0.0))
except Exception as e:
logging.warning(f"Could not parse max_upgrading_score setting: {e}. Defaulting to 0.0 (disabled)")
# Get current score for max upgrading score check
current_score_for_check = item.get('current_score', 0)
if max_upgrading_score > 0 and current_score_for_check >= max_upgrading_score:
logging.info(f"Skipping upgrade for {item_identifier}: current_score {current_score_for_check} >= max_upgrading_score {max_upgrading_score}")
return
# --- End Max Upgrading Score Check ---
update_media_item(item['id'], upgrading=True)
# Determine if the current item is a multi-pack using PTT parser
is_multi_pack = False # Default to false
current_title_original = item.get('original_scraped_torrent_title')
current_title_fallback_file = item.get('filled_by_file')
current_title_for_similarity = None # Use this only for similarity check, not score
if current_title_original:
current_title_for_similarity = current_title_original
logging.info(f"Using original_scraped_torrent_title for similarity check: {current_title_for_similarity}")
elif current_title_fallback_file:
current_title_for_similarity = current_title_fallback_file
logging.warning(f"No original_scraped_torrent_title found, using filled_by_file for similarity check: {current_title_for_similarity}")
else:
logging.error(f"No current title found for item {item_identifier}, cannot perform similarity check accurately.")
# Proceed without similarity title if needed, or handle error
# Get unfiltered results first
logging.info(f"[{item_identifier}] Calling scrape_with_fallback with is_multi_pack={is_multi_pack} to get results")
# Use skip_filter=False here - we want the scraper's default filtering initially
results, filtered_out = self.scraping_queue.scrape_with_fallback(item, is_multi_pack, queue_manager or self, skip_filter=False)
if not results:
logging.info(f"No results returned from scrape_with_fallback for {item_identifier}")
# Potentially reset upgrading flag if no results consistently? Or just wait.
# update_media_item(item['id'], upgrading=False) # Optional: Reset if no results?
return
# --- Calculate Current Item Score if needed ---
current_score = item.get('current_score', 0)
if current_score <= 0:
# Get version settings for scoring
version_name = item.get('version', 'default')
try:
from queues.config_manager import load_config
config = load_config()
version_settings = config.get('Scraping', {}).get('versions', {}).get(version_name, {})
except Exception as e:
logging.warning(f"Could not load version settings for '{version_name}': {e}")
version_settings = {}
calculated_score = self.calculate_current_item_score(item, version_settings)
if calculated_score > 0:
current_score = calculated_score
logging.info(f"Updated current score for {item_identifier} from 0 to {current_score:.2f}")
# Update the item's current_score in the database
update_media_item(item['id'], current_score=current_score)
# Update local item dict
item['current_score'] = current_score
# --- Start Filtering ---
# Title-similarity threshold is hard-coded to 95% (not read from settings); only the score threshold below comes from settings
similarity_threshold = 0.95 # Default, consider making configurable if not already indirectly
try:
# Note: This threshold seems high (0.95), maybe meant to be lower?
# Re-using upgrading_percentage_threshold name, but it's for title similarity here.
# Let's clarify the setting name or use a different one if needed.
# Assuming 'upgrading_percentage_threshold' IS for score diff, and 0.95 is hardcoded/intended for title similarity.
# If 0.95 is meant for score diff, the logic below needs adjustment.
# If 'upgrading_percentage_threshold' is for title similarity, rename setting variable.
# --> Let's assume similarity_threshold = 0.95 is for TITLE similarity <--
# Get SCORE percentage threshold
threshold_value = get_setting('Scraping', 'upgrading_percentage_threshold', '0.1')
if isinstance(threshold_value, str):
upgrading_score_percentage_threshold = float(threshold_value.strip()) if threshold_value.strip() else 0.1
elif isinstance(threshold_value, (int, float)):
upgrading_score_percentage_threshold = float(threshold_value)
else:
# If it's neither string, int, nor float, use default and log a warning for unexpected type
logging.warning(f"Unexpected type for upgrading_percentage_threshold: {type(threshold_value)}. Using default 0.1.")
upgrading_score_percentage_threshold = 0.1
except ValueError: # Catches errors from float() conversion if string is invalid
logging.warning("Invalid upgrading_percentage_threshold setting (ValueError after type check), using default value of 0.1 for score increase.")
upgrading_score_percentage_threshold = 0.1
except AttributeError: # This should ideally not be hit with the new checks, but kept for safety
logging.warning("Invalid upgrading_percentage_threshold setting (AttributeError), using default value of 0.1 for score increase.")
upgrading_score_percentage_threshold = 0.1
# Apply filtering: not wanted, failed upgrades
filtered_results = []
failed_upgrades = self.failed_upgrades.get(item['id'], [])
failed_magnets = {fu['magnet'] for fu in failed_upgrades}
for result in results:
# 1. Check Not Wanted (unless disabled)
if not item.get('disable_not_wanted_check'):
if is_magnet_not_wanted(result['magnet']):
logging.info(f"Result '{result.get('title', 'N/A')}' filtered out by not_wanted_magnets check")
continue
if is_url_not_wanted(result['magnet']): # Assuming magnet field might contain URL for some reason? Or separate field? Adapt if needed.
logging.info(f"Result '{result.get('title', 'N/A')}' filtered out by not_wanted_urls check")
continue
# 2. Check Failed Upgrades
if result.get('magnet') in failed_magnets:
logging.info(f"Result '{result.get('title', 'N/A')}' filtered out as a previously failed upgrade attempt.")
continue
# 3. Check Title Similarity (if we have a title to compare against)
# Skips any candidate whose title is nearly identical to the current one (presumably the same release); this check is applied regardless of score
if current_title_for_similarity:
similarity = SequenceMatcher(None, current_title_for_similarity.lower(), result.get('title', '').lower()).ratio()
if similarity >= similarity_threshold:
logging.info(f"Result '{result.get('title', 'N/A')}' filtered out due to high title similarity ({similarity:.2%}) to current item.")
continue
# If passed all filters, add to list
filtered_results.append(result)
if not filtered_results:
logging.info(f"All results were filtered out for {item_identifier}")
# update_media_item(item['id'], upgrading=False) # Optional: Reset if no results pass filters?
return
# --- Find Best Upgrade Candidate ---
logging.info(f"[{item_identifier}] Comparing {len(filtered_results)} filtered results against current score {current_score:.2f}")
better_results = []
for result in filtered_results:
result_score = result.get('score_breakdown', {}).get('total_score', 0)
# Check if the result score is actually better than the stored score
is_better_score = False
if result_score > current_score:
if current_score <= 0:
# Any positive score is better than non-positive
is_better_score = True
logging.debug(f" -> Result '{result.get('title', 'N/A')}' ({result_score:.2f}) is better than non-positive current score ({current_score:.2f}).")
else:
# Check percentage increase threshold for positive scores
score_increase_percent = (result_score - current_score) / current_score
if score_increase_percent > upgrading_score_percentage_threshold:
is_better_score = True
logging.debug(f" -> Result '{result.get('title', 'N/A')}' ({result_score:.2f}) meets score threshold ({score_increase_percent:+.2%} > {upgrading_score_percentage_threshold:.2%}) compared to current ({current_score:.2f}).")
else:
logging.debug(f" -> Result '{result.get('title', 'N/A')}' ({result_score:.2f}) score increase ({score_increase_percent:+.2%}) does NOT meet threshold ({upgrading_score_percentage_threshold:.2%}) compared to current ({current_score:.2f}).")
if is_better_score:
better_results.append(result)
else:
# Log why it wasn't considered better if score wasn't higher
if result_score <= current_score:
logging.debug(f" -> Result '{result.get('title', 'N/A')}' ({result_score:.2f}) score is not higher than current ({current_score:.2f}).")
# Sort better_results by score descending to pick the best
better_results.sort(key=lambda r: r.get('score_breakdown', {}).get('total_score', 0), reverse=True)
if better_results:
best_result = better_results[0]
best_score = best_result.get('score_breakdown', {}).get('total_score', 0)
logging.info(f"Found {len(better_results)} potential upgrade(s) for {item_identifier}.")
logging.info(f"Best candidate: '{best_result.get('title', 'N/A')}' with score {best_score:.2f} (Current score: {current_score:.2f})")
# --- Proceed with Upgrade Attempt ---
self.save_item_state(item) # Save state before attempting
logging.info(f"[{item_identifier}] Updating item state to Adding with best result title: {best_result.get('title', 'N/A')}")
from database import update_media_item_state, get_media_item_by_id
# Prepare update data - include the new score!
update_data = {
'state': 'Adding',
'filled_by_title': best_result.get('title'),
'scrape_results': better_results, # Store candidates
'upgrading_from': item['filled_by_file'],
# Store the score that triggered the upgrade attempt
# Note: This score might not be persisted if the adding fails,
# but it's useful for the AddingQueue logic.
# The final score update happens in update_item_with_upgrade upon success.
# Let's add 'potential_upgrade_score' to scrape_results or similar if needed by AddingQueue
}
# We might want to pass the best_result score explicitly if AddingQueue needs it immediately
# For now, assume AddingQueue recalculates or uses scrape_results
update_media_item_state(item['id'], **update_data)
updated_item = get_media_item_by_id(item['id']) # Reload item with updated state
# Use AddingQueue to attempt the upgrade with updated item
adding_queue = AddingQueue()
logging.info(f"[{item_identifier}] Adding item to adding queue for upgrade attempt")
adding_queue.add_item(updated_item) # Pass the reloaded item
lock_acquired = False
try:
if queue_manager and hasattr(queue_manager, 'upgrade_process_locks'):
queue_manager.upgrade_process_locks.add(updated_item['id'])
lock_acquired = True
logging.debug(f"[{item_identifier}] Added lock for upgrade process: {updated_item['id']}")
else:
logging.warning(f"[{item_identifier}] Could not acquire upgrade lock - QueueManager or lock set missing.")
logging.info(f"[{item_identifier}] Processing adding queue for upgrade attempt")
adding_queue.process(queue_manager, ignore_upgrade_lock=True) # Synchronous call
finally:
if lock_acquired and queue_manager and hasattr(queue_manager, 'upgrade_process_locks'):
queue_manager.upgrade_process_locks.discard(updated_item['id'])
logging.debug(f"[{item_identifier}] Removed lock for upgrade process: {updated_item['id']}")
# Check final state after AddingQueue processing
from database.core import get_db_connection
conn = get_db_connection()
cursor = conn.execute('SELECT state FROM media_items WHERE id = ?', (item['id'],))
current_state_after_add = cursor.fetchone()['state']
conn.close()
if current_state_after_add == 'Checking':
logging.info(f"Successfully initiated upgrade for item {item_identifier}. Item moved to Checking.")
# Update item data with the successful upgrade details, including the NEW score
self.update_item_with_upgrade(item, adding_queue, best_result) # Pass best_result to get score
# Log success, record tracking etc. (combine logic from original code)
self.log_upgrade(item, adding_queue) # Needs updated item dict after update_item_with_upgrade?
# Record tracking based on best_result
hash_value = extract_hash_from_magnet(best_result.get('magnet')) if best_result.get('magnet') else None
if hash_value:
# Simplified - use existing item data merged with best_result details
tracking_item_data = {**item, 'version': best_result.get('version'), 'state': 'Checking'}
history = get_torrent_history(hash_value)
trigger_details = {
'source': 'upgrading_queue',
'queue_initiated': True,
'upgrade_check': True,
'current_version': item.get('version'),
'target_version': best_result.get('version'),
'score_improvement': best_score - current_score # Calculate diff
}
rationale = f"Upgrading from version {item.get('version')} (score {current_score:.2f}) to {best_result.get('version')} (score {best_score:.2f})"
if history:
update_torrent_tracking(
torrent_hash=hash_value, item_data=tracking_item_data,
trigger_details=trigger_details, trigger_source='queue_upgrade', rationale=rationale
)
logging.info(f"[{item_identifier}] Updated torrent tracking for hash {hash_value}")
else:
try:
record_torrent_addition(
torrent_hash=hash_value, trigger_source="queue_upgrade",
trigger_details={**trigger_details, 'selected_files': best_result.get('files')},
rationale=rationale, item_data=tracking_item_data # Use combined data
)
logging.info(f"Recorded upgrade torrent addition for {item['title']}.")
except Exception as e:
logging.error(f"Error recording upgrade torrent addition for {item['title']}: {e}", exc_info=True)
# Update internal tracking data
if item['id'] not in self.upgrades_data:
self.upgrades_data[item['id']] = {'count': 0, 'history': []}
self.upgrades_data[item['id']]['count'] += 1
# History logging is inside log_upgrade, which itself calls save_upgrades_data
# Remove item from this queue as it's now handled by CheckingQueue
logging.info(f"[{item_identifier}] Removing item from upgrading queue after successful upgrade initiation.")
self.remove_item(item)
else:
logging.warning(f"Failed to upgrade item {item_identifier} - state after AddingQueue process: {current_state_after_add}")
from routes.notifications import send_upgrade_failed_notification
notification_data = {
'title': item.get('title', 'Unknown Title'),
'year': item.get('year', ''),
'reason': f'Failed in AddingQueue (State: {current_state_after_add})'
}
send_upgrade_failed_notification(notification_data)
self.log_failed_upgrade(item, best_result.get('title', 'N/A'), f'Failed in AddingQueue (State: {current_state_after_add})')
# Restore complete previous state
if self.restore_item_state(item):
# Track the failed upgrade attempt
self.add_failed_upgrade(item['id'], best_result) # Log the one we tried
logging.info(f"Restored previous state and added to failed upgrades list for {item_identifier}")
# Item remains in Upgrading queue, but state reset in DB
# We might need to update the 'upgrading' flag back to False if restore_item_state doesn't
update_media_item(item['id'], upgrading=False)
else:
logging.error(f"Failed to restore previous state for {item_identifier}, manual intervention may be needed")
# Item might be stuck, consider moving to a failed state?
except Exception as e:
logging.error(f"Error during hourly scrape for {item_identifier}: {e}", exc_info=True)
# Consider if any specific cleanup or state change is needed on error
finally:
# Ensure the processing ID is cleared regardless of success, failure, or removal
logging.debug(f"Finished hourly scrape processing for {item_identifier}. Clearing processing flag.")
self.currently_processing_item_id = None
def update_item_with_upgrade(self, item: Dict[str, Any], adding_queue: AddingQueue, best_result: Dict[str, Any]):
"""Updates the database item after a successful upgrade initiation (moved to Checking)."""
new_values = adding_queue.get_new_item_values(item) # Get details from AddingQueue's perspective (e.g., selected files)
new_score = best_result.get('score_breakdown', {}).get('total_score', 0) # Get score from the chosen result
if new_values:
conn = get_db_connection()
try:
conn.execute('BEGIN TRANSACTION')
upgrading_from = item['filled_by_file']
upgrading_from_version = item.get('version')
clean_version = new_values.get('version', '').strip('*') if new_values.get('version') else best_result.get('version', '').strip('*')
# Update the item in the database including the new score
conn.execute('''
UPDATE media_items
SET upgrading_from = ?,
filled_by_file = ?,
filled_by_magnet = ?,
version = ?,
current_score = ?, -- Update the score
last_updated = ?,
state = ?,
upgrading_from_torrent_id = ?,
upgraded = 1,
upgrading_from_version = ?,
upgrading = 0 -- Reset upgrading flag as it's now Checking
WHERE id = ?
''', (
upgrading_from,
new_values.get('filled_by_file'),
new_values.get('filled_by_magnet'),
clean_version,
new_score, # Store the new score
datetime.now(),
'Checking', # State confirmed by caller
item['filled_by_torrent_id'], # Old torrent ID
upgrading_from_version, # Old version
item['id']
))
conn.commit()
logging.info(f"Updated item in database with new values (including score {new_score:.2f}) for {self.generate_identifier(item)}")
# Update the local item dictionary (important if used further)
item['upgrading_from'] = upgrading_from
item['filled_by_file'] = new_values.get('filled_by_file')
item['filled_by_magnet'] = new_values.get('filled_by_magnet')
item['upgrading_from_torrent_id'] = item.get('filled_by_torrent_id') # Store old ID
item['version'] = clean_version
item['current_score'] = new_score # Update local score
item['last_updated'] = datetime.now()
item['state'] = 'Checking'
item['upgrading'] = 0 # Sync with DB
# Send notification logic
try:
# Import dynamically to avoid circular dependencies at module level if any
from routes.notifications import get_enabled_notifications, send_notifications
enabled_notifications = get_enabled_notifications()
if enabled_notifications:
# Prepare data for the notification service
notification_data = [{
'title': item['title'],
'year': item.get('year'),
'version': item['version'], # Use the new version
'type': item['type'],