diff --git a/tts_mem.py b/tts_mem.py index 12699b31..acc134c0 100644 --- a/tts_mem.py +++ b/tts_mem.py @@ -1,42 +1,132 @@ -import subprocess -import threading -import signal -import sys +""" +AllTalk Multi Engine Manager (MEM) + +This script is part of the AllTalk project, written by Erew123. +GitHub Repository: https://github.com/erew123/alltalk_tts + +AllTalk MEM (Multi Engine Manager) is a research tool designed to manage and test +multiple instances of different Text-to-Speech (TTS) engines being loaded simultaneously, +with a view to a centralized engine being able to handle multiple requests simultaneously. + +Attribution: +- Original AllTalk project by Erew123 (https://github.com/erew123) +- MEM extension developed based on the AllTalk framework + +Licensing: +This project is subject to the licensing terms specified in the AllTalk repository. +Please refer to https://github.com/erew123/alltalk_tts for the most up-to-date +licensing information. + +Code Usage and Modifications: +Any code snippets or algorithms adapted from the original AllTalk project are used +with permission and in accordance with the project's licensing terms. Modifications +and extensions for MEM functionality have been implemented while maintaining +compatibility with the original project's structure and intent. + +Note to developers: +When using or adapting code from this script, please maintain the attribution +and adhere to the licensing terms of the original AllTalk project. For any +substantial use or modification, it's recommended to reference the original +project and this MEM extension. + +MEM is not intended for production use at this time and there is NO support +being offered on MEM. + +For the latest updates and documentation, please visit: +https://github.com/erew123/alltalk_tts +""" + import os -from pathlib import Path -import gradio as gr +import io +import sys import time -import requests -from requests.exceptions import RequestException -import socket import json import queue -from flask import Flask, request, jsonify, send_from_directory +import socket +import random +import signal +import asyncio +import aiohttp +import requests +import importlib +import threading +import subprocess +import gradio as gr +from pathlib import Path +from gradio import routes +from collections import deque +from datetime import datetime, timedelta +from requests.exceptions import RequestException from werkzeug.serving import make_server, WSGIRequestHandler +from concurrent.futures import ThreadPoolExecutor -# Add these at the beginning of your file, after other imports +def ensure_flask_dependencies(): + # Check for Flask async support + try: + import flask.json + if not hasattr(flask.json, 'loads'): + raise ImportError("Flask async support not found") + except ImportError: + print("Flask async support not found. Installing...") + try: + subprocess.check_call([sys.executable, "-m", "pip", "install", "flask[async]"]) + print("Flask async support installed successfully.") + importlib.reload(flask) + except subprocess.CalledProcessError as e: + print(f"Failed to install Flask async support. Error: {e}") + print("Please run 'pip install flask[async]' manually.") + sys.exit(1) + # Check for Flask-CORS + try: + import flask_cors + except ImportError: + print("Flask-CORS not found. Installing...") + try: + subprocess.check_call([sys.executable, "-m", "pip", "install", "flask-cors"]) + print("Flask-CORS installed successfully.") + except subprocess.CalledProcessError as e: + print(f"Failed to install Flask-CORS. Error: {e}") + print("Please run 'pip install flask-cors' manually.") + sys.exit(1) + +# Check Flask Dependencies +ensure_flask_dependencies() + +# Flask app setup +from flask_cors import CORS, cross_origin +from flask import Flask, request, jsonify, send_from_directory, send_file + +# Setup config file CONFIG_FILE = "mem_config.json" # Dictionary to store all processes processes = {} script_path = "tts_server.py" +# Global variables +queue_items = deque() +engine_statuses = {} + # Global flag to indicate if the program should exit should_exit = threading.Event() -def signal_handler(signum, frame): - print("[AllTalk MEM] Interrupt received, shutting down...") - should_exit.set() +# Global variable to hold the monitor thread +stop_monitor = threading.Event() +monitor = None + +flask_app = Flask(__name__) +CORS(flask_app) +flask_app.config['OUTPUT_FOLDER'] = str(Path(os.getcwd()) / 'outputs') # Default configuration default_config = { "base_port": 7001, - "api_server_port": 7401, + "api_server_port": 7851, "auto_start_engines": 0, "max_instances": 8, "gradio_interface_port": 7500, - "max_retries": 8, - "initial_wait": 3, + "max_retries": 12, + "initial_wait": 2, "backoff_factor": 1.2, "debug_mode": False, "max_queue_time": 60, # Maximum time a request can wait in the queue (in seconds) @@ -70,6 +160,28 @@ def save_config(config): current_base_port = config['base_port'] max_instances = config['max_instances'] +def signal_handler(signum, frame): + print("[AllTalk MEM] Interrupt received, shutting down...") + should_exit.set() + +########################## +### Live Queue Monitor ### +########################## +class QueueItem: + def __init__(self, text, start_time): + self.text = text + self.start_time = start_time + +class EngineStatus: + def __init__(self): + self.current_request = None + self.start_time = None + self.char_count = 0 + +class MonitoringControl: + def __init__(self): + self.stop_event = asyncio.Event() + ############################################ # START-UP # Display initial splash screen # ############################################ @@ -78,8 +190,11 @@ def save_config(config): print(f"[AllTalk MEM]\033[94m / _ \ | | |\033[1;35m | |/ _` | | |/ / \033[0m | | | | \___ \ ") print(f"[AllTalk MEM]\033[94m / ___ \| | |\033[1;35m | | (_| | | < \033[0m | | | | ___) |") print(f"[AllTalk MEM]\033[94m /_/ \_\_|_|\033[1;35m |_|\__,_|_|_|\_\ \033[0m |_| |_| |____/ ") -print(f"[AllTalk MEM]") -print(f"[AllTalk MEM] \033[94m AllTalk Multi Engine Manager\033[00m") +print(f"[AllTalk MEM]\033[94m __ __ \033[1;35m _____ __ __\033[0m") +print(f"[AllTalk MEM]\033[94m | \/ |\033[1;35m| ____| | \/ |\033[0m") +print(f"[AllTalk MEM]\033[94m | |\/| |\033[1;35m| _| | |\/| |\033[0m") +print(f"[AllTalk MEM]\033[94m | | | |\033[1;35m| |___ | | | |\033[0m") +print(f"[AllTalk MEM]\033[94m |_| |_|\033[1;35m|_____| |_| |_|\033[0m") print(f"[AllTalk MEM]") print(f"[AllTalk MEM] \033[93m MEM is not intended for production use and\033[00m") print(f"[AllTalk MEM] \033[93m there is NO support being offered on MEM\033[00m") @@ -154,48 +269,30 @@ def retry_with_backoff(func): while retries < config['max_retries']: try: result = func() - # print(f"[AllTalk MEM] Operation successful on attempt {retries + 1}") + print(f"[AllTalk MEM] Operation successful on attempt {retries + 1}") if config['debug_mode'] else None return result except RequestException as e: retries += 1 - # print(f"[AllTalk MEM] Attempt {retries}") + print(f"[AllTalk MEM] Attempt {retries}") if config['debug_mode'] else None if retries == config['max_retries']: - # print(f"[AllTalk MEM] All {max_retries} attempts failed waiting for this instance of the Engine to load.") + print(f"[AllTalk MEM] All {max_retries} attempts failed waiting for this instance of the Engine to load.") if config['debug_mode'] else None return False - # print(f"[AllTalk MEM] Retrying in {wait_time} seconds...") + print(f"[AllTalk MEM] Retrying in {wait_time} seconds...") if config['debug_mode'] else None time.sleep(wait_time) wait_time = round(wait_time * config['backoff_factor'], 1) return False -def is_server_ready(port, timeout=5): +def is_server_ready(port, timeout=3): def check_ready(): response = requests.get(f"http://127.0.0.1:{port}/api/ready", timeout=timeout) if response.text.strip() == "Ready": return True raise RequestException("Server not ready") - return retry_with_backoff(check_ready) -def update_engine_selector(): - running_engines = [] - debug_info = [] - for i, p in processes.items(): - if p.poll() is None: - port = current_base_port + i - 1 - ready = is_server_ready(port) - status = 'Ready' if ready else 'Not Ready' - debug_info.append(f"Engine {i} (Port {port}): {status}") - if ready: - running_engines.append(f"Engine {i} (Port {port})") - - debug_str = "\n".join(debug_info) - print(f"Debug info: {debug_str}") # Add this line for console debugging - return gr.update(choices=running_engines, value=running_engines[0] if running_engines else None), debug_str - def stop_all_instances(): instance_ids = list(processes.keys()) results = ["❌ Not running"] * max_instances - print(f"[AllTalk MEM] Terminating All TTS Engines") for instance_id in instance_ids: stop_subprocess(instance_id) results[instance_id-1] = "❌ Not running" @@ -203,25 +300,11 @@ def stop_all_instances(): def update_all_statuses(base_port): return [check_subprocess_status(i, base_port + i - 1) for i in range(1, max_instances + 1)] - def count_running_instances(): return sum(1 for p in processes.values() if p.poll() is None) -def get_available_voices(port, max_retries=3, delay=2): - for _ in range(max_retries): - try: - response = requests.get(f"http://127.0.0.1:{port}/api/voices", timeout=5) - data = response.json() - if data["status"] == "success": - return data["voices"] - except requests.RequestException as e: - print(f"Error fetching voices from port {port}: {str(e)}") - except json.JSONDecodeError: - print(f"Error decoding JSON from port {port}") - time.sleep(delay) - return [] - +# Gradio interface TTS Test (not anything to do with client requests) def generate_tts(port, engine_num, text, voice): try: response = requests.post(f"http://127.0.0.1:{port}/api/tts-generate", data={ @@ -262,8 +345,12 @@ def test_tts(engine, voice, text): except Exception as e: return None, f"Error in test_tts: {str(e)}" +############################# +# Gradio Code and functions # +############################# def create_gradio_interface(): - global current_base_port, max_instances + global current_base_port, max_instances, config, monitor + monitor = MonitoringControl() with gr.Blocks(title="AllTalk Multi Engine Manager", theme=gr.themes.Base()) as interface: with gr.Row(): gr.Markdown("# AllTalk Multi Engine Manager") @@ -341,32 +428,12 @@ def stop_all_and_update(): results = stop_all_instances() return [gr.update(value=1)] + results - stop_all_button.click(stop_all_and_update, outputs=[num_instances_input] + instance_statuses) - - def start_MEMple_instances(num_instances, base_port): - currently_running = count_running_instances() - - if currently_running >= num_instances: - return update_all_statuses(base_port) # All requested instances are already running - - instances_to_start = num_instances - currently_running - start_port, available_ports = find_available_port(base_port + currently_running, instances_to_start) - - if len(available_ports) < instances_to_start: - return ["Not enough available ports"] * max_instances - - results = update_all_statuses(base_port) - for i, port in enumerate(available_ports[:instances_to_start], start=currently_running+1): - result = start_subprocess(i, port) - results[i-1] = result - time.sleep(10) # Increase wait time to 10 seconds + stop_all_button.click(stop_all_and_update, outputs=[num_instances_input] + instance_statuses) - return results - - gr.Markdown("## Test TTS Engines") + with gr.Tab("Engines TTS Test"): with gr.Row(): - engine_selector = gr.Dropdown(label="Select TTS Engine", choices=[], scale=1) - voice_selector = gr.Dropdown(label="Select Voice", choices=[], scale=1) + engine_selector = gr.Dropdown(choices=["Please Start an Engine & Refresh List"], value="Please Start an Engine & Refresh List", label="Select TTS Engine", scale=1) + voice_selector = gr.Dropdown(choices=["Please Start an Engine & Refresh List"], value="Please Start an Engine & Refresh List", label="Select Voice", scale=1) test_text = gr.Textbox(label="Enter text to synthesize", value="This is a test of TTS engine number {engine} on port {port}, using the {voice} voice.", scale=2) with gr.Row(): @@ -380,43 +447,39 @@ def start_MEMple_instances(num_instances, base_port): def update_engine_selector(): running_engines = [] debug_info = [] - # Create a list of keys to iterate over - process_keys = list(processes.keys()) - for i in process_keys: - p = processes.get(i) - if p and p.poll() is None: - port = current_base_port + i - 1 - ready = is_server_ready(port) - status = 'Ready' if ready else 'Not Ready' - debug_info.append(f"Engine {i} (Port {port}): {status}") - if ready: - running_engines.append(f"Engine {i} (Port {port})") + for i, info in tts_instances.items(): + if is_instance_active(i): + running_engines.append(f"Engine {i} (Port {info['port']})") + status = 'Ready' + else: + status = 'Not Ready' + debug_info.append(f"Engine {i} (Port {info['port']}): {status}") debug_str = "\n".join(debug_info) - #print(f"Debug info: {debug_str}") # un hash # line for console debugging return gr.update(choices=running_engines, value=running_engines[0] if running_engines else None), debug_str - - def update_voice_selector(engine): + + async def update_voice_selector(engine): if engine: engine_num = int(engine.split()[1]) port = current_base_port + engine_num - 1 - voices = get_available_voices(port) + voices = await get_available_voices(port) debug_str = f"Fetched voices for Engine {engine_num} (Port {port}): {voices}" return gr.update(choices=voices, value=voices[0] if voices else None), debug_str return gr.update(choices=[], value=None), "No engine selected" - - # Update engine selector when instances are started/stopped - start_MEMple_button.click(update_engine_selector, outputs=[engine_selector, debug_output]) - stop_all_button.click(update_engine_selector, outputs=[engine_selector, debug_output]) - - # Add a manual refresh button for the engine selector - refresh_engine_button.click(update_engine_selector, outputs=[engine_selector, debug_output]) - # Update voice selector when engine is selected - engine_selector.change(update_voice_selector, inputs=[engine_selector], outputs=[voice_selector, debug_output]) - - # Generate TTS when test button is clicked - test_button.click(test_tts, inputs=[engine_selector, voice_selector, test_text], outputs=[audio_output, debug_output]) + refresh_engine_button.click( + update_engine_selector, + outputs=[engine_selector, debug_output] + ) + + engine_selector.change( + update_voice_selector, + inputs=[engine_selector], + outputs=[voice_selector, debug_output] + ) + + # Generate TTS when test button is clicked + test_button.click(test_tts, inputs=[engine_selector, voice_selector, test_text], outputs=[audio_output, debug_output]) with gr.Tab("MEM Settings"): with gr.Row(): @@ -479,10 +542,10 @@ def update_voice_selector(engine): choices=["Disabled", "Enabled"], value="Enabled" if config['debug_mode'] else "Disabled", label="Debug Mode" - ) + ) with gr.Row(): - with gr.Column(): + with gr.Column(scale=1): gr.Markdown("### 🟩 Core Queue Management") with gr.Row(): max_queue_time_input = gr.Number( @@ -495,17 +558,17 @@ def update_voice_selector(engine): label=f"Queue Check Interval (seconds) (Current: {config['queue_check_interval']})", step=0.1 ) + with gr.Column(scale=3): + gr.Markdown("### 🟩 TTS Request Dynamic Timeout Factors") + with gr.Row(): tts_request_timeout_input = gr.Number( value=config['tts_request_timeout'], label=f"TTS Request Timeout (seconds) (Current: {config['tts_request_timeout']})", step=1 - ) - with gr.Column(): - gr.Markdown("### 🟩 Dynamic Timeout Factors") - with gr.Row(): + ) text_length_factor_input = gr.Number( value=config['text_length_factor'], - label=f"Text Length Factor (Current: {config['text_length_factor']})", + label=f"Text Length Factor Per 100 chrs (Current: {config['text_length_factor']})", step=0.1 ) concurrent_request_factor_input = gr.Number( @@ -515,7 +578,7 @@ def update_voice_selector(engine): ) diminishing_factor_input = gr.Number( value=config['diminishing_factor'], - label=f"Diminishing Factor (Current: {config['diminishing_factor']})", + label=f"Diminishing Factor Calc (Current: {config['diminishing_factor']})", step=0.1 ) queue_position_factor_input = gr.Number( @@ -644,7 +707,7 @@ def update_settings(new_port, new_max_instances, new_auto_start, new_gradio_port Port for the Gradio web interface. MEM binds to 0.0.0.0 on this port. ### 🟨 API Server Port - **Default: 7401**
+ **Default: 7851**
Port for incoming TTS requests to the MEM API. Ensure it's free and not firewalled. """) with gr.Tab("Engine Start-up Time Contol"): @@ -668,7 +731,8 @@ def update_settings(new_port, new_max_instances, new_auto_start, new_gradio_port with gr.Tab("Debug Settings"): with gr.Row(): with gr.Column(): - gr.Markdown(""" + gr.Markdown(""" + ### 🟪 Debug Settings **Default: Disabled**
When enabled, MEM outputs additional diagnostic information. Useful for troubleshooting but increases console output. @@ -681,22 +745,44 @@ def update_settings(new_port, new_max_instances, new_auto_start, new_gradio_port ### 🟩 Max Queue Time **Default: 60 seconds**
- Maximum time a request can wait before being processed or rejected. Prevents indefinite waiting. + - This is the maximum time a request can spend waiting in the queue before being sent to a TTS engine.
+ - Once this time is exceeded, the request is removed from the queue and considered failed if it hasn't started processing by a TTS engine.
+ - Dynamic Timeout Factors come into play after a request has left the queue and is being processed by a TTS engine and they determine how long the system will wait for the TTS generation to complete.
+ - Total allowed processing time can be from when a request enters the system to when it completes (or fails) and is the sum of:
+             a) Time spent in the queue (limited by Max Queue Time)
+             b) Time spent in TTS generation (governed by Dynamic Timeout Factors) + - A request's "queue time" stops when it's sent to a TTS engine. + - The dynamic timeout for TTS generation is separate from and additional to the queue time, hence the total time a request spends in the system can indeed exceed the Max Queue Time. ### 🟩 Queue Check Interval **Default: 0.1 seconds**
Frequency of checking for available TTS engines. Lower values increase responsiveness but may increase CPU usage. - ### 🟩 TTS Request Timeout - **Default: 30 seconds**
- Base maximum processing time for a single TTS request. This value serves as the foundation for the Dynamic Timeout Factors. The actual timeout for each request is calculated by applying the Dynamic Timeout Factors to this base value. For a detailed explanation of how this base timeout is adjusted, please refer to the "Dynamic Timeout Factors" section in the settings help. - - These settings work together to balance system responsiveness and resource usage. Adjust Max Queue Time for overall request lifespan, Queue Check Interval for system reactivity, and TTS Request Timeout as a baseline for individual request processing. The TTS Request Timeout is particularly important as it forms the basis for all dynamic timeout calculations. + These settings work together to balance system responsiveness and resource usage. Adjust Max Queue Time for overall request lifespan, Queue Check Interval for system reactivity. """) - with gr.Tab("Dynamic Timeout Factors"): + with gr.Tab("TTS Request Dynamic Timeout Factors"): with gr.Column(): gr.Markdown(""" - These advanced settings allow adaptive adjustment of request timeouts based on various factors such as text length, system load, and queue position. They all work to modify the base TTS Request Timeout, creating a flexible timeout system that can adapt to changing conditions and ensure fair processing of requests. + These advanced settings allow adaptive adjustment of request timeouts based on various factors such as text length, system load, and queue position. They all work to modify the base TTS Request Timeout, creating a flexible timeout system that can adapt to changing conditions and ensure fair processing of requests, specifically designed to manage the waiting time for TTS generation after a request has been dispatched from the queue to an available TTS engine. + + When They Apply:
+ - These factors come into play after a request has been taken from the queue and sent to a TTS engine for processing. They dynamically adjust the timeout period for each individual TTS generation request based on various factors (Text length, Number of concurrent requests, Position in the queue, Time already spent processing). + + How They Work:
+ - The system calculates a custom timeout for each request using these factors.
+ - This timeout determines how long the system will wait for the TTS engine to complete the generation before considering it a failed request.
+ + Benefit:
+ - Allows longer wait times for more complex requests (e.g., longer text)
+ - Prevents indefinite waiting by setting a maximum timeout
+ - Adapts to system load and request complexity
+ + Key Point:
+ - These factors do not affect how long a request waits in the queue. They only apply to the actual TTS generation process after the request has been sent to an engine. + + ### 🟩 TTS Request Timeout + **Default: 30 seconds**
+ Base maximum processing time for a single TTS request. This value serves as the foundation for the Dynamic Timeout Factors. The actual timeout for each request is calculated by applying the Dynamic Timeout Factors to this base value. For a detailed explanation of how this base timeout is adjusted, please refer to the "Dynamic Timeout Factors" section in the settings help. ### 🟩 Text Length Factor **Default: 0.2**
@@ -736,8 +822,195 @@ def update_settings(new_port, new_max_instances, new_auto_start, new_gradio_port Final timeout would be 108 seconds, unless this exceeds the Max Queue Time, in which case Max Queue Time would be used instead. This dynamic system allows for flexible handling of various request scenarios while still respecting overall system limits. - """) - + """) + + with gr.Tab("MEM Queue Monitor"): + gr.Markdown("## Queue and Engine Status") + + with gr.Row(): + queue_length = gr.Textbox(label="Total Queue Length", value="0") + running_engines = gr.Textbox(label="Running Engines", value="0") + start_button = gr.Button("Start Monitoring") + stop_button = gr.Button("Stop Monitoring", interactive=False) + + with gr.Row(): + engine_status = gr.Dataframe( + headers=["Engine", "Port", "Status", "Processing Time", "Char Count", "Text"], + label="Engine Status", + wrap=True, + column_widths=["100px", "70px", "100px", "150px", "100px", "300px"], + height=500 + ) + + with gr.Row(): + queue_status = gr.Dataframe( + headers=["Position", "Wait Time", "Timeout", "Text"], + label="Queue Status (Top 10 items)", + wrap=True, + column_widths=["70px", "100px", "100px", "400px"], + height=500 + ) + + is_monitoring = gr.State(value=False) + async def update_monitor_data(is_monitoring): + global monitor + if not is_monitoring or (monitor and monitor.stop_event.is_set()): + return [None] * 4 # Return 4 None values instead of 3 + + try: + queue_size = len(queue_items) + running_engine_count = sum(1 for p in processes.values() if p.poll() is None) + + queue_data = [] + for i, item in enumerate(list(queue_items)[:10]): + wait_time = datetime.now() - item.start_time + timeout = timedelta(seconds=config['max_queue_time']) - wait_time + queue_data.append([ + i+1, + str(wait_time).split(".")[0], + str(timeout).split(".")[0] if timeout > timedelta() else "Timed Out", + item.text[:80] + "..." if len(item.text) > 80 else item.text + ]) + + engine_data = [] + for instance, info in tts_instances.items(): + engine_status = engine_statuses.get(instance, EngineStatus()) + if instance in processes and processes[instance].poll() is None: + if info["locked"] and engine_status.current_request: + processing_time = datetime.now() - engine_status.start_time + status = "Busy" + proc_time = str(processing_time).split('.')[0] + char_count = engine_status.char_count + text = engine_status.current_request[:50] + "..." if len(engine_status.current_request) > 50 else engine_status.current_request + else: + status = "Idle" + proc_time = "" + char_count = "" + text = "" + else: + status = "Not Started" + proc_time = "" + char_count = "" + text = "" + + engine_data.append([ + f"Engine {instance}", + info['port'], + status, + proc_time, + char_count, + text + ]) + + await asyncio.sleep(0) # Yield control to allow cancellation + return str(queue_size), str(running_engine_count), queue_data, engine_data + except Exception as e: + print(f"Error in update_monitor_data: {e}") + return [None] * 4 # Return 4 None values instead of 3 + + def start_monitoring(): + if monitor: + monitor.stop_event.clear() + return True, gr.update(interactive=False), gr.update(interactive=True) + + def stop_monitoring(): + if monitor: + monitor.stop_event.set() + return False, gr.update(interactive=True), gr.update(interactive=False) + + start_button.click( + start_monitoring, + outputs=[is_monitoring, start_button, stop_button] + ).then( + update_monitor_data, + inputs=[is_monitoring], + outputs=[queue_length, running_engines, queue_status, engine_status] + ) + + stop_button.click( + stop_monitoring, + outputs=[is_monitoring, start_button, stop_button] + ).then( + lambda: ("0", "0", [], []), + outputs=[queue_length, running_engines, queue_status, engine_status] + ) + + # Auto-refresh every 1 seconds when monitoring is active + interface.load( + update_monitor_data, + inputs=[is_monitoring], + outputs=[queue_length, running_engines, queue_status, engine_status], + every=1, + show_progress=False + ) + + interface.monitor = monitor # Attach the MonitoringApp instance to the interface + routes.App.stop_event = monitor.stop_event + + with gr.Tab("MEM Load Test"): + gr.Markdown("## MEM Load Test") + + with gr.Row(): + num_requests = gr.Slider(minimum=1, maximum=20, value=3, step=1, label="Number of Requests", scale=2) + text_length = gr.Slider(minimum=10, maximum=1000, value=100, step=10, label="Approximate Text Length (chars)", scale=2) + voice_dropdown = gr.Dropdown(choices=["Please Start an Engine & Refresh List"], value="Please Start an Engine & Refresh List", label="Voice", scale=2) + refresh_voices_button = gr.Button("Refresh Voices", scale=1) + start_test_button = gr.Button("Start Load Test", scale=1) + + with gr.Row(): + summary_text = gr.Textbox(label="Summary", interactive=False, lines=4) + note_text = gr.Textbox( + value=( + "All requests are sent simultaneously. The 'completed in' time is calculated from when the requests were sent, " + "not the actual TTS processing time per individual request within the TTS engine. This test measures overall " + "system responsiveness, including queuing and load balancing, not just TTS generation speed." + ), + label="Important Note", + lines=4, + interactive=False, + ) + + results_table = gr.Dataframe( + headers=["Request", "Time (s)", "Status", "Audio URL"], + label="Load Test Results", + row_count=(1, "dynamic"), + col_count=(4, "fixed"), + interactive=False, + datatype=["str", "number", "str", "str"] + ) + + async def refresh_voices(): + voices = await fetch_voices() + return gr.update(choices=voices, value=voices[0] if voices else None) + + async def start_load_test(num_requests, text_length, voice): + if not voice: + yield [], "Please select a voice before starting the load test.", gr.update() + return + + # Calculate the actual text length + base_text = "Testing MEM load balancing system. Request X. " + repeats = max(1, text_length // len(base_text)) + actual_text = base_text * repeats + actual_text_length = len(actual_text.replace("X", "1")) + + progress_message = f"Load Test running: {num_requests} TTS requests, each with approximately {actual_text_length} characters" + yield [], progress_message, gr.update() + + results, summary = await run_load_test_gradio(num_requests, actual_text, voice) + yield results, summary, gr.update() + + refresh_voices_button.click( + refresh_voices, + outputs=[voice_dropdown] + ) + + start_test_button.click( + start_load_test, + inputs=[num_requests, text_length, voice_dropdown], + outputs=[results_table, summary_text, voice_dropdown] + ) + with gr.Tab("MEM FAQ/Help"): with gr.Row(): with gr.Column(): @@ -746,9 +1019,9 @@ def update_settings(new_port, new_max_instances, new_auto_start, new_gradio_port AllTalk MEM (Multi Engine Manager) is a research tool designed to manage and test multiple instances of different Text-to-Speech (TTS) engines being loaded simultaneously, with a view to a centralised engine being able to handle multiple requests simultaneously. ## Current Status & Support - AllTalk MEM is currently in a demonstration, testing, and experimental phase. - - ⚠️ MEM is not intended for production use at this time and there is **NO support being offered on MEM**. ⚠️ + ⚠️ AllTalk MEM is currently in a demonstration, testing, and experimental phase. ⚠️
+ ⚠️ There will be bugs & issues. MEM is not intended for Production Environments. ⚠️
+ ⚠️ As such there is **NO support being offered on MEM**. ⚠️ ## System Capacity **Q: How many engines can my system (GPU/CPU) handle at once?**
@@ -758,7 +1031,7 @@ def update_settings(new_port, new_max_instances, new_auto_start, new_gradio_port **Q: Is there an in-built queue system to handle different requests to different loaded engines?**
A: MEM incorporates a built-in queue system to manage multiple TTS requests across loaded engine instances: - 1. All TTS requests are received through the API port (default: 7401). + 1. All TTS requests are received through the API port (default: 7851). 2. The queue system distributes incoming requests among available TTS engine instances. 3. If all engines are busy, new requests are held in a queue until an engine becomes available. 4. The system continuously checks for available engines to process waiting requests. @@ -766,19 +1039,23 @@ def update_settings(new_port, new_max_instances, new_auto_start, new_gradio_port This queue system aims to balance the load across all running engines efficiently. Advanced features for queue management and dynamic timeout calculations are available and can be configured in the MEM settings. - Note: As this is a research and testing implementation, its performance in high-load or production environments is not guaranteed. For load testing, you can use the provided `mem_load_test.py` script to simulate multiple simultaneous requests. - - To use the load testing tool: - `python mem_load_test.py --requests [number_of_requests] --length [text_length] --url "http://127.0.0.1:7501/api/tts-generate"` + Note: As this is a research and testing implementation, its performance in high-load or production environments is not guaranteed. For load testing, you can use the MEM Load Test to simulate multiple simultaneous requests. ## API Requests to each loaded engine **Q: Can I use the AllTalk API to each engine instance?**
A: Yes, each engine loaded will respond fully as if it was a standalone AllTalk instance. So you can use the standard API requests to the port number of each TTS engine loaded in. + > **Ready Endpoint -** http://{ipaddress}:{port}/api/ready
+ > **Voice Endpoint -** http://{ipaddress}:{port}/api/voices
+ > **RVC Voices Endpoint -** http://{ipaddress}:{port}/api/rvcvoices
+ > **Current Settings Endpoint -** http://{ipaddress}:{port}/api/currentsettings
+ > **TTS Generation Endpoint -** http://{ipaddress}:{port}/api/tts-generate
+ > **OpenAI Compatible Endpoint -** http://{ipaddress}:{port}/v1/audio/speech
+ ## What happens if it tries to load an engine thats on a port already in use **Q: When a engine instance starts up, what happens if a port is already used by something else?**
A: A test is performed of the port before starting each instance, so the engine trying to load on that port number wouldnt load in. - + """) with gr.Column(): gr.Markdown(""" @@ -789,7 +1066,7 @@ def update_settings(new_port, new_max_instances, new_auto_start, new_gradio_port CUDA has additional code and tools to deal with multiplexing requests and none of these have been implimented. So for example, if you are using a TTS engine that uses a GPU, and 2x different loaded TTS engines make a request of that hardware, you may get a 50/50 spit use of the hardware and you may not. 3x requests, you may get a 33/33/33 hardware load sharing sceanario nad you may not. I cannot say at the moment and havnt tested. Python - As each instance of each TTS engine will be loaded into a seperate Python instance, all memory management and requests should remain segregated from one another, so there should be no bleed over of tensors in CUDA requests etc, however, I have not tested. - + ## What settings are used for the TTS engines loaded **Q: What exact settings are being used by the TTS engines being loaded in?**
A: Whatever you have set in the main AllTalk Gradio interface when you load AllTalk as a standalone, under the `TTS Engine Settings` and the specific TTS engine, those are the settings loaded/used. @@ -813,13 +1090,8 @@ def update_settings(new_port, new_max_instances, new_auto_start, new_gradio_port - Developing a basic API management suite for MEM
Please note that these are potential plans and not guaranteed features. - - ## ⚠️ Support for MEM ⚠️ - - ⚠️ MEM is not intended for production use at this time and there is **NO support being offered on MEM**. ⚠️ - - ⚠️ Yes, I know there will be bugs. Yes I know there will be issues. This is not designed for Production Environments. ⚠️ - """) + """) + return interface def start_and_update(instance_id, port): @@ -860,10 +1132,9 @@ def start_MEMple_instances(num_instances, base_port): return results -############### -## WEBSERVER ## -############### - +######################## +## WEBSERVER # General # +######################## app = Flask(__name__) # Queue for holding TTS requests @@ -875,9 +1146,6 @@ def start_MEMple_instances(num_instances, base_port): # Dictionary to keep track of TTS instance status tts_instances = {} -# Maximum wait time for a request (in seconds) -MAX_WAIT_TIME = 60 - def initialize_instances(): global tts_instances with lock: @@ -924,43 +1192,55 @@ def calculate_dynamic_timeout(text, concurrent_requests, queue_position, total_q time_in_process = time.time() - request_start_time diminishing_factor = max(0, 1 - (time_in_process / base_timeout)) - timeout_with_diminishing = adjusted_timeout * (1 + (diminishing_factor * 0.5)) + timeout_with_diminishing = adjusted_timeout * (1 + (diminishing_factor * config['diminishing_factor'])) queue_position_factor = queue_position / total_queue_length final_timeout = timeout_with_diminishing + (config['tts_request_timeout'] * queue_position_factor) return min(final_timeout, config['max_queue_time']) # Ensure we don't exceed max queue time -@app.route('/api/tts-generate', methods=['POST']) +######################################## +## WEBSERVER # Client Facing Endpoints # +######################################## +@flask_app.route('/api/tts-generate', methods=['POST']) +@cross_origin(origins='*', methods=['POST', 'OPTIONS'], allow_headers=['Content-Type']) def tts_generate(): print("[AllTalk MEM] Received TTS generate request") if config['debug_mode'] else None - start_time = time.time() + start_time = datetime.now() request_data = request.form.to_dict() # Add request to queue - request_queue.put(request_data) - queue_position = request_queue.qsize() - - while time.time() - start_time < config['max_queue_time']: + queue_item = QueueItem(request_data['text_input'], start_time) + queue_items.append(queue_item) + + while (datetime.now() - start_time).total_seconds() < config['max_queue_time']: instance = get_available_instance() if instance: try: print(f"[AllTalk MEM] Processing request with instance {instance}") if config['debug_mode'] else None - # Calculate dynamic timeout just before processing + # Update engine status + engine_statuses[instance] = EngineStatus() + engine_statuses[instance].current_request = request_data['text_input'] + engine_statuses[instance].start_time = datetime.now() + engine_statuses[instance].char_count = len(request_data['text_input']) + dynamic_timeout = calculate_dynamic_timeout( text=request_data['text_input'], concurrent_requests=len([i for i in tts_instances.values() if i['locked']]), - queue_position=queue_position, - total_queue_length=request_queue.qsize(), - request_start_time=start_time + queue_position=len(queue_items), + total_queue_length=len(queue_items), + request_start_time=start_time.timestamp() ) result = process_tts_request(instance, request_data, dynamic_timeout) print(f"[AllTalk MEM] Request processed, result: {result}") if config['debug_mode'] else None # Remove request from queue - request_queue.get() + queue_items.popleft() + + # Reset engine status + engine_statuses[instance] = EngineStatus() return jsonify(result) except Exception as e: @@ -970,10 +1250,10 @@ def tts_generate(): time.sleep(config['queue_check_interval']) # Remove request from queue if it times out - request_queue.get() + queue_items.popleft() - print("[AllTalk MEM] No instance available within timeout period") - return jsonify({"status": "error", "message": "No TTS instance available within the maximum wait time"}) + print("[AllTalk MEM] No instance available within queue timeout period") + return jsonify({"status": "error", "message": "No TTS instance available within the queue maximum wait time"}) def process_tts_request(instance, data, timeout): port = tts_instances[instance]["port"] @@ -985,13 +1265,237 @@ def process_tts_request(instance, data, timeout): finally: release_instance(instance) -@app.route('/audio/') +@flask_app.route('/v1/audio/speech', methods=['POST']) +@cross_origin(origins='*', methods=['POST', 'OPTIONS'], allow_headers=['Content-Type']) +def openai_speech(): + print("[AllTalk MEM] Received OpenAI-style TTS generate request") if config['debug_mode'] else None + start_time = datetime.now() + request_data = request.json + + # Add request to queue + queue_item = QueueItem(request_data['input'], start_time) + queue_items.append(queue_item) + + while (datetime.now() - start_time).total_seconds() < config['max_queue_time']: + instance = get_available_instance() + if instance: + try: + print(f"[AllTalk MEM] Processing OpenAI-style request with instance {instance}") if config['debug_mode'] else None + + # Update engine status + engine_statuses[instance] = EngineStatus() + engine_statuses[instance].current_request = request_data['input'] + engine_statuses[instance].start_time = datetime.now() + engine_statuses[instance].char_count = len(request_data['input']) + + dynamic_timeout = calculate_dynamic_timeout( + text=request_data['input'], + concurrent_requests=len([i for i in tts_instances.values() if i['locked']]), + queue_position=len(queue_items), + total_queue_length=len(queue_items), + request_start_time=start_time.timestamp() + ) + + result = process_openai_tts_request(instance, request_data, dynamic_timeout) + print(f"[AllTalk MEM] OpenAI-style request processed, result: {result}") if config['debug_mode'] else None + + # Remove request from queue + queue_items.popleft() + + # Reset engine status + engine_statuses[instance] = EngineStatus() + + if isinstance(result, requests.Response) and result.status_code == 200: + return send_file( + io.BytesIO(result.content), + mimetype=f'audio/{request_data.get("response_format", "mp3")}', + as_attachment=True, + download_name=f"speech.{request_data.get('response_format', 'mp3')}" + ) + else: + return jsonify({"error": "Failed to generate speech"}), 500 + + except Exception as e: + print(f"[AllTalk MEM] Error processing OpenAI-style request: {str(e)}") + release_instance(instance) + return jsonify({"error": str(e)}), 500 + + time.sleep(config['queue_check_interval']) + + # Remove request from queue if it times out + queue_items.popleft() + + print("[AllTalk MEM] No instance available within queue timeout period for OpenAI-style request") + return jsonify({"error": "No TTS instance available within the queue maximum wait time"}), 504 + +def process_openai_tts_request(instance, data, timeout): + port = tts_instances[instance]["port"] + try: + response = requests.post(f"http://127.0.0.1:{port}/v1/audio/speech", json=data, timeout=timeout) + return response + except Exception as e: + return {"error": str(e)} + finally: + release_instance(instance) + +@flask_app.route('/audio/') def serve_audio(filename): - return send_from_directory(app.config['OUTPUT_FOLDER'], filename) + return send_from_directory(flask_app.config['OUTPUT_FOLDER'], filename) + +@flask_app.route('/api/ready', methods=['GET']) +async def relay_ready_status(): + status = await fetch_from_any_engine('/api/ready') + return status, 200 if status == "Ready" else 503 + +@flask_app.route('/api/voices', methods=['GET']) +async def relay_voices(): + voices = await fetch_from_any_engine('/api/voices') + return jsonify(voices) + +@flask_app.route('/api/rvcvoices', methods=['GET']) +async def relay_rvc_voices(): + rvc_voices = await fetch_from_any_engine('/api/rvcvoices') + return jsonify(rvc_voices) + +@flask_app.route('/api/currentsettings', methods=['GET']) +async def relay_current_settings(): + settings = await fetch_from_any_engine('/api/currentsettings') + return jsonify(settings) + +async def fetch_from_any_engine(endpoint): + for instance, info in tts_instances.items(): + if is_instance_active(instance): + port = info['port'] + url = f"http://127.0.0.1:{port}{endpoint}" + async with aiohttp.ClientSession() as session: + try: + async with session.get(url) as response: + if response.status == 200: + if endpoint == '/api/ready': + return await response.text() + else: + return await response.json() + except aiohttp.ClientError: + continue # Try the next engine if this one fails + return {"status": "error", "message": "No active TTS engine available"} + +########################################## +## WEBSERVER # Gradio interface internal # +########################################## +async def fetch_voices(): + if not tts_instances: + return ["No Engines Loaded"] + + # Get the port of the first active engine + for info in tts_instances.values(): + port = info['port'] + url = f"http://127.0.0.1:{port}/api/voices" + async with aiohttp.ClientSession() as session: + try: + async with session.get(url) as response: + if response.status == 200: + data = await response.json() + voices = data.get('voices', []) + return voices if voices else ["No voices found"] + except aiohttp.ClientError: + continue # Try the next engine if this one fails + + return ["No response from engines"] + +async def get_available_voices(port): + url = f"http://127.0.0.1:{port}/api/voices" + try: + async with aiohttp.ClientSession() as session: + async with session.get(url) as response: + if response.status == 200: + data = await response.json() + if data["status"] == "success": + return data["voices"] + else: + print(f"Error fetching voices: {data.get('message', 'Unknown error')}") + else: + print(f"Error fetching voices: HTTP {response.status}") + except aiohttp.ClientError as e: + print(f"Network error while fetching voices: {str(e)}") + except Exception as e: + print(f"Unexpected error while fetching voices: {str(e)}") + + return [] # Return an empty list if there's any error -# Set the output folder to be 'outputs' in the current directory -app.config['OUTPUT_FOLDER'] = str(Path(os.getcwd()) / 'outputs') +async def update_voice_selector(engine): + if engine: + engine_num = int(engine.split()[1]) + port = current_base_port + engine_num - 1 + voices = await get_available_voices(port) + debug_str = f"Fetched voices for Engine {engine_num} (Port {port}): {voices}" + return gr.update(choices=voices, value=voices[0] if voices else None), debug_str + return gr.update(choices=[], value=None), "No engine selected" + +############################# +## WEBSERVER # Load Testing # +############################# +async def run_load_test_gradio(num_requests, base_text, voice): + api_url = f"http://127.0.0.1:{config['api_server_port']}/api/tts-generate" + start_time = time.time() + + async with aiohttp.ClientSession() as session: + tasks = [] + for i in range(num_requests): + text = base_text.replace("X", str(i+1)) + task = send_tts_request(session, api_url, text, voice, i+1) + tasks.append(task) + + responses = await asyncio.gather(*tasks) + + end_time = time.time() + total_time = end_time - start_time + + results = [] + for i, response in enumerate(responses): + if response and response.get("status") == "generate-success": + audio_url = f"http://127.0.0.1:{config['api_server_port']}{response['output_file_url']}" + # Round time to 1 decimal place + rounded_time = round(response['time'], 1) + results.append([f"Request {i+1}", rounded_time, "Success", audio_url]) + else: + results.append([f"Request {i+1}", None, "Failed", ""]) + successful_requests = sum(1 for r in results if r[2] == "Success") + failed_requests = num_requests - successful_requests + summary = f"Load test completed in {total_time:.1f} seconds.\n" + summary += f"Successful requests: {successful_requests}\n" + summary += f"Failed requests: {failed_requests}\n" + summary += f"Actual characters per request: {len(base_text)}" + + return results, summary + +async def send_tts_request(session, url, text, voice, request_id): + start_time = time.time() + try: + async with session.post(url, data={ + "text_input": text, + "text_filtering": "standard", + "character_voice_gen": voice, + "narrator_enabled": "false", + "narrator_voice_gen": "en_US-ljspeech-high.onnx", + "text_not_inside": "character", + "language": "en", + "output_file_name": f"loadtest_{request_id}", + "output_file_timestamp": "true", + "autoplay": "false", + "autoplay_volume": "0.8" + }) as response: + result = await response.json() + end_time = time.time() + result['time'] = end_time - start_time + return result + except Exception as e: + print(f"Request {request_id} failed: {str(e)}") + return None + +################################## +## WEBSERVER # Thread Management # +################################## class SilentWSGIRequestHandler(WSGIRequestHandler): def log_request(self, *args, **kwargs): pass @@ -1012,7 +1516,7 @@ def shutdown(self): def start_api_server(): global server_thread - server_thread = ServerThread(app) + server_thread = ServerThread(flask_app) # Use flask_app here server_thread.start() print(f"[AllTalk MEM] API server thread started on port {config['api_server_port']}") if config['debug_mode'] else None @@ -1024,18 +1528,38 @@ def stop_api_server(): server_thread.join() print("[AllTalk MEM] API server shut down successfully") +########################## +### Shutdown & Cleanup ### +########################## +def graceful_shutdown(): + print("[AllTalk MEM] Initiating graceful shutdown...") + # Stop all engine instances + print("[AllTalk MEM] Shutting down all engines...") + stop_all_instances() + print("[AllTalk MEM] All engines stopped.") + # Stop API server + stop_api_server() + # Set flags to stop ongoing processes + stop_monitor.set() + should_exit.set() + # Close Gradio interface + if 'interface' in globals(): + interface.close() + gr.close_all() + print("[AllTalk MEM] Graceful shutdown complete.") + ###################### ### INITIALISATION ### ###################### - print(f"[AllTalk MEM] Please use \033[91mCtrl+C\033[0m when exiting otherwise Python") print(f"[AllTalk MEM] subprocess's will continue running in the background.") print(f"[AllTalk MEM] ") +print(f"[AllTalk MEM] Ensure you have configured AllTalk TTS engines in the") +print(f"[AllTalk MEM] AllTalk interface before using MEM. MEM does not require") +print(f"[AllTalk MEM] AllTalk to be running and should be used seperately.") +print(f"[AllTalk MEM] ") print(f"[AllTalk MEM] MEM Server Ready") -def launch_interface(interface): - interface.launch(quiet=True, server_port=config['gradio_interface_port'], prevent_thread_lock=True) - def auto_start_engines(): num_engines = config['auto_start_engines'] if num_engines > 0: @@ -1043,40 +1567,27 @@ def auto_start_engines(): results = start_MEMple_instances(num_engines, current_base_port) update_tts_instances() print(f"[AllTalk MEM] Auto-start complete.") - print(f"[AllTalk MEM] {results}") - -def shutdown(): - print("[AllTalk MEM] Shutting down all engines...") - stop_all_instances() - print("[AllTalk MEM] All engines stopped.") + print(f"[AllTalk MEM] {results}") # Main execution if __name__ == "__main__": signal.signal(signal.SIGINT, signal_handler) - interface = create_gradio_interface() - - # Start the Gradio interface in a separate thread - interface_thread = threading.Thread(target=launch_interface, args=(interface,)) - interface_thread.start() - + # Start the Gradio interface + interface.launch(quiet=True, server_port=config['gradio_interface_port'], prevent_thread_lock=True) # Start the API server start_api_server() - initialize_instances() auto_start_engines() - try: # Main loop while not should_exit.is_set(): - should_exit.wait(1) # Wait for 1 second or until should_exit is set + time.sleep(0.1) # Short sleep to prevent high CPU usage + except KeyboardInterrupt: + print("[AllTalk MEM] Keyboard interrupt received. Shutting down...") + should_exit.set() except Exception as e: print(f"[AllTalk MEM] An error occurred: {e}") finally: - print("[AllTalk MEM] Initiating shutdown...") - stop_api_server() - shutdown() - # Give the interface thread a chance to close gracefully - interface_thread.join(timeout=5) - print("[AllTalk MEM] Shutdown complete.") + graceful_shutdown() sys.exit(0) \ No newline at end of file