From 236d76dbe31fe89f7655f8343cadc033bf485e26 Mon Sep 17 00:00:00 2001 From: elijahbenizzy Date: Wed, 11 Sep 2024 13:34:14 -0700 Subject: [PATCH] Updates youtube server example --- burr/core/action.py | 2 +- .../application.py | 31 ++++----- .../youtube-to-social-media-post/curls.sh | 20 ++++++ .../youtube-to-social-media-post/server.py | 63 ++++++++++++++----- 4 files changed, 85 insertions(+), 31 deletions(-) create mode 100755 examples/youtube-to-social-media-post/curls.sh diff --git a/burr/core/action.py b/burr/core/action.py index a08c27e5..ab80b2b0 100644 --- a/burr/core/action.py +++ b/burr/core/action.py @@ -1229,7 +1229,7 @@ def pydantic( writes: List[str], state_input_type: Type["BaseModel"], state_output_type: Type["BaseModel"], - stream_type: Type[StreamType], + stream_type: Union[Type["BaseModel"], Type[dict]], ) -> Callable: try: from burr.integrations.pydantic import pydantic_streaming_action diff --git a/examples/youtube-to-social-media-post/application.py b/examples/youtube-to-social-media-post/application.py index a3a0a8e9..e0b6983e 100644 --- a/examples/youtube-to-social-media-post/application.py +++ b/examples/youtube-to-social-media-post/application.py @@ -1,5 +1,5 @@ import textwrap -from typing import Any, Generator, Optional, Tuple, Union +from typing import Any, AsyncGenerator, Generator, Optional, Tuple, Union import instructor import openai @@ -8,13 +8,13 @@ from rich.console import Console from youtube_transcript_api import YouTubeTranscriptApi -from burr.core import Application, ApplicationBuilder -from burr.core.action import AsyncStreamingResultContainer, StreamingResultContainer -from burr.integrations.pydantic import ( - PydanticTypingSystem, - pydantic_action, - pydantic_streaming_action, +from burr.core import Application, ApplicationBuilder, action +from burr.core.action import ( + AsyncStreamingResultContainer, + StreamingResultContainer, + streaming_action, ) +from burr.integrations.pydantic import PydanticTypingSystem class Concept(BaseModel): @@ -122,7 +122,7 @@ def __copy__(self, memo: dict[int, Any] | None = None): # return new_obj -@pydantic_action(reads=[], writes=["transcript"]) +@action.pydantic(reads=[], writes=["transcript"]) def get_youtube_transcript(state: ApplicationState, youtube_url: str) -> ApplicationState: """Get the official YouTube transcript for a video given it's URL""" _, _, video_id = youtube_url.partition("?v=") @@ -134,7 +134,7 @@ def get_youtube_transcript(state: ApplicationState, youtube_url: str) -> Applica # store the transcript in state -@pydantic_action(reads=["transcript"], writes=["post"]) +@action.pydantic(reads=["transcript"], writes=["post"]) def generate_post(state: ApplicationState, llm_client) -> ApplicationState: """Use the Instructor LLM client to generate `SocialMediaPost` from the YouTube transcript.""" @@ -158,7 +158,7 @@ def generate_post(state: ApplicationState, llm_client) -> ApplicationState: return state -@pydantic_streaming_action( +@streaming_action.pydantic( reads=["transcript"], writes=["post"], state_input_type=ApplicationState, @@ -191,7 +191,7 @@ def generate_post_streaming( yield final_post, state -@pydantic_streaming_action( +@streaming_action.pydantic( reads=["transcript"], writes=["post"], state_input_type=ApplicationState, @@ -200,7 +200,7 @@ def generate_post_streaming( ) async def generate_post_streaming_async( state: ApplicationStateStream, llm_client -) -> Generator[Tuple[SocialMediaPost, Optional[ApplicationState]], None, None]: +) -> AsyncGenerator[Tuple[SocialMediaPost, Optional[ApplicationState]], None]: """Use the Instructor LLM client to generate `SocialMediaPost` from the YouTube transcript.""" transcript = state.transcript @@ -234,6 +234,7 @@ def build_application() -> Application[ApplicationState]: ) .with_transitions( ("get_youtube_transcript", "generate_post"), + ("generate_post", "get_youtube_transcript"), ) # .with_state_persister(SQLLitePersister(db_path=".burr.db", table_name="state")) .with_entrypoint("get_youtube_transcript") @@ -255,6 +256,7 @@ def build_streaming_application() -> Application[ApplicationState]: ) .with_transitions( ("get_youtube_transcript", "generate_post"), + ("generate_post", "get_youtube_transcript"), ) # .with_state_persister(SQLLitePersister(db_path=".burr.db", table_name="state")) .with_entrypoint("get_youtube_transcript") @@ -276,6 +278,7 @@ def build_streaming_application_async() -> Application[ApplicationState]: ) .with_transitions( ("get_youtube_transcript", "generate_post"), + ("generate_post", "get_youtube_transcript"), ) # .with_state_persister(SQLLitePersister(db_path=".burr.db", table_name="state")) .with_entrypoint("get_youtube_transcript") @@ -291,7 +294,7 @@ async def run_async(): console = Console() app = build_streaming_application_async() - action, streaming_container = await app.astream_result( + a, streaming_container = await app.astream_result( halt_after=["generate_post"], inputs={"youtube_url": "https://www.youtube.com/watch?v=hqutVJyd3TI"}, ) # type: ignore @@ -306,7 +309,7 @@ async def run_async(): if __name__ == "__main__": console = Console() app = build_streaming_application() - action, streaming_container = app.stream_result( + a, streaming_container = app.stream_result( halt_after=["generate_post"], inputs={"youtube_url": "https://www.youtube.com/watch?v=hqutVJyd3TI"}, ) # type: ignore diff --git a/examples/youtube-to-social-media-post/curls.sh b/examples/youtube-to-social-media-post/curls.sh new file mode 100755 index 00000000..fb479e56 --- /dev/null +++ b/examples/youtube-to-social-media-post/curls.sh @@ -0,0 +1,20 @@ +#!/bin/bash + +# Default to the 'social_media_post' endpoint if no argument is passed +ENDPOINT="social_media_post" +if [[ "$1" == "streaming_async" ]]; then + ENDPOINT="social_media_post_streaming_async" +elif [[ "$1" == "streaming" ]]; then + ENDPOINT="social_media_post_streaming" +fi + +# Perform the curl request to the chosen endpoint +curl -X 'GET' "http://localhost:7443/$ENDPOINT" \ + -s -H 'Accept: application/json' \ + --no-buffer | jq --unbuffered -c '.' | while IFS= read -r line; do + if [[ "$line" != "" ]]; then # Check for non-empty lines + clear + echo "$line" | jq --color-output . + sleep .01 # Add a small delay for visual clarity + fi +done diff --git a/examples/youtube-to-social-media-post/server.py b/examples/youtube-to-social-media-post/server.py index 4c0fd20f..906bb155 100644 --- a/examples/youtube-to-social-media-post/server.py +++ b/examples/youtube-to-social-media-post/server.py @@ -1,57 +1,88 @@ import contextlib +import json import logging import fastapi import uvicorn from application import ( ApplicationState, - ApplicationStateStream, + SocialMediaPost, build_application, - build_application_iterator_streaming, + build_streaming_application, + build_streaming_application_async, ) from fastapi.responses import StreamingResponse from burr.core import Application +from burr.core.action import AsyncStreamingResultContainer, StreamingResultContainer logger = logging.getLogger(__name__) # define a global `burr_app` variable burr_app: Application[ApplicationState] = None -# Second variant -- this uses a stream + a self-loop -# Note this will save a *lot* to the tracker, each stream! -burr_app_streaming_iterator: Application[ApplicationStateStream] = None +# This does streaming, in sync mode +burr_app_streaming: Application[ApplicationState] = None + +# And this does streaming, in async mode +burr_app_streaming_async: Application[ApplicationState] = None + +DEFAULT_YOUTUBE_URL = "https://www.youtube.com/watch?v=hqutVJyd3TI" @contextlib.asynccontextmanager async def lifespan(app: fastapi.FastAPI): """Instantiate the Burr application on FastAPI startup.""" # set value for the global `burr_app` variable - global burr_app, burr_app_streaming_iterator + global burr_app, burr_app_streaming, burr_app_streaming_async burr_app = build_application() - burr_app_streaming_iterator = build_application_iterator_streaming() + burr_app_streaming = build_streaming_application() + burr_app_streaming_async = build_streaming_application_async() yield app = fastapi.FastAPI(lifespan=lifespan) -@app.get("/social_media_post", response_model=ApplicationState) -def social_media_post(youtube_url: str) -> ApplicationState: +@app.get("/social_media_post", response_model=SocialMediaPost) +def social_media_post(youtube_url: str = DEFAULT_YOUTUBE_URL) -> SocialMediaPost: """Creates a completion for the chat message""" _, _, state = burr_app.run(halt_after=["generate_post"], inputs={"youtube_url": youtube_url}) - return state.data + return state.data.post + + +@app.get("/social_media_post_streaming_async", response_class=StreamingResponse) +async def social_media_post_streaming_async( + youtube_url: str = DEFAULT_YOUTUBE_URL, +) -> StreamingResponse: + """Creates a completion for the chat message""" + + async def gen(): + _, streaming_container = await burr_app_streaming_async.astream_result( + halt_after=["generate_post"], + inputs={"youtube_url": youtube_url}, + ) # type: ignore + streaming_container: AsyncStreamingResultContainer[ApplicationState, SocialMediaPost] + async for post in streaming_container: + obj = post.model_dump() + yield json.dumps(obj) + + return StreamingResponse(gen()) -@app.get("/social_media_post_streaming_1", response_model=StreamingResponse) -def social_media_post_streaming(youtube_url: str) -> StreamingResponse: +@app.get("/social_media_post_streaming", response_class=StreamingResponse) +def social_media_post_streaming(youtube_url: str = DEFAULT_YOUTUBE_URL) -> StreamingResponse: """Creates a completion for the chat message""" def gen(): - for action, _, state in burr_app_streaming_iterator.iterate( - halt_after=["final"], inputs={"youtube_url": youtube_url} - ): - yield state.data.model_dump_json() + _, streaming_container = burr_app_streaming.stream_result( + halt_after=["generate_post"], + inputs={"youtube_url": youtube_url}, + ) # type: ignore + streaming_container: StreamingResultContainer[ApplicationState, SocialMediaPost] + for post in streaming_container: + obj = post.model_dump() + yield json.dumps(obj) return StreamingResponse(gen())