Skip to content

Commit

Permalink
Update screenpipe SDK!
Browse files Browse the repository at this point in the history
Updated based off server.rs from 12/31/24
  • Loading branch information
TanGentleman committed Jan 1, 2025
1 parent b68fbb0 commit a70af17
Showing 1 changed file with 120 additions and 59 deletions.
179 changes: 120 additions & 59 deletions src/core/screenpipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from typing import Dict, List, Literal, Optional
from httpx import Client, AsyncClient, HTTPError

DEFAULT_SEARCH_LIMIT = 20 # Updated to match server default

class ScreenpipeClient:
"""Client for interacting with the ScreenPipe API."""
Expand Down Expand Up @@ -40,6 +41,8 @@ def _configure_valid_types(self) -> None:
"""Configure valid content and tag types."""
self._valid_content_types = frozenset({"ocr", "audio", "all"})
self._valid_tag_types = frozenset({"audio", "vision"})
# Add new valid types for content addition
self._valid_add_types = frozenset({"frames", "transcription"})

@property
def sync_session(self) -> Client:
Expand Down Expand Up @@ -159,8 +162,8 @@ def health_check(self) -> Optional[Dict]:

def search(
self,
limit: int = 20,
query: str = "",
limit: int = DEFAULT_SEARCH_LIMIT,
query: Optional[str] = None,
content_type: Optional[Literal["ocr", "audio", "all"]] = "all",
offset: Optional[int] = None,
start_time: Optional[str] = None,
Expand All @@ -169,7 +172,8 @@ def search(
window_name: Optional[str] = None,
include_frames: bool = False,
min_length: Optional[int] = None,
max_length: Optional[int] = None
max_length: Optional[int] = None,
speaker_ids: Optional[List[int]] = None
) -> Optional[Dict]:
"""Search captured data in ScreenPipe's database.
Expand All @@ -194,7 +198,8 @@ def search(
"window_name": window_name,
"include_frames": "true" if include_frames else None,
"min_length": min_length,
"max_length": max_length
"max_length": max_length,
"speaker_ids": ",".join(map(str, speaker_ids)) if speaker_ids else None
}
# Remove None values
params = {k: v for k, v in params.items() if v is not None}
Expand Down Expand Up @@ -288,11 +293,12 @@ def download_pipe(self, url: str) -> Optional[Dict]:
Returns:
Optional[Dict]: Success message and pipe ID
"""
PIPE_DOWNLOAD_TIMEOUT = 30
return self._make_request(
"post",
"pipes/download",
json={"url": url},
timeout=self._pipe_download_timeout
timeout=PIPE_DOWNLOAD_TIMEOUT
)

def run_pipe(self, pipe_id: str) -> Optional[Dict]:
Expand Down Expand Up @@ -334,44 +340,57 @@ def update_pipe_configuration(
json={"pipe_id": pipe_id, "config": config}
)

def delete_pipe(self, pipe_id: str) -> Optional[Dict]:
"""Delete a pipe.
Returns:
Optional[Dict]: Success message
"""
return self._make_request(
"post",
"pipes/delete",
json={"pipe_id": pipe_id}
)

def add_content(
self,
device_name: str,
content_type: str,
frames: Optional[List[Dict]] = None,
audio: Optional[List[Dict]] = None) -> Optional[Dict]:
"""Add content (frames or audio) to the database.
transcription: Optional[Dict] = None) -> Optional[Dict]:
"""Add content (frames or transcription) to the database.
Args:
device_name: Name of the device
content_type: Type of content ("frames" or "audio")
content_type: Type of content ("frames" or "transcription")
frames: List of frame data including:
- file_path: Path to frame image
- timestamp: Frame timestamp
- app_name: Application name
- window_name: Window name
- ocr_results: List of OCR results with text and metadata
- tags: List of tags
audio: List of audio data including:
- device_name: Audio device name
- is_input: Whether device is input
- transcription: Audio transcription
- audio_file_path: Path to audio file
- duration_secs: Duration in seconds
- start_offset: Start time offset
transcription: Audio transcription data including:
- transcription: Transcription text
- transcription_engine: Engine used for transcription
Returns:
Optional[Dict]: Success message
"""
if content_type not in self._valid_add_types:
raise ValueError(f"Invalid content_type. Must be one of: {self._valid_add_types}")

content = {
"content_type": content_type,
"data": {}
}

if frames:
content["data"]["frames"] = frames
if audio:
content["data"]["audio"] = audio
elif transcription:
content["data"]["transcription"] = transcription
else:
raise ValueError("Either frames or transcription must be provided")

return self._make_request(
"post",
Expand All @@ -382,69 +401,111 @@ def add_content(
}
)

def stream_frames(
self,
start_time: Optional[str] = None,
end_time: Optional[str] = None) -> Optional[Dict]:
"""Stream frames between start and end time.
def execute_raw_sql(self, query: str) -> Optional[List[Dict]]:
"""Execute raw SQL query against the database.
Args:
start_time: Start timestamp for stream range (ISO format)
end_time: End timestamp for stream range (ISO format)
query: SQL query string
Returns:
Optional[Dict]: Server-sent events stream containing:
- timestamp: Frame timestamp
- devices: List of device data including:
- device_id: Device identifier
- frame: Base64 encoded frame image
- metadata: Frame metadata (path, app, window, OCR)
- audio: Associated audio data
Optional[List[Dict]]: List of query results as dictionaries
"""
params = {
"start_time": start_time,
"end_time": end_time
}
params = {k: v for k, v in params.items() if v is not None}

return self._make_request("get", "stream/frames", params=params)
return self._make_request(
"post",
"raw_sql",
json={"query": query}
)

def execute_input_control(self, action_type: str, action_data: Dict) -> Optional[Dict]:
"""Execute input control action (experimental).
def merge_frames(self, video_paths: List[str]) -> Optional[Dict]:
"""Merge multiple video frames into a single video.
Args:
action_type: Type of input action (e.g. "KeyPress", "MouseMove")
action_data: Action-specific data:
- For KeyPress: key name (e.g. "enter")
- For MouseMove: x,y coordinates
video_paths: List of paths to video files to merge
Returns:
Optional[Dict]: Success message
Optional[Dict]: Path to merged video file
"""
return self._make_request(
"post",
"experimental/input_control",
json={
"action": {
"type": action_type,
"data": action_data
}
}
"experimental/frames/merge",
json={"video_paths": video_paths}
)

def execute_raw_sql(self, query: str) -> Optional[List[Dict]]:
"""Execute raw SQL query against the database.
def validate_media(self, file_path: str) -> Optional[Dict]:
"""Validate a media file.
Args:
query: SQL query string
file_path: Path to media file to validate
Returns:
Optional[List[Dict]]: List of query results as dictionaries
Optional[Dict]: Validation status
"""
return self._make_request(
"post",
"raw_sql",
json={"query": query}
"get",
"experimental/validate/media",
params={"file_path": file_path}
)

# Add new speaker-related methods
def get_unnamed_speakers(
self,
limit: int = DEFAULT_SEARCH_LIMIT,
offset: int = 0,
speaker_ids: Optional[List[int]] = None
) -> Optional[List[Dict]]:
"""Get unnamed speakers.
Args:
limit: Maximum number of results to return
offset: Pagination offset
speaker_ids: List of speaker IDs to include
Returns:
Optional[List[Dict]]: List of unnamed speakers
"""
params = {
"limit": limit,
"offset": offset,
"speaker_ids": ",".join(map(str, speaker_ids)) if speaker_ids else None
}
params = {k: v for k, v in params.items() if v is not None}
return self._make_request("get", "speakers/unnamed", params=params)

def update_speaker(
self,
speaker_id: int,
name: Optional[str] = None,
metadata: Optional[Dict] = None
) -> Optional[Dict]:
"""Update speaker information."""
payload = {"id": speaker_id}
if name:
payload["name"] = name
if metadata:
payload["metadata"] = metadata
return self._make_request("post", "speakers/update", json=payload)

def stream_frames(
self,
start_time: str,
end_time: str
) -> Optional[Dict]:
"""Stream frames between specified timestamps.
Args:
start_time: Start timestamp in ISO format
end_time: End timestamp in ISO format
Returns:
Optional[Dict]: Streaming response (SSE)
"""
return self._make_request(
"get",
"stream/frames",
params={
"start_time": start_time,
"end_time": end_time
}
)


Expand Down

0 comments on commit a70af17

Please sign in to comment.