diff --git "a/Poincar\303\251.wav" "b/Poincar\303\251.wav" new file mode 100644 index 0000000..4dea245 Binary files /dev/null and "b/Poincar\303\251.wav" differ diff --git a/lab.ipynb b/lab.ipynb new file mode 100644 index 0000000..69e20c6 --- /dev/null +++ b/lab.ipynb @@ -0,0 +1,132 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import torch\n", + "from TTS.api import TTS\n", + "from utils import *\n", + "from pydub import AudioSegment\n", + "from pydub.playback import play\n", + "import numpy as np\n", + "\n", + "device = \"cuda\" if torch.cuda.is_available() else \"cpu\"\n", + "\n", + "tts = TTS(\"tts_models/multilingual/multi-dataset/xtts_v2\").to(device)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "print(\"Start talking...\")\n", + "\n", + "wav = tts.tts(text=\"Bonjour à tous, je m'appelle Bilel et j'aime manger.\", speaker_wav=\"/Users/bilel/Desktop/test.m4a\", language=\"en\")\n", + "\n", + "# Text to speech to a file\n", + "#tts.tts_to_file(text=\"Test\", speaker_wav=\"/Users/bilel/Desktop/ali poireau.m4a\", language=\"fr\", file_path=\"output.wav\")" + ] + }, + { + "cell_type": "code", + "execution_count": 16, + "metadata": {}, + "outputs": [], + "source": [ + "correspondance = {\n", + " \"English\": \"en\",\n", + " \"French\": \"fr\",\n", + " \"Spanish\": \"es\",\n", + " \"German\": \"de\",\n", + " \"Italian\": \"it\",\n", + " \"Portuguese\": \"pt\",\n", + " \"Polish\": \"pl\",\n", + " \"Turkish\": \"tr\",\n", + " \"Russian\": \"ru\",\n", + " \"Dutch\": \"nl\",\n", + " \"Czech\": \"cz\",\n", + " \"Arabic\": \"ar\",\n", + " \"Chinese\": \"zh-cn\",\n", + " \"Japanese\": \"ja\",\n", + " \"Hugarian\": \"hu\",\n", + " \"Korean\": \"ko\"\n", + "}.keys()" + ] + }, + { + "cell_type": "code", + "execution_count": 17, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "dict_keys(['English', 'French', 
'Spanish', 'German', 'Italian', 'Portuguese', 'Polish', 'Turkish', 'Russian', 'Dutch', 'Czech', 'Arabic', 'Chinese', 'Japanese', 'Hugarian', 'Korean'])" + ] + }, + "execution_count": 17, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "correspondance" + ] + }, + { + "cell_type": "code", + "execution_count": 15, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Start talking...\n", + "End talking\n" + ] + } + ], + "source": [ + "def text_to_speech(text:str, target_language:str):\n", + " audio_bytes = b''.join([np.array(w).astype('float32').tobytes() for w in wav])\n", + "\n", + " audio = AudioSegment(\n", + " audio_bytes,\n", + " frame_rate=22050,\n", + " sample_width=4,\n", + " channels=1 \n", + " )\n", + "\n", + " print(\"Start talking...\")\n", + " play(audio)" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.9.6" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..caef37a --- /dev/null +++ b/requirements.txt @@ -0,0 +1,7 @@ +openai-whisper +sounddevice +SpeechRecognition + +# Text to Speech +TTS +pydub \ No newline at end of file diff --git a/talking.py b/talking.py new file mode 100644 index 0000000..22e40b0 --- /dev/null +++ b/talking.py @@ -0,0 +1,44 @@ +import torch +from TTS.api import TTS +from utils import * +from pydub import AudioSegment +from pydub.playback import play +import numpy as np + +device = "cuda" if torch.cuda.is_available() else "cpu" + +tts = TTS("tts_models/multilingual/multi-dataset/xtts_v2").to(device) + +correspondance = { + "English": "en", + "French": "fr", + 
def _build_arg_parser():
    """Return the command-line parser for the assistant."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--mode", default="translation",
                        help="Mode considered by the assistant : \"translation\", \"chat\"", type=str)
    parser.add_argument("--model", default="base", help="Model to use",
                        choices=["tiny", "base", "small", "medium", "large"])
    parser.add_argument("--energy_threshold", default=1000,
                        help="Energy level for mic to detect.", type=int)
    parser.add_argument("--record_timeout", default=2,
                        help="How real time the recording is in seconds.", type=float)
    parser.add_argument("--phrase_timeout", default=3,
                        help="How much empty space between recordings before we "
                             "consider it a new line in the transcription.", type=float)

    # The microphone option is only registered on Linux.
    if 'linux' in platform:
        parser.add_argument("--default_microphone", default='pulse',
                            help="Default microphone name for SpeechRecognition. "
                                 "Run this with 'list' to view available Microphones.", type=str)
    return parser


def _find_microphone(mic_name):
    """Return the first microphone whose name contains *mic_name*, or None."""
    for index, name in enumerate(sr.Microphone.list_microphone_names()):
        if mic_name in name:
            return sr.Microphone(sample_rate=16000, device_index=index)
    return None


def main() -> None:
    """Listen on the microphone, transcribe speech, answer it and speak the answer.

    In "translation" mode every transcribed chunk is passed to ``translation``;
    in "chat" mode it is passed to ``generation``. The answer is printed and
    handed to ``talking``. The loop runs until Ctrl+C, after which the collected
    transcription and answers are printed.

    Raises:
        ValueError: if ``--mode`` is neither "translation" nor "chat".
        SystemExit: (via ``parser.error``) if no microphone matches
            ``--default_microphone`` on Linux.
    """
    # Local import: the file-level import only brings in datetime/timedelta.
    from datetime import timezone

    parser = _build_arg_parser()
    args = parser.parse_args()

    mode = args.mode
    if mode not in ["translation", "chat"]:
        raise ValueError("Mode argument does not comply (it must be equal to either 'translation' or 'chat').")

    # Resolve the microphone before asking any question so that
    # "--default_microphone list" answers immediately.
    if 'linux' in platform:
        mic_name = args.default_microphone
        if not mic_name or mic_name == 'list':
            print("Available microphone devices are: ")
            for name in sr.Microphone.list_microphone_names():
                print(f"Microphone with name \"{name}\" found")
            return
        source = _find_microphone(mic_name)
        if source is None:
            # Previously `source` stayed unbound here and crashed later.
            parser.error(f"No microphone matching '{mic_name}' found. "
                         "Run with --default_microphone list to view available microphones.")
    else:
        source = sr.Microphone(sample_rate=16000)

    if mode == "translation":
        source_language = choose_language("source")
        target_language = choose_language("target")
        english = source_language == "English"
    else:
        source_language = None
        english = speak_english()
        # `talking` needs a language in chat mode too; it was undefined before.
        target_language = "English" if english else choose_language("spoken")

    data_queue = Queue()

    recorder = sr.Recognizer()
    recorder.energy_threshold = args.energy_threshold
    recorder.dynamic_energy_threshold = False

    # English-only checkpoints are requested for every size except "large".
    model = args.model
    if args.model != "large" and english:
        model = model + ".en"
    audio_model = whisper.load_model(model)

    record_timeout = args.record_timeout
    phrase_timeout = args.phrase_timeout

    # Kept in lockstep: answers[i] is the answer to transcription[i].
    transcription = ['']
    answers = ['']

    with source:
        recorder.adjust_for_ambient_noise(source)

    def record_callback(_, audio: sr.AudioData) -> None:
        """
        Threaded callback function to receive audio data when recordings finish.
        audio: An AudioData containing the recorded bytes.
        """
        data_queue.put(audio.get_raw_data())

    # Keep the returned stopper so the background listener can be shut down.
    stop_listening = recorder.listen_in_background(
        source, record_callback, phrase_time_limit=record_timeout)

    print("Model loaded.\n")
    print("Start listening...")

    phrase_time = None
    try:
        while True:
            if data_queue.empty():
                # Nothing recorded yet: sleep instead of spinning.
                sleep(0.5)
                continue

            now = datetime.now(timezone.utc)
            # A long enough gap since the previous chunk starts a new line.
            phrase_complete = (
                phrase_time is not None
                and now - phrase_time > timedelta(seconds=phrase_timeout)
            )
            phrase_time = now

            # Drain through the public API; this is the only consumer, so
            # get() cannot block once empty() is False.
            chunks = []
            while not data_queue.empty():
                chunks.append(data_queue.get())
            audio_data = b''.join(chunks)

            # Interpret the bytes as int16 samples and scale to float32.
            audio_np = np.frombuffer(audio_data, dtype=np.int16).astype(np.float32) / 32768.0

            result = audio_model.transcribe(audio_np, fp16=torch.cuda.is_available())
            text = result['text'].strip()
            if not text:
                # Nothing was transcribed: do not erase the current line or
                # send an empty request.
                continue

            if phrase_complete:
                transcription.append(text)
                answers.append('')
            else:
                transcription[-1] = text

            os.system('cls' if os.name == 'nt' else 'clear')
            for line in transcription:
                print(line)

            if mode == "translation":
                answer = translation(text, source_language, target_language)
            else:
                answer = generation(text)

            print(answer)
            if answer:
                # The helpers return "" on failure; there is nothing to speak then.
                talking(answer, target_language)
            answers[-1] = answer

            print('', end='', flush=True)
    except KeyboardInterrupt:
        pass
    finally:
        stop_listening(wait_for_stop=False)

    if mode == "translation":
        print("\nTranscription :")
        for line in transcription:
            print(line)

        print("\nTranslation :")
        for line in answers:
            print(line)
    else:
        print("\nTranscription :")
        for heard, reply in zip(transcription, answers):
            print(heard)
            print(reply)
def generation(text:str) -> str:
    """Return the chat model's reply to *text*.

    Sends *text* as a single user message to ``gpt-3.5-turbo`` through the
    module-level ``client``. Any failure is printed and "" is returned, so
    callers always receive a string.
    """
    try:
        completion = client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[
                {"role": "user", "content": text}
            ]
        )
        # The message content may be missing; normalise to "" to honour -> str.
        return completion.choices[0].message.content or ""
    except Exception as e:
        # Deliberately best-effort: report the failure and return "".
        print(f"An error occurred while generating a response : {e}")
        return ""


def choose_language(language_type:str="target") -> str:
    """Ask the user to pick a language from a numbered menu and return its name.

    Args:
        language_type: Word inserted in the prompt (e.g. "source", "target").

    Returns:
        The chosen language name, e.g. "French".
    """
    languages = ['English', 'French', 'Spanish', 'German', 'Italian', 'Portuguese', 'Arabic']
    print(f"Choose a {language_type} language :")
    for index, language in enumerate(languages, start=1):
        print(f"{index}. {language}")

    while True:
        choice = input("Enter the number corresponding to your choice: ").strip()
        # isdecimal(), unlike isdigit(), rejects characters such as "²"
        # that int() cannot parse and that would otherwise raise ValueError.
        if choice.isdecimal() and 1 <= int(choice) <= len(languages):
            return languages[int(choice) - 1]
        print("Invalid input. Please enter a valid number.")


def speak_english() -> bool:
    """Ask whether the user will speak English; an empty answer means yes.

    Returns:
        True for "" or "y", False for "n" (case-insensitive, whitespace
        ignored). Any other answer repeats the question.
    """
    while True:
        answer = input("Are you going to talk in English? (Y/n): ").strip().lower()
        if answer in ('', 'y'):
            return True
        if answer == 'n':
            return False
        print("Invalid input. Please enter 'Y' or 'n'.")