Skip to content

Commit

Permalink
Updates youtube server example
Browse files Browse the repository at this point in the history
  • Loading branch information
elijahbenizzy committed Sep 15, 2024
1 parent df6b8ef commit 236d76d
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 31 deletions.
2 changes: 1 addition & 1 deletion burr/core/action.py
Original file line number Diff line number Diff line change
Expand Up @@ -1229,7 +1229,7 @@ def pydantic(
writes: List[str],
state_input_type: Type["BaseModel"],
state_output_type: Type["BaseModel"],
stream_type: Type[StreamType],
stream_type: Union[Type["BaseModel"], Type[dict]],
) -> Callable:
try:
from burr.integrations.pydantic import pydantic_streaming_action
Expand Down
31 changes: 17 additions & 14 deletions examples/youtube-to-social-media-post/application.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import textwrap
from typing import Any, Generator, Optional, Tuple, Union
from typing import Any, AsyncGenerator, Generator, Optional, Tuple, Union

import instructor
import openai
Expand All @@ -8,13 +8,13 @@
from rich.console import Console
from youtube_transcript_api import YouTubeTranscriptApi

from burr.core import Application, ApplicationBuilder
from burr.core.action import AsyncStreamingResultContainer, StreamingResultContainer
from burr.integrations.pydantic import (
PydanticTypingSystem,
pydantic_action,
pydantic_streaming_action,
from burr.core import Application, ApplicationBuilder, action
from burr.core.action import (
AsyncStreamingResultContainer,
StreamingResultContainer,
streaming_action,
)
from burr.integrations.pydantic import PydanticTypingSystem


class Concept(BaseModel):
Expand Down Expand Up @@ -122,7 +122,7 @@ def __copy__(self, memo: dict[int, Any] | None = None):
# return new_obj


@pydantic_action(reads=[], writes=["transcript"])
@action.pydantic(reads=[], writes=["transcript"])
def get_youtube_transcript(state: ApplicationState, youtube_url: str) -> ApplicationState:
"""Get the official YouTube transcript for a video given it's URL"""
_, _, video_id = youtube_url.partition("?v=")
Expand All @@ -134,7 +134,7 @@ def get_youtube_transcript(state: ApplicationState, youtube_url: str) -> Applica
# store the transcript in state


@pydantic_action(reads=["transcript"], writes=["post"])
@action.pydantic(reads=["transcript"], writes=["post"])
def generate_post(state: ApplicationState, llm_client) -> ApplicationState:
"""Use the Instructor LLM client to generate `SocialMediaPost` from the YouTube transcript."""

Expand All @@ -158,7 +158,7 @@ def generate_post(state: ApplicationState, llm_client) -> ApplicationState:
return state


@pydantic_streaming_action(
@streaming_action.pydantic(
reads=["transcript"],
writes=["post"],
state_input_type=ApplicationState,
Expand Down Expand Up @@ -191,7 +191,7 @@ def generate_post_streaming(
yield final_post, state


@pydantic_streaming_action(
@streaming_action.pydantic(
reads=["transcript"],
writes=["post"],
state_input_type=ApplicationState,
Expand All @@ -200,7 +200,7 @@ def generate_post_streaming(
)
async def generate_post_streaming_async(
state: ApplicationStateStream, llm_client
) -> Generator[Tuple[SocialMediaPost, Optional[ApplicationState]], None, None]:
) -> AsyncGenerator[Tuple[SocialMediaPost, Optional[ApplicationState]], None]:
"""Use the Instructor LLM client to generate `SocialMediaPost` from the YouTube transcript."""

transcript = state.transcript
Expand Down Expand Up @@ -234,6 +234,7 @@ def build_application() -> Application[ApplicationState]:
)
.with_transitions(
("get_youtube_transcript", "generate_post"),
("generate_post", "get_youtube_transcript"),
)
# .with_state_persister(SQLLitePersister(db_path=".burr.db", table_name="state"))
.with_entrypoint("get_youtube_transcript")
Expand All @@ -255,6 +256,7 @@ def build_streaming_application() -> Application[ApplicationState]:
)
.with_transitions(
("get_youtube_transcript", "generate_post"),
("generate_post", "get_youtube_transcript"),
)
# .with_state_persister(SQLLitePersister(db_path=".burr.db", table_name="state"))
.with_entrypoint("get_youtube_transcript")
Expand All @@ -276,6 +278,7 @@ def build_streaming_application_async() -> Application[ApplicationState]:
)
.with_transitions(
("get_youtube_transcript", "generate_post"),
("generate_post", "get_youtube_transcript"),
)
# .with_state_persister(SQLLitePersister(db_path=".burr.db", table_name="state"))
.with_entrypoint("get_youtube_transcript")
Expand All @@ -291,7 +294,7 @@ async def run_async():
console = Console()
app = build_streaming_application_async()

action, streaming_container = await app.astream_result(
a, streaming_container = await app.astream_result(
halt_after=["generate_post"],
inputs={"youtube_url": "https://www.youtube.com/watch?v=hqutVJyd3TI"},
) # type: ignore
Expand All @@ -306,7 +309,7 @@ async def run_async():
if __name__ == "__main__":
console = Console()
app = build_streaming_application()
action, streaming_container = app.stream_result(
a, streaming_container = app.stream_result(
halt_after=["generate_post"],
inputs={"youtube_url": "https://www.youtube.com/watch?v=hqutVJyd3TI"},
) # type: ignore
Expand Down
20 changes: 20 additions & 0 deletions examples/youtube-to-social-media-post/curls.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#!/bin/bash

# Default to the 'social_media_post' endpoint if no argument is passed
ENDPOINT="social_media_post"
if [[ "$1" == "streaming_async" ]]; then
ENDPOINT="social_media_post_streaming_async"
elif [[ "$1" == "streaming" ]]; then
ENDPOINT="social_media_post_streaming"
fi

# Perform the curl request to the chosen endpoint
curl -X 'GET' "http://localhost:7443/$ENDPOINT" \
-s -H 'Accept: application/json' \
--no-buffer | jq --unbuffered -c '.' | while IFS= read -r line; do
if [[ "$line" != "" ]]; then # Check for non-empty lines
clear
echo "$line" | jq --color-output .
sleep .01 # Add a small delay for visual clarity
fi
done
63 changes: 47 additions & 16 deletions examples/youtube-to-social-media-post/server.py
Original file line number Diff line number Diff line change
@@ -1,57 +1,88 @@
import contextlib
import json
import logging

import fastapi
import uvicorn
from application import (
ApplicationState,
ApplicationStateStream,
SocialMediaPost,
build_application,
build_application_iterator_streaming,
build_streaming_application,
build_streaming_application_async,
)
from fastapi.responses import StreamingResponse

from burr.core import Application
from burr.core.action import AsyncStreamingResultContainer, StreamingResultContainer

logger = logging.getLogger(__name__)

# define a global `burr_app` variable
burr_app: Application[ApplicationState] = None
# Second variant -- this uses a stream + a self-loop
# Note this will save a *lot* to the tracker, each stream!
burr_app_streaming_iterator: Application[ApplicationStateStream] = None
# This does streaming, in sync mode
burr_app_streaming: Application[ApplicationState] = None

# And this does streaming, in async mode
burr_app_streaming_async: Application[ApplicationState] = None

DEFAULT_YOUTUBE_URL = "https://www.youtube.com/watch?v=hqutVJyd3TI"


@contextlib.asynccontextmanager
async def lifespan(app: fastapi.FastAPI):
"""Instantiate the Burr application on FastAPI startup."""
# set value for the global `burr_app` variable
global burr_app, burr_app_streaming_iterator
global burr_app, burr_app_streaming, burr_app_streaming_async
burr_app = build_application()
burr_app_streaming_iterator = build_application_iterator_streaming()
burr_app_streaming = build_streaming_application()
burr_app_streaming_async = build_streaming_application_async()
yield


app = fastapi.FastAPI(lifespan=lifespan)


@app.get("/social_media_post", response_model=ApplicationState)
def social_media_post(youtube_url: str) -> ApplicationState:
@app.get("/social_media_post", response_model=SocialMediaPost)
def social_media_post(youtube_url: str = DEFAULT_YOUTUBE_URL) -> SocialMediaPost:
"""Creates a completion for the chat message"""
_, _, state = burr_app.run(halt_after=["generate_post"], inputs={"youtube_url": youtube_url})

return state.data
return state.data.post


@app.get("/social_media_post_streaming_async", response_class=StreamingResponse)
async def social_media_post_streaming_async(
youtube_url: str = DEFAULT_YOUTUBE_URL,
) -> StreamingResponse:
"""Creates a completion for the chat message"""

async def gen():
_, streaming_container = await burr_app_streaming_async.astream_result(
halt_after=["generate_post"],
inputs={"youtube_url": youtube_url},
) # type: ignore
streaming_container: AsyncStreamingResultContainer[ApplicationState, SocialMediaPost]
async for post in streaming_container:
obj = post.model_dump()
yield json.dumps(obj)

return StreamingResponse(gen())


@app.get("/social_media_post_streaming_1", response_model=StreamingResponse)
def social_media_post_streaming(youtube_url: str) -> StreamingResponse:
@app.get("/social_media_post_streaming", response_class=StreamingResponse)
def social_media_post_streaming(youtube_url: str = DEFAULT_YOUTUBE_URL) -> StreamingResponse:
"""Creates a completion for the chat message"""

def gen():
for action, _, state in burr_app_streaming_iterator.iterate(
halt_after=["final"], inputs={"youtube_url": youtube_url}
):
yield state.data.model_dump_json()
_, streaming_container = burr_app_streaming.stream_result(
halt_after=["generate_post"],
inputs={"youtube_url": youtube_url},
) # type: ignore
streaming_container: StreamingResultContainer[ApplicationState, SocialMediaPost]
for post in streaming_container:
obj = post.model_dump()
yield json.dumps(obj)

return StreamingResponse(gen())

Expand Down

0 comments on commit 236d76d

Please sign in to comment.