forked from FujiwaraChoki/MoneyPrinter
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
329 lines (269 loc) · 11.5 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
import os
from gpt import *
from video import *
from utils import *
from search import *
from uuid import uuid4
from tiktokvoice import *
from flask_cors import CORS
from termcolor import colored
from dotenv import load_dotenv
from youtube import upload_video
from apiclient.errors import HttpError
from flask import Flask, request, jsonify
from moviepy.config import change_settings
# Load environment variables
load_dotenv("../.env")
# Set environment variables
SESSION_ID = os.getenv("TIKTOK_SESSION_ID")
openai_api_key = os.getenv('OPENAI_API_KEY')
change_settings({"IMAGEMAGICK_BINARY": os.getenv("IMAGEMAGICK_BINARY")})
# Initialize Flask
app = Flask(__name__)
CORS(app)
# Constants
HOST = "0.0.0.0"
PORT = 8080
AMOUNT_OF_STOCK_VIDEOS = 5
GENERATING = False
# Generation Endpoint
@app.route("/api/generate", methods=["POST"])
def generate():
try:
# Set global variable
global GENERATING
GENERATING = True
# Clean
clean_dir("../temp/")
clean_dir("../subtitles/")
# Parse JSON
data = request.get_json()
paragraph_number = int(data.get('paragraphNumber', 1)) # Default to 1 if not provided
ai_model = data.get('aiModel') # Get the AI model selected by the user
# Get 'useMusic' from the request data and default to False if not provided
use_music = data.get('useMusic', False)
# Get 'automateYoutubeUpload' from the request data and default to False if not provided
automate_youtube_upload = data.get('automateYoutubeUpload', False)
# Get the ZIP Url of the songs
songs_zip_url = data.get('zipUrl')
# Download songs
if use_music:
# Downloads a ZIP file containing popular TikTok Songs
if songs_zip_url:
fetch_songs(songs_zip_url)
else:
# Default to a ZIP file containing popular TikTok Songs
fetch_songs("https://filebin.net/2avx134kdibc4c3q/drive-download-20240209T180019Z-001.zip")
# Print little information about the video which is to be generated
print(colored("[Video to be generated]", "blue"))
print(colored(" Subject: " + data["videoSubject"], "blue"))
print(colored(" AI Model: " + ai_model, "blue")) # Print the AI model being used
if not GENERATING:
return jsonify(
{
"status": "error",
"message": "Video generation was cancelled.",
"data": [],
}
)
voice = data["voice"]
if not voice:
print(colored("[!] No voice was selected. Defaulting to \"en_us_001\"", "yellow"))
voice = "en_us_001"
# Generate a script
script = generate_script(data["videoSubject"], paragraph_number, ai_model, voice) # Pass the AI model to the script generation
# Generate search terms
search_terms = get_search_terms(
data["videoSubject"], AMOUNT_OF_STOCK_VIDEOS, script, ai_model
)
# Search for a video of the given search term
video_urls = []
# Defines how many results it should query and search through
it = 15
# Defines the minimum duration of each clip
min_dur = 10
# Loop through all search terms,
# and search for a video of the given search term
for search_term in search_terms:
if not GENERATING:
return jsonify(
{
"status": "error",
"message": "Video generation was cancelled.",
"data": [],
}
)
found_urls = search_for_stock_videos(
search_term, os.getenv("PEXELS_API_KEY"), it, min_dur
)
# Check for duplicates
for url in found_urls:
if url not in video_urls:
video_urls.append(url)
break
# Define video_paths
video_paths = []
# Let user know
print(colored(f"[+] Downloading {len(video_urls)} videos...", "blue"))
# Save the videos
for video_url in video_urls:
if not GENERATING:
return jsonify(
{
"status": "error",
"message": "Video generation was cancelled.",
"data": [],
}
)
try:
saved_video_path = save_video(video_url)
video_paths.append(saved_video_path)
except Exception:
print(colored(f"[-] Could not download video: {video_url}", "red"))
# Let user know
print(colored("[+] Videos downloaded!", "green"))
# Let user know
print(colored("[+] Script generated!\n", "green"))
if not GENERATING:
return jsonify(
{
"status": "error",
"message": "Video generation was cancelled.",
"data": [],
}
)
# Split script into sentences
sentences = script.split(". ")
# Remove empty strings
sentences = list(filter(lambda x: x != "", sentences))
paths = []
# Generate TTS for every sentence
for sentence in sentences:
if not GENERATING:
return jsonify(
{
"status": "error",
"message": "Video generation was cancelled.",
"data": [],
}
)
current_tts_path = f"../temp/{uuid4()}.mp3"
tts(sentence, voice, filename=current_tts_path)
audio_clip = AudioFileClip(current_tts_path)
paths.append(audio_clip)
# Combine all TTS files using moviepy
final_audio = concatenate_audioclips(paths)
tts_path = f"../temp/{uuid4()}.mp3"
final_audio.write_audiofile(tts_path)
try:
subtitles_path = generate_subtitles(audio_path=tts_path, sentences=sentences, audio_clips=paths)
except Exception as e:
print(colored(f"[-] Error generating subtitles: {e}", "red"))
subtitles_path = None
# Concatenate videos
temp_audio = AudioFileClip(tts_path)
print(video_paths)
combined_video_path = combine_videos(video_paths, temp_audio.duration, 5)
# Put everything together
try:
final_video_path = generate_video(combined_video_path, tts_path, subtitles_path)
except Exception as e:
print(colored(f"[-] Error generating final video: {e}", "red"))
final_video_path = None
if automate_youtube_upload:
# Start Youtube Uploader
# Check if the CLIENT_SECRETS_FILE exists
client_secrets_file = os.path.abspath("./client_secret.json")
SKIP_YT_UPLOAD = False
if not os.path.exists(client_secrets_file):
SKIP_YT_UPLOAD = True
print(colored("[-] Client secrets file missing. YouTube upload will be skipped.", "yellow"))
print(colored("[-] Please download the client_secret.json from Google Cloud Platform and store this inside the /Backend directory.", "red"))
# Only proceed with YouTube upload if the toggle is True and client_secret.json exists.
if not SKIP_YT_UPLOAD:
# Define metadata for the video
title, description, keywords = generate_metadata(data["videoSubject"], script, ai_model)
print(colored("[-] Metadata for YouTube upload:", "blue"))
print(colored(" Title: ", "blue"))
print(colored(f" {title}", "blue"))
print(colored(" Description: ", "blue"))
print(colored(f" {description}", "blue"))
print(colored(" Keywords: ", "blue"))
print(colored(f" {', '.join(keywords)}", "blue"))
# Choose the appropriate category ID for your videos
video_category_id = "28" # Science & Technology
privacyStatus = "private" # "public", "private", "unlisted"
video_metadata = {
'video_path': os.path.abspath(f"../temp/{final_video_path}"),
'title': title,
'description': description,
'category': video_category_id,
'keywords': ",".join(keywords),
'privacyStatus': privacyStatus,
}
# Upload the video to YouTube
try:
# Unpack the video_metadata dictionary into individual arguments
video_response = upload_video(
video_path=video_metadata['video_path'],
title=video_metadata['title'],
description=video_metadata['description'],
category=video_metadata['category'],
keywords=video_metadata['keywords'],
privacy_status=video_metadata['privacyStatus']
)
print(f"Uploaded video ID: {video_response.get('id')}")
except HttpError as e:
print(f"An HTTP error {e.resp.status} occurred:\n{e.content}")
if use_music:
# Select a random song
song_path = choose_random_song()
# Add song to video at 30% volume using moviepy
video_clip = VideoFileClip(f"../temp/{final_video_path}")
original_duration = video_clip.duration
original_audio = video_clip.audio
song_clip = AudioFileClip(song_path).set_fps(44100)
# Set the volume of the song to 10% of the original volume
song_clip = song_clip.volumex(0.1).set_fps(44100)
# Add the song to the video
comp_audio = CompositeAudioClip([original_audio, song_clip])
video_clip = video_clip.set_audio(comp_audio)
video_clip = video_clip.set_fps(30)
video_clip = video_clip.set_duration(original_duration)
video_clip.write_videofile(f"../{final_video_path}", threads=2)
# Let user know
print(colored(f"[+] Video generated: {final_video_path}!", "green"))
# Stop FFMPEG processes
if os.name == "nt":
# Windows
os.system("taskkill /f /im ffmpeg.exe")
else:
# Other OS
os.system("pkill -f ffmpeg")
GENERATING = False
# Return JSON
return jsonify(
{
"status": "success",
"message": "Video generated! See MoneyPrinter/output.mp4 for result.",
"data": final_video_path,
}
)
except Exception as err:
print(colored(f"[-] Error: {str(err)}", "red"))
return jsonify(
{
"status": "error",
"message": f"Could not retrieve stock videos: {str(err)}",
"data": [],
}
)
@app.route("/api/cancel", methods=["POST"])
def cancel():
print(colored("[!] Received cancellation request...", "yellow"))
global GENERATING
GENERATING = False
return jsonify({"status": "success", "message": "Cancelled video generation."})
if __name__ == "__main__":
# Run Flask App
app.run(debug=True, host=HOST, port=PORT)