-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdownload_stream.py
96 lines (76 loc) · 3.31 KB
/
download_stream.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
import yt_dlp
import os
from datetime import datetime, timedelta
import json
# URL of the channel's "streams" tab; get_last_stream() passes it to yt-dlp
# to look for the most recent finished live stream.
CHANNEL_URL = 'https://www.youtube.com/@PaszaTV/streams'
def check_if_downloaded(file_name, download_info_file='download_info.json',
                        max_age=timedelta(days=7)):
    """Return True if *file_name* was recorded as downloaded recently.

    The download history is a JSON object mapping file names to ISO-8601
    timestamps (the format written by ``update_download_info``).

    Args:
        file_name: Key to look up in the download history.
        download_info_file: Path of the JSON history file.
        max_age: A record older than (or exactly as old as) this is treated
            as expired.

    Returns:
        True when a valid record exists and is younger than *max_age*;
        False when the history file is missing or unparsable, the record is
        absent, or its timestamp is malformed.
    """
    try:
        with open(download_info_file, 'r', encoding='utf-8') as f:
            download_info = json.load(f)
    except FileNotFoundError:
        return False
    except ValueError:
        # json.JSONDecodeError / UnicodeDecodeError: a corrupt history file
        # is treated like an empty one instead of aborting the whole run.
        return False

    if not isinstance(download_info, dict):
        return False

    timestamp = download_info.get(file_name)
    if timestamp is None:
        return False

    try:
        last_download_time = datetime.fromisoformat(timestamp)
    except (TypeError, ValueError):
        return False

    # Use the record's own tzinfo (None for naive timestamps) so that naive
    # and aware datetimes are never mixed in the subtraction.
    now = datetime.now(tz=last_download_time.tzinfo)
    return now - last_download_time < max_age
def update_download_info(file_name, download_info_file='download_info.json'):
    """Record the current local time as the download time of *file_name*.

    The history file is a JSON object mapping file names to ISO-8601
    timestamps. Existing records are preserved; a missing, corrupt or
    non-object history file is replaced by a fresh one.

    Args:
        file_name: Key under which the timestamp is stored.
        download_info_file: Path of the JSON history file.
    """
    try:
        with open(download_info_file, 'r', encoding='utf-8') as f:
            download_info = json.load(f)
    except (FileNotFoundError, ValueError):
        # ValueError covers json.JSONDecodeError: start over rather than
        # fail after the download itself has already succeeded.
        download_info = {}

    if not isinstance(download_info, dict):
        download_info = {}

    download_info[file_name] = datetime.now().isoformat()

    # Write to a sibling temp file and swap it in, so an interrupted write
    # cannot leave a truncated history file behind.
    tmp_path = f"{download_info_file}.tmp"
    with open(tmp_path, 'w', encoding='utf-8') as f:
        json.dump(download_info, f)
    os.replace(tmp_path, download_info_file)
def get_channel_name(channel_url):
    """Return the channel name that yt-dlp reports for *channel_url*.

    Args:
        channel_url: URL of a channel page or tab understood by yt-dlp.

    Returns:
        The value of the ``channel`` field of the extracted info dict.

    Raises:
        KeyError: If the extracted info has no ``channel`` field.
        yt_dlp.utils.DownloadError: If extraction fails.
    """
    ydl_opts = {
        'quiet': True,
        'no_warnings': True,
        # Only the top-level metadata is needed here, so do not resolve the
        # individual videos of the tab (by default every entry is extracted,
        # which costs one request per video).
        'extract_flat': 'in_playlist',
        'playlistend': 1,
    }
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        info = ydl.extract_info(channel_url, download=False)
    return info['channel']
def get_last_stream():
    """Download the newest finished live stream of ``CHANNEL_URL``.

    The first listed entry whose ``live_status`` is ``'was_live'`` is
    downloaded as MP4 (at most 720p where available), recorded in the
    download history and renamed to ``<channel>-<yesterday's date>.mp4``.

    Returns:
        The new file name on success; None if no suitable stream was found,
        the stream was already downloaded within the last 7 days, or any
        error occurred (the error is printed, not raised).
    """
    # yt-dlp configuration
    ydl_opts = {
        'format': 'bestvideo[ext=mp4][height<=720]+bestaudio[ext=m4a]/best[ext=mp4][height<=720]/best[ext=mp4]',
        'outtmpl': '%(title)s-%(id)s.%(ext)s',
        # Only the newest entry of the tab is fetched.
        # NOTE(review): if that entry is live or upcoming, no finished stream
        # is found on this run - confirm that this is intended.
        'playlistend': 1,
        'merge_output_format': 'mp4',  # force MP4 as the output container
    }
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        try:
            # Fetch metadata of the listed streams without downloading.
            info = ydl.extract_info(CHANNEL_URL, download=False)
            entries = (info or {}).get('entries') or []
            for entry in entries:
                # Entries can be None when yt-dlp skipped a video.
                if not entry or entry.get('live_status') != 'was_live':
                    continue

                # Key used in the download history; kept in the historical
                # "<title>-<id>.mp4" form so existing records still match.
                file_name = f"{entry['title']}-{entry['id']}.mp4"
                if check_if_downloaded(file_name):
                    print(f"Stream {entry['title']} był już pobrany w ciągu ostatnich 7 dni. Pomijam.")
                    return None

                print(f"Znaleziono ostatni stream do pobrania: {entry['title']}")
                ydl.download([entry['webpage_url']])
                update_download_info(file_name)

                # yt-dlp sanitizes output file names (e.g. replaces '/' and
                # other characters invalid on the platform), so the raw title
                # may not match the file on disk. Ask yt-dlp for the name it
                # used; every accepted format ends up in an .mp4 container.
                template_path = ydl.prepare_filename(entry)
                downloaded_path = os.path.splitext(template_path)[0] + '.mp4'

                # Rename to "<channel name>-<date>"; the date is yesterday's.
                channel_name = get_channel_name(CHANNEL_URL)
                stream_date = (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d')
                new_file_name = f"{channel_name}-{stream_date}.mp4"
                os.rename(downloaded_path, new_file_name)
                return new_file_name

            print("Nie znaleziono odpowiedniego streamu.")
            return None
        except Exception as e:
            # Top-level boundary of the script: report and signal failure.
            print(f"Wystąpił błąd: {e}")
            return None
if __name__ == "__main__":
    # Script entry point: run one download attempt and report the outcome.
    downloaded_file = get_last_stream()
    print(
        f"Pomyślnie pobrano plik: {downloaded_file}"
        if downloaded_file
        else "Nie udało się pobrać nowego streamu lub stream był już pobrany."
    )