Skip to content
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion videodb/__about__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
""" About information for videodb sdk"""


__version__ = "0.2.13"
__version__ = "0.2.14"
__title__ = "videodb"
__author__ = "videodb"
__email__ = "contact@videodb.io"
Expand Down
4 changes: 4 additions & 0 deletions videodb/_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ class ApiPath:
storage = "storage"
download = "download"
title = "title"
rtstream = "rtstream"
status = "status"
event = "event"
alert = "alert"
generate_url = "generate_url"
generate = "generate"
web = "web"
Expand Down
24 changes: 24 additions & 0 deletions videodb/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,30 @@ def get_invoices(self) -> List[dict]:
"""
return self.get(path=f"{ApiPath.billing}/{ApiPath.invoices}")

def create_event(self, event_prompt: str, label: str):
"""Create an rtstream event.

:param str event_prompt: Prompt for the event
:param str label: Label for the event
:return: Event ID
:rtype: str
"""
event_data = self.post(
f"{ApiPath.rtstream}/{ApiPath.event}",
data={"event_prompt": event_prompt, "label": label},
)

return event_data.get("event_id")

def list_events(self):
"""List all rtstream events.

:return: List of events
:rtype: list[dict]
"""
event_data = self.get(f"{ApiPath.rtstream}/{ApiPath.event}")
return event_data.get("events", [])

def download(self, stream_link: str, name: str) -> dict:
"""Download a file from a stream link.

Expand Down
48 changes: 48 additions & 0 deletions videodb/collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from videodb.video import Video
from videodb.audio import Audio
from videodb.image import Image
from videodb.rtstream import RTStream
from videodb.search import SearchFactory, SearchResult

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -164,6 +165,53 @@ def delete_image(self, image_id: str) -> None:
path=f"{ApiPath.image}/{image_id}", params={"collection_id": self.id}
)

def connect_rtstream(
self, url: str, name: str, sample_rate: int = None
) -> RTStream:
"""Connect to an rtstream.

:param str url: URL of the rtstream
:param str name: Name of the rtstream
:param int sample_rate: Sample rate of the rtstream default is 1fps
:return: :class:`RTStream <RTStream>` object
"""
rtstream_data = self._connection.post(
path=f"{ApiPath.rtstream}",
data={
"collection_id": self.id,
"url": url,
"name": name,
"sample_rate": sample_rate,
},
)
return RTStream(self._connection, **rtstream_data)

def get_rtstream(self, id: str) -> RTStream:
"""Get an rtstream by its ID.

:param str id: ID of the rtstream
:return: :class:`RTStream <RTStream>` object
:rtype: :class:`videodb.rtstream.RTStream`
"""
rtstream_data = self._connection.get(
path=f"{ApiPath.rtstream}/{id}",
)
return RTStream(self._connection, **rtstream_data)

def list_rtstreams(self) -> List[RTStream]:
"""List all rtstreams in the collection.

:return: List of :class:`RTStream <RTStream>` objects
:rtype: List[:class:`videodb.rtstream.RTStream`]
"""
rtstreams_data = self._connection.get(
path=f"{ApiPath.rtstream}",
)
return [
RTStream(self._connection, **rtstream)
for rtstream in rtstreams_data.get("results")
]

def generate_image(
self,
prompt: str,
Expand Down
Loading