Skip to content
This repository was archived by the owner on Jul 16, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions examples/stream-tools-gpt-openai.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
<?php

use PhpLlm\LlmChain\Bridge\OpenAI\GPT;
use PhpLlm\LlmChain\Bridge\OpenAI\PlatformFactory;
use PhpLlm\LlmChain\Chain;
use PhpLlm\LlmChain\Chain\ToolBox\ChainProcessor;
use PhpLlm\LlmChain\Chain\ToolBox\Tool\Wikipedia;
use PhpLlm\LlmChain\Chain\ToolBox\ToolAnalyzer;
use PhpLlm\LlmChain\Chain\ToolBox\ToolBox;
use PhpLlm\LlmChain\Model\Message\Message;
use PhpLlm\LlmChain\Model\Message\MessageBag;
use Symfony\Component\Dotenv\Dotenv;
use Symfony\Component\HttpClient\HttpClient;

require_once dirname(__DIR__).'/vendor/autoload.php';
(new Dotenv())->loadEnv(dirname(__DIR__).'/.env');

if (empty($_ENV['OPENAI_API_KEY'])) {
echo 'Please set the OPENAI_API_KEY environment variable.'.PHP_EOL;
exit(1);
}

$platform = PlatformFactory::create($_ENV['OPENAI_API_KEY']);
$llm = new GPT(GPT::GPT_4O_MINI);

$wikipedia = new Wikipedia(HttpClient::create());
$toolBox = new ToolBox(new ToolAnalyzer(), [$wikipedia]);
$processor = new ChainProcessor($toolBox);
$chain = new Chain($platform, $llm, [$processor], [$processor]);
$messages = new MessageBag(Message::ofUser(<<<TXT
First, define unicorn in 30 words.
Then lookup at Wikipedia what the irish history looks like in 2 sentences.
Please tell me before you call tools.
TXT));
$response = $chain->call($messages, [
'stream' => true, // enable streaming of response text
]);

foreach ($response->getContent() as $word) {
echo $word;
}

echo PHP_EOL;
69 changes: 42 additions & 27 deletions src/Bridge/OpenAI/GPT/ResponseConverter.php
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public function convert(HttpResponse $response, array $options = []): LlmRespons
}

/** @var Choice[] $choices */
$choices = array_map([$this, 'convertChoice'], $data['choices']);
$choices = \array_map([$this, 'convertChoice'], $data['choices']);

if (1 !== count($choices)) {
return new ChoiceResponse(...$choices);
Expand All @@ -65,14 +65,10 @@ public function convert(HttpResponse $response, array $options = []): LlmRespons
return new TextResponse($choices[0]->getContent());
}

private function convertStream(HttpResponse $response): ToolCallResponse|StreamResponse
private function convertStream(HttpResponse $response): StreamResponse
{
$stream = $this->streamResponse($response);

if ($this->streamIsToolCall($stream)) {
return new ToolCallResponse(...$this->convertStreamToToolCalls($stream));
}

return new StreamResponse($this->convertStreamContent($stream));
}

Expand All @@ -84,7 +80,9 @@ private function streamResponse(HttpResponse $response): \Generator
}

try {
yield $chunk->getArrayData();
$data = $chunk->getArrayData();

yield $data;
} catch (JsonException) {
// try catch only needed for Symfony 6.4
continue;
Expand All @@ -100,37 +98,46 @@ private function streamIsToolCall(\Generator $response): bool
}

/**
* @return ToolCall[]
* @param array<string, mixed> $toolCalls
* @param array<string, mixed> $data
*
* @return array<string, mixed>
*/
private function convertStreamToToolCalls(\Generator $response): array
private function convertStreamToToolCalls(array $toolCalls, array $data): array
{
$toolCalls = [];
foreach ($response as $data) {
if (!isset($data['choices'][0]['delta']['tool_calls'])) {
if (!isset($data['choices'][0]['delta']['tool_calls'])) {
return $toolCalls;
}

foreach ($data['choices'][0]['delta']['tool_calls'] as $i => $toolCall) {
if (isset($toolCall['id'])) {
// initialize tool call
$toolCalls[$i] = [
'id' => $toolCall['id'],
'function' => $toolCall['function'],
];
continue;
}

foreach ($data['choices'][0]['delta']['tool_calls'] as $i => $toolCall) {
if (isset($toolCall['id'])) {
// initialize tool call
$toolCalls[$i] = [
'id' => $toolCall['id'],
'function' => $toolCall['function'],
];
continue;
}

// add arguments delta to tool call
$toolCalls[$i]['function']['arguments'] .= $toolCall['function']['arguments'];
}
// add arguments delta to tool call
$toolCalls[$i]['function']['arguments'] .= $toolCall['function']['arguments'];
}

return array_map([$this, 'convertToolCall'], $toolCalls);
return $toolCalls;
}

private function convertStreamContent(\Generator $generator): \Generator
{
$toolCalls = [];
foreach ($generator as $data) {
if ($this->streamIsToolCall($generator)) {
$toolCalls = $this->convertStreamToToolCalls($toolCalls, $data);
}

if ([] !== $toolCalls && $this->isToolCallsStreamFinished($data)) {
yield new ToolCallResponse(...\array_map([$this, 'convertToolCall'], $toolCalls));
}

if (!isset($data['choices'][0]['delta']['content'])) {
continue;
}
Expand All @@ -139,6 +146,14 @@ private function convertStreamContent(\Generator $generator): \Generator
}
}

/**
* @param array<string, mixed> $data
*/
private function isToolCallsStreamFinished(array $data): bool
{
return isset($data['choices'][0]['finish_reason']) && 'tool_calls' === $data['choices'][0]['finish_reason'];
}

/**
* @param array{
* index: integer,
Expand All @@ -162,7 +177,7 @@ private function convertStreamContent(\Generator $generator): \Generator
private function convertChoice(array $choice): Choice
{
if ('tool_calls' === $choice['finish_reason']) {
return new Choice(toolCalls: array_map([$this, 'convertToolCall'], $choice['message']['tool_calls']));
return new Choice(toolCalls: \array_map([$this, 'convertToolCall'], $choice['message']['tool_calls']));
}

if ('stop' === $choice['finish_reason']) {
Expand Down
56 changes: 43 additions & 13 deletions src/Chain/ToolBox/ChainProcessor.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,12 @@
use PhpLlm\LlmChain\Chain\Output;
use PhpLlm\LlmChain\Chain\OutputProcessor;
use PhpLlm\LlmChain\Chain\ToolBox\Event\ToolCallsExecuted;
use PhpLlm\LlmChain\Chain\ToolBox\StreamResponse as ToolboxStreamResponse;
use PhpLlm\LlmChain\Exception\MissingModelSupport;
use PhpLlm\LlmChain\Model\Message\AssistantMessage;
use PhpLlm\LlmChain\Model\Message\Message;
use PhpLlm\LlmChain\Model\Response\ResponseInterface;
use PhpLlm\LlmChain\Model\Response\StreamResponse as GenericStreamResponse;
use PhpLlm\LlmChain\Model\Response\ToolCallResponse;
use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;

Expand Down Expand Up @@ -45,23 +49,49 @@ public function processInput(Input $input): void

public function processOutput(Output $output): void
{
$messages = clone $output->messages;
if ($output->response instanceof GenericStreamResponse) {
$output->response = new ToolboxStreamResponse(
$output->response->getContent(),
$this->handleToolCallsCallback($output),
);

while ($output->response instanceof ToolCallResponse) {
$toolCalls = $output->response->getContent();
$messages->add(Message::ofAssistant(toolCalls: $toolCalls));
return;
}

if (!$output->response instanceof ToolCallResponse) {
return;
}

$output->response = $this->handleToolCallsCallback($output)($output->response);
}

$results = [];
foreach ($toolCalls as $toolCall) {
$result = $this->toolBox->execute($toolCall);
$results[] = new ToolCallResult($toolCall, $result);
$messages->add(Message::ofToolCall($toolCall, $this->resultConverter->convert($result)));
private function handleToolCallsCallback(Output $output): \Closure
{
return function (ToolCallResponse $response, ?AssistantMessage $streamedAssistantResponse = null) use ($output): ResponseInterface {
$messages = clone $output->messages;

if (null !== $streamedAssistantResponse && '' !== $streamedAssistantResponse->content) {
$messages->add($streamedAssistantResponse);
}

$event = new ToolCallsExecuted(...$results);
$this->eventDispatcher?->dispatch($event);
do {
$toolCalls = $response->getContent();
$messages->add(Message::ofAssistant(toolCalls: $toolCalls));

$output->response = $event->hasResponse() ? $event->response : $this->chain->call($messages, $output->options);
}
$results = [];
foreach ($toolCalls as $toolCall) {
$result = $this->toolBox->execute($toolCall);
$results[] = new ToolCallResult($toolCall, $result);
$messages->add(Message::ofToolCall($toolCall, $this->resultConverter->convert($result)));
}

$event = new ToolCallsExecuted(...$results);
$this->eventDispatcher?->dispatch($event);

$response = $event->hasResponse() ? $event->response : $this->chain->call($messages, $output->options);
} while ($response instanceof ToolCallResponse);

return $response;
};
}
}
33 changes: 33 additions & 0 deletions src/Chain/ToolBox/StreamResponse.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
<?php

declare(strict_types=1);

namespace PhpLlm\LlmChain\Chain\ToolBox;

use PhpLlm\LlmChain\Model\Message\Message;
use PhpLlm\LlmChain\Model\Response\ResponseInterface;
use PhpLlm\LlmChain\Model\Response\ToolCallResponse;

final readonly class StreamResponse implements ResponseInterface
{
public function __construct(
private \Generator $generator,
private \Closure $handleToolCallsCallback,
) {
}

public function getContent(): \Generator
{
$streamedResponse = '';
foreach ($this->generator as $value) {
if ($value instanceof ToolCallResponse) {
yield from ($this->handleToolCallsCallback)($value, Message::ofAssistant($streamedResponse))->getContent();

break;
}

$streamedResponse .= $value;
yield $value;
}
}
}