Skip to content

end points made with tests for audio #45

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
May 22, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 67 additions & 23 deletions jigsawstack/audio.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,17 @@ class TextToSpeechParams(TypedDict):
speaker_clone_file_store_key: NotRequired[str]


class TTSCloneParams(TypedDict):
url: NotRequired[str]
file_store_key: NotRequired[str]
name: str


class GetTTSVoiceClonesParams(TypedDict):
limit: NotRequired[int]
page: NotRequired[int]


class TextToSpeechResponse(TypedDict):
success: bool
text: str
Expand Down Expand Up @@ -70,12 +81,10 @@ def speech_to_text(self, params: SpeechToTextParams) -> SpeechToTextResponse: ..
@overload
def speech_to_text(self, file: bytes, options: Optional[SpeechToTextParams] = None) -> SpeechToTextResponse: ...

def speech_to_text(
self,
blob: Union[SpeechToTextParams, bytes],
options: Optional[SpeechToTextParams] = None,
) -> SpeechToTextResponse:
if isinstance(blob, dict): # If params is provided as a dict, we assume it's the first argument
def speech_to_text(self, blob: Union[SpeechToTextParams, bytes], options: Optional[SpeechToTextParams] = None) -> SpeechToTextResponse:
if isinstance(
blob, dict
): # If params is provided as a dict, we assume it's the first argument
resp = Request(
config=self.config,
path="/ai/transcribe",
Expand All @@ -89,17 +98,9 @@ def speech_to_text(
content_type = options.get("content_type", "application/octet-stream")
headers = {"Content-Type": content_type}

resp = Request(
config=self.config,
path=path,
params=options,
data=blob,
headers=headers,
verb="post",
).perform_with_content()
resp = Request(config=self.config, path=path, params=options, data=blob, headers=headers, verb="post").perform_with_content()
return resp


def text_to_speech(self, params: TextToSpeechParams) -> TextToSpeechResponse:
path = "/ai/tts"
resp = Request(
Expand All @@ -112,12 +113,23 @@ def text_to_speech(self, params: TextToSpeechParams) -> TextToSpeechResponse:

def speaker_voice_accents(self) -> TextToSpeechResponse:
path = "/ai/tts"
resp = Request(
config=self.config,
path=path,
params={},
verb="get",
).perform_with_content()
resp = Request(config=self.config, path=path, params={}, verb="get").perform_with_content()
return resp

def create_clone(self, params: TTSCloneParams) -> TextToSpeechResponse:
path = "/ai/tts/clone"
resp = Request(config=self.config, path=path, params=cast(Dict[Any, Any], params), verb="post").perform_with_content()

return resp

def get_clones(self, params: GetTTSVoiceClonesParams) -> TextToSpeechResponse:
path = "/ai/tts/clone"
resp = Request(config=self.config, path=path, params=cast(Dict[Any, Any], params), verb="get").perform_with_content()
return resp

def delete_clone(self, voice_id: str) -> TextToSpeechResponse:
path = f"/ai/tts/clone/{voice_id}"
resp = Request(config=self.config, path=path, params={}, verb="delete").perform_with_content()
return resp


Expand All @@ -140,7 +152,9 @@ def __init__(
@overload
async def speech_to_text(self, params: SpeechToTextParams) -> SpeechToTextResponse: ...
@overload
async def speech_to_text(self, file: bytes, options: Optional[SpeechToTextParams] = None) -> SpeechToTextResponse: ...
async def speech_to_text(
self, file: bytes, options: Optional[SpeechToTextParams] = None
) -> SpeechToTextResponse: ...

async def speech_to_text(
self,
Expand All @@ -155,7 +169,7 @@ async def speech_to_text(
verb="post",
).perform_with_content()
return resp

options = options or {}
path = build_path(base_path="/ai/transcribe", params=options)
content_type = options.get("content_type", "application/octet-stream")
Expand Down Expand Up @@ -190,3 +204,33 @@ async def speaker_voice_accents(self) -> TextToSpeechResponse:
verb="get",
).perform_with_content()
return resp

async def create_clone(self, params: TTSCloneParams) -> TextToSpeechResponse:
path = "/ai/tts/clone"
resp = await AsyncRequest(
config=self.config,
path=path,
params=cast(Dict[Any, Any], params),
verb="post"
).perform_with_content()
return resp

async def get_clones(self, params: GetTTSVoiceClonesParams) -> TextToSpeechResponse:
path = "/ai/tts/clone"
resp = await AsyncRequest(
config=self.config,
path=path,
params=cast(Dict[Any, Any], params),
verb="get"
).perform_with_content()
return resp

async def delete_clone(self, voice_id: str) -> TextToSpeechResponse:
path = f"/ai/tts/clone/{voice_id}"
resp = await AsyncRequest(
config=self.config,
path=path,
params={},
verb="delete"
).perform_with_content()
return resp
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

setup(
name="jigsawstack",
version="0.2.2",
version="0.2.3",
description="JigsawStack - The AI SDK for Python",
long_description=open("README.md", encoding="utf8").read(),
long_description_content_type="text/markdown",
Expand Down
100 changes: 95 additions & 5 deletions tests/test_audio.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,108 @@
import pytest
import asyncio
import logging
from jigsawstack import AsyncJigsawStack

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def test_async_speaker_voice_accents_response():
def test_text_to_speech():
async def _test():
client = AsyncJigsawStack()

"""Test converting text to speech"""
try:
response = await client.audio.text_to_speech(
{
"text": "Hello world, this is a test of the JigsawStack text to speech API."
}
)
print("Text to speech response:", response)
assert response["success"] == True

except Exception as e:
print(f"Error in text_to_speech test: {e}")

asyncio.run(_test())


def test_speaker_voice_accents():
async def _test():
client = AsyncJigsawStack()

"""Test getting available voice accents"""
try:
response = await client.audio.speaker_voice_accents()
print("Speaker voice accents response:", response)
assert response["success"] == True

except Exception as e:
print(f"Error in speaker voice accents test: {e}")


def test_create_clone():
async def _test():
client = AsyncJigsawStack()

"""Test creating a voice clone with URL"""
try:
result = await client.audio.speaker_voice_accents()
assert result["success"] == True
except JigsawStackError as e:
pytest.fail(f"Unexpected JigsawStackError: {e}")
audio_url = (
"https://jigsawstack.com/audio/test.mp3" # Replace with an actual URL
)
clone_response_url = await client.audio.create_clone(
{"url": audio_url, "name": "Test Voice Clone URL"}
)

assert clone_response_url["success"] == True

clone_response_file_store_key = client.audio.create_clone(
{
"file_store_key": "hello_audio",
"name": "Test Voice Clone File Store Key",
}
)

assert clone_response_file_store_key["success"] == True

except Exception as e:
print(f"Error in voice_cloning test: {e}")

asyncio.run(_test())


def test_get_clones():
async def _test():
client = AsyncJigsawStack()
"""Test getting voice clones"""
try:
# List available voice clones
clones_response = await client.audio.get_clones({"limit": 10, "page": 1})

assert clones_response["success"] == True

except Exception as e:
print(f"Error in voice_cloning test: {e}")

asyncio.run(_test())


def test_delete_clone():
async def _test():
client = AsyncJigsawStack()
"""Test getting a voice clone"""
try:
create_clone_response = await client.audio.create_clone(
{"name": "Test Voice Clone URL", "file_store_key": "hello_audio"}
)
clones = await client.audio.get_clones({"limit": 10, "page": 1})
print("Clones:", clones)
clone_id = clones["data"][0]["id"]
delete_clone_response = await client.audio.delete_clone(clone_id)
print("Delete clone response:", delete_clone_response)
assert delete_clone_response["success"] == True

except Exception as e:
print(f"Error in get_clone test: {e}")

asyncio.run(_test())