Streaming
Stream LLM responses in real-time as they're generated, reducing perceived latency for users.
class StreamingAgent < ApplicationAgent
model "gpt-4o"
streaming true # Enable streaming for this agent
user "{prompt}"
end
To enable streaming for all agents by default, set it in an initializer:
# config/initializers/ruby_llm_agents.rb
RubyLLM::Agents.configure do |config|
config.default_streaming = true
end
Process chunks as they arrive:
StreamingAgent.call(user: "Write a story") do |chunk|
print chunk.content # chunk is a RubyLLM::Chunk object
end
Output appears progressively:
Once... upon... a... time...
To stream explicitly, use the .stream class method, which forces streaming regardless of class-level settings:
result = MyAgent.stream(user: "Write a story") do |chunk|
print chunk.content
end
# Access result metadata after streaming
puts "Tokens: #{result.total_tokens}"
puts "TTFT: #{result.time_to_first_token_ms}ms"
This method:
- Forces streaming even if streaming false is set at class level
- Requires a block (raises ArgumentError if none is provided)
- Returns a Result object with full metadata
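The three behaviors listed above can be illustrated with a small, self-contained sketch. It is not the gem's implementation: SketchAgent, SketchChunk, SketchResult, and the canned fake_provider_chunks are hypothetical stand-ins, and only the block check, the ignored class-level setting, and the monotonic-clock timing are meant to carry over.

```ruby
# Hypothetical stand-ins for illustration; these are not the gem's classes.
SketchChunk = Struct.new(:content)
SketchResult = Struct.new(:content, :time_to_first_token_ms, :duration_ms) do
  def streaming?
    true
  end
end

class SketchAgent
  class << self
    # Class-level setting that .stream deliberately ignores.
    attr_accessor :streaming_enabled

    def stream(user:, &block)
      # Without a block there is nowhere to deliver chunks, so fail early.
      raise ArgumentError, "a block is required for streaming" unless block

      started_at = now_ms
      first_chunk_at = nil
      buffer = +""

      fake_provider_chunks(user).each do |text|
        first_chunk_at ||= now_ms # record when the first chunk arrived
        buffer << text
        block.call(SketchChunk.new(text))
      end

      SketchResult.new(buffer, (first_chunk_at - started_at).round, (now_ms - started_at).round)
    end

    private

    # Monotonic clock in milliseconds, unaffected by wall-clock adjustments.
    def now_ms
      Process.clock_gettime(Process::CLOCK_MONOTONIC, :float_millisecond)
    end

    # Pretend provider: returns a canned reply as word-sized chunks.
    def fake_provider_chunks(_prompt)
      ["Once ", "upon ", "a ", "time"]
    end
  end
end

SketchAgent.streaming_enabled = false
```

Calling SketchAgent.stream(user: "Write a story") { |chunk| print chunk.content } still streams even though streaming_enabled is false, and calling it without a block raises ArgumentError.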
When streaming completes, the returned Result contains streaming-specific metadata:
result = StreamingAgent.call(user: "test") do |chunk|
print chunk.content
end
result.streaming? # => true
result.time_to_first_token_ms # => 245 (ms until first chunk arrived)
result.duration_ms # => 2500 (total execution time)
class StreamingController < ApplicationController
include ActionController::Live
def stream_response
response.headers['Content-Type'] = 'text/event-stream'
response.headers['Cache-Control'] = 'no-cache'
response.headers['X-Accel-Buffering'] = 'no' # Disable nginx buffering
StreamingAgent.call(user: params[:prompt]) do |chunk|
response.stream.write "data: #{chunk.to_json}\n\n"
end
response.stream.write "data: [DONE]\n\n"
rescue ActionController::Live::ClientDisconnected
# Client disconnected, clean up
ensure
response.stream.close
end
end
const eventSource = new EventSource('/stream?prompt=' + encodeURIComponent(prompt));
eventSource.onmessage = (event) => {
if (event.data === '[DONE]') {
eventSource.close();
return;
}
const chunk = JSON.parse(event.data);
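// Append the text, not the parsed object (assumes the serialized chunk has a content field)
document.getElementById('output').textContent += chunk.content;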
};
eventSource.onerror = () => {
eventSource.close();
};
class ChatController < ApplicationController
def create
respond_to do |format|
format.turbo_stream do
StreamingAgent.call(user: params[:message]) do |chunk|
Turbo::StreamsChannel.broadcast_append_to(
"chat_#{params[:chat_id]}",
target: "messages",
partial: "messages/chunk",
locals: { content: chunk.content }
)
end
end
end
end
end
<%= turbo_stream_from "chat_#{@chat.id}" %>
<div id="messages"></div>
Streaming executions track latency metrics:
# After streaming completes
execution = RubyLLM::Agents::Execution.last
execution.streaming? # => true
execution.time_to_first_token_ms # => 245 (ms until first chunk)
execution.duration_ms # => 2500 (total time)
# Average TTFT for streaming agents
RubyLLM::Agents::Execution.today.avg_time_to_first_token
# => 312
Note: time_to_first_token_ms is stored in the metadata JSON column, not as a direct SQL column. Use the avg_time_to_first_token analytics method for aggregation, or access it on individual instances via execution.time_to_first_token_ms.
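Because the value lives inside a JSON column, an average has to be computed from the per-record metadata rather than with a plain SQL AVG over a column. The sketch below shows that idea in memory only. SketchExecution, average_ttft_ms, and the "time_to_first_token_ms" metadata key are assumptions for illustration, not the gem's schema or its avg_time_to_first_token implementation.

```ruby
# Hypothetical in-memory stand-in for an execution record; only the metadata hash matters here.
SketchExecution = Struct.new(:metadata) do
  # Reads TTFT out of the metadata hash (the key name is an assumption).
  def time_to_first_token_ms
    metadata["time_to_first_token_ms"]
  end
end

# Averages TTFT over the records that have one; returns nil when none do.
def average_ttft_ms(executions)
  values = executions.map(&:time_to_first_token_ms).compact
  return nil if values.empty?

  (values.sum.to_f / values.size).round
end

executions = [
  SketchExecution.new({ "time_to_first_token_ms" => 245 }),
  SketchExecution.new({ "time_to_first_token_ms" => 379 }),
  SketchExecution.new({}) # non-streaming run without a TTFT value
]

puts average_ttft_ms(executions) # prints 312
```

Records without a TTFT value are skipped, so non-streaming executions do not drag the average toward zero.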
When using schemas, the full response is still validated:
class StructuredStreamingAgent < ApplicationAgent
model "gpt-4o"
streaming true
user "Write about {topic}"
def schema
@schema ||= RubyLLM::Schema.create do
string :title
string :content
end
end
end
# Stream the raw text
StructuredStreamingAgent.call(topic: "AI") do |chunk|
print chunk.content # Raw JSON chunks
end
# Result is parsed and validated at the end
Important: Streaming responses are not cached by design, as caching would defeat the purpose of real-time streaming.
class MyAgent < ApplicationAgent
streaming true
cache_for 1.hour # Cache is ignored when streaming
end
If you need caching with streaming-like UX, consider:
- Cache the full response
- Simulate streaming on the client side
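The second option amounts to replaying a complete, cached response in small pieces. A browser client would do this in JavaScript; the sketch below shows the same idea in Ruby for consistency with the rest of this page. replay_in_chunks and its chunk_size and delay parameters are hypothetical names, not part of the gem.

```ruby
# Splits an already-complete (for example, cached) response into small pieces
# and yields them one at a time, optionally pausing between pieces.
# Returns an Enumerator when called without a block.
def replay_in_chunks(text, chunk_size: 12, delay: 0)
  return enum_for(:replay_in_chunks, text, chunk_size: chunk_size, delay: delay) unless block_given?

  # The /m flag lets "." match newlines so multi-line responses stay intact.
  text.scan(/.{1,#{chunk_size}}/m).each do |piece|
    yield piece
    sleep(delay) if delay.positive?
  end
end

cached = "Once upon a time, there was a cache."
replay_in_chunks(cached, chunk_size: 10) { |piece| print piece }
puts
```

A small delay (for example delay: 0.02) gives a typing effect, while the cache still serves the full response in a single lookup.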
begin
StreamingAgent.call(user: "test") do |chunk|
print chunk.content
end
rescue Timeout::Error
puts "\n[Stream timed out]"
rescue => e
puts "\n[Stream error: #{e.message}]"
end
For long-running streams, use ActionCable:
class StreamingJob < ApplicationJob
def perform(prompt, channel_id)
StreamingAgent.call(user: prompt) do |chunk|
ActionCable.server.broadcast(
channel_id,
{ type: 'chunk', content: chunk.content }
)
end
ActionCable.server.broadcast(
channel_id,
{ type: 'complete' }
)
end
end
Streaming is most beneficial for:
- Long-form content generation
- Conversational interfaces
- Real-time transcription/translation
def stream_response
StreamingAgent.call(user: params[:prompt]) do |chunk|
break if response.stream.closed?
response.stream.write "data: #{chunk.to_json}\n\n"
end
ensure
response.stream.close
end
class LongFormAgent < ApplicationAgent
streaming true
timeout 180 # 3 minutes for long content
end
Track time-to-first-token to ensure good UX:
# Alert if TTFT is too high
# to_i treats a missing TTFT (for example, a non-streaming execution) as 0
if execution.time_to_first_token_ms.to_i > 1000
Rails.logger.warn("High TTFT: #{execution.time_to_first_token_ms}ms")
end
- Agent DSL - Configuration options
- Execution Tracking - TTFT analytics
- Dashboard - Monitoring streaming metrics