Skip to content

Commit 9592bdd

Browse files
committed
Merge branch 'stream' into develop
2 parents 21fdd35 + e39a9d8 commit 9592bdd

File tree

12 files changed

+486
-115
lines changed

12 files changed

+486
-115
lines changed

composer.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,8 @@
1212
"require": {
1313
"php": "^8.0",
1414
"guzzlehttp/guzzle": "^7.0",
15-
"psr/log": "^1.0|^2.0|^3.0"
15+
"psr/log": "^1.0|^2.0|^3.0",
16+
"psr/http-message": "^1.0|^2.0"
1617
},
1718
"suggest": {
1819
"elasticsearch/elasticsearch": "^7.0|^8.0",

src/Agent.php

Lines changed: 41 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -108,17 +108,8 @@ public function chat(Message|array $messages): Message
108108
);
109109

110110
if ($response instanceof ToolCallMessage) {
111-
$toolCallResult = new ToolCallResultMessage($response->getTools());
112-
113-
foreach ($toolCallResult->getTools() as $tool) {
114-
$this->notify('tool-calling', new ToolCalling($tool));
115-
$tool->execute();
116-
$this->notify('tool-called', new ToolCalled($tool));
117-
}
118-
119-
$response = $this->chat([
120-
$response, $toolCallResult
121-
]);
111+
$toolCallResult = $this->executeTools($response);
112+
$response = $this->chat([$response, $toolCallResult]);
122113
}
123114

124115
$this->notify('message-saving', new MessageSaving($response));
@@ -129,6 +120,45 @@ public function chat(Message|array $messages): Message
129120
return $response;
130121
}
131122

123+
public function stream(Message|array $messages): \Generator
124+
{
125+
$this->notify('stream-start');
126+
127+
$messages = is_array($messages) ? $messages : [$messages];
128+
129+
foreach ($messages as $message) {
130+
$this->notify('message-saving', new MessageSaving($message));
131+
$this->resolveChatHistory()->addMessage($message);
132+
$this->notify('message-saved', new MessageSaved($message));
133+
}
134+
135+
yield from $this->provider()
136+
->systemPrompt($this->instructions())
137+
->setTools($this->tools())
138+
->stream(
139+
$this->resolveChatHistory()->getMessages(),
140+
function (ToolCallMessage $toolCallMessage) {
141+
$toolCallResult = $this->executeTools($toolCallMessage);
142+
yield from $this->stream([$toolCallMessage, $toolCallResult]);
143+
}
144+
);
145+
146+
$this->notify('stream-stop');
147+
}
148+
149+
protected function executeTools(ToolCallMessage $toolCallMessage): ToolCallResultMessage
150+
{
151+
$toolCallResult = new ToolCallResultMessage($toolCallMessage->getTools());
152+
153+
foreach ($toolCallResult->getTools() as $tool) {
154+
$this->notify('tool-calling', new ToolCalling($tool));
155+
$tool->execute();
156+
$this->notify('tool-called', new ToolCalled($tool));
157+
}
158+
159+
return $toolCallResult;
160+
}
161+
132162
public function instructions(): ?string
133163
{
134164
return $this->instructions;

src/AgentInterface.php

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,4 +28,6 @@ public function withChatHistory(AbstractChatHistory $chatHistory): AgentInterfac
2828
public function observe(\SplObserver $observer, string $event = "*"): AgentInterface;
2929

3030
public function chat(Message|array $messages): Message;
31+
32+
public function stream(Message|array $messages): \Generator;
3133
}

src/Observability/AgentMonitoring.php

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ public function __construct(protected Inspector $inspector) {}
3939
public function update(\SplSubject $subject, string $event = null, $data = null): void
4040
{
4141
$methods = [
42+
'stream-start' => 'start',
43+
'stream-stop' => 'stop',
4244
'rag-start' => 'start',
4345
'rag-stop' => 'stop',
4446
'chat-start' => "start",

src/Providers/AIProviderInterface.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ public function chat(array $messages): Message;
3333

3434
//public function structured(array|string $messages): Message;
3535

36-
//public function stream(array|string $messages): Message;
36+
public function stream(array|string $messages, callable $executeToolsCallback): \Generator;
3737

3838
/**
3939
* The context window limitation of the LLM.

src/Providers/Anthropic/Anthropic.php

Lines changed: 4 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -5,17 +5,21 @@
55
use GuzzleHttp\Exception\GuzzleException;
66
use NeuronAI\Chat\Messages\AssistantMessage;
77
use NeuronAI\Chat\Messages\Message;
8+
use NeuronAI\Exceptions\ProviderException;
89
use NeuronAI\Providers\AIProviderInterface;
910
use NeuronAI\Providers\HandleWithTools;
1011
use NeuronAI\Chat\Messages\ToolCallMessage;
1112
use NeuronAI\Tools\ToolInterface;
1213
use NeuronAI\Tools\ToolProperty;
1314
use NeuronAI\Chat\Messages\Usage;
1415
use GuzzleHttp\Client;
16+
use Psr\Http\Message\StreamInterface;
1517

1618
class Anthropic implements AIProviderInterface
1719
{
1820
use HandleWithTools;
21+
use HandleChat;
22+
use HandleStream;
1923

2024
/**
2125
* The http client.
@@ -62,56 +66,6 @@ public function systemPrompt(?string $prompt): AIProviderInterface
6266
return $this;
6367
}
6468

65-
/**
66-
* Send a message to the LLM.
67-
*
68-
* @param Message|array<Message> $messages
69-
* @throws GuzzleException
70-
*/
71-
public function chat(array $messages): Message
72-
{
73-
$mapper = new MessageMapper($messages);
74-
75-
$json = \array_filter([
76-
'model' => $this->model,
77-
'max_tokens' => $this->max_tokens,
78-
'stop_sequences' => $this->stop_sequences,
79-
'temperature' => $this->temperature,
80-
'system' => $this->system ?? null,
81-
'messages' => $mapper->map(),
82-
]);
83-
84-
if (!empty($this->tools)) {
85-
$json['tools'] = $this->generateToolsPayload();
86-
}
87-
88-
// https://docs.anthropic.com/claude/reference/messages_post
89-
$result = $this->client->post('v1/messages', compact('json'))
90-
->getBody()->getContents();
91-
92-
$result = \json_decode($result, true);
93-
94-
$content = \end($result['content']);
95-
96-
if ($content['type'] === 'tool_use') {
97-
$response = $this->createToolMessage($content);
98-
} else {
99-
$response = new AssistantMessage($content['text']);
100-
}
101-
102-
// Attach the usage for the current interaction
103-
if (\array_key_exists('usage', $result)) {
104-
$response->setUsage(
105-
new Usage(
106-
$result['usage']['input_tokens'],
107-
$result['usage']['output_tokens']
108-
)
109-
);
110-
}
111-
112-
return $response;
113-
}
114-
11569
public function generateToolsPayload(): array
11670
{
11771
return \array_map(function (ToolInterface $tool) {
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
<?php
2+
3+
namespace NeuronAI\Providers\Anthropic;
4+
5+
use GuzzleHttp\Exception\GuzzleException;
6+
use NeuronAI\Chat\Messages\AssistantMessage;
7+
use NeuronAI\Chat\Messages\Message;
8+
use NeuronAI\Chat\Messages\Usage;
9+
10+
trait HandleChat
11+
{
12+
/**
13+
* Send a message to the LLM.
14+
*
15+
* @param Message|array<Message> $messages
16+
* @throws GuzzleException
17+
*/
18+
public function chat(array $messages): Message
19+
{
20+
$mapper = new MessageMapper($messages);
21+
22+
$json = \array_filter([
23+
'model' => $this->model,
24+
'max_tokens' => $this->max_tokens,
25+
'stop_sequences' => $this->stop_sequences,
26+
'temperature' => $this->temperature,
27+
'system' => $this->system ?? null,
28+
'messages' => $mapper->map(),
29+
]);
30+
31+
if (!empty($this->tools)) {
32+
$json['tools'] = $this->generateToolsPayload();
33+
}
34+
35+
// https://docs.anthropic.com/claude/reference/messages_post
36+
$result = $this->client->post('v1/messages', compact('json'))
37+
->getBody()->getContents();
38+
39+
$result = \json_decode($result, true);
40+
41+
$content = \end($result['content']);
42+
43+
if ($content['type'] === 'tool_use') {
44+
$response = $this->createToolMessage($content);
45+
} else {
46+
$response = new AssistantMessage($content['text']);
47+
}
48+
49+
// Attach the usage for the current interaction
50+
if (\array_key_exists('usage', $result)) {
51+
$response->setUsage(
52+
new Usage(
53+
$result['usage']['input_tokens'],
54+
$result['usage']['output_tokens']
55+
)
56+
);
57+
}
58+
59+
return $response;
60+
}
61+
}
Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
<?php
2+
3+
namespace NeuronAI\Providers\Anthropic;
4+
5+
use GuzzleHttp\Exception\GuzzleException;
6+
use NeuronAI\Exceptions\ProviderException;
7+
use Psr\Http\Message\StreamInterface;
8+
9+
trait HandleStream
10+
{
11+
12+
/**
13+
* @throws ProviderException
14+
* @throws GuzzleException
15+
*/
16+
public function stream(array|string $messages, callable $executeToolsCallback): \Generator
17+
{
18+
$mapper = new MessageMapper($messages);
19+
20+
$json = \array_filter([
21+
'stream' => true,
22+
'model' => $this->model,
23+
'max_tokens' => $this->max_tokens,
24+
'stop_sequences' => $this->stop_sequences,
25+
'temperature' => $this->temperature,
26+
'system' => $this->system ?? null,
27+
'messages' => $mapper->map(),
28+
]);
29+
30+
if (!empty($this->tools)) {
31+
$json['tools'] = $this->generateToolsPayload();
32+
}
33+
34+
// https://docs.anthropic.com/claude/reference/messages_post
35+
$stream = $this->client->post('v1/messages', compact('json'))->getBody();
36+
37+
$toolCalls = [];
38+
39+
while (! $stream->eof()) {
40+
if (!$line = $this->parseNextDataLine($stream)) {
41+
continue;
42+
}
43+
44+
// Tool calls detection (https://docs.anthropic.com/en/api/messages-streaming#streaming-request-with-tool-use)
45+
if (
46+
(isset($line['content_block']['type']) && $line['content_block']['type'] === 'tool_use') ||
47+
(isset($line['delta']['type']) && $line['delta']['type'] === 'input_json_delta')
48+
) {
49+
$toolCalls = $this->composeToolCalls($line, $toolCalls);
50+
continue;
51+
}
52+
53+
// Handle tool call
54+
if ($line['type'] === 'content_block_stop' && !empty($toolCalls)) {
55+
// Restore the input field as an array
56+
$toolCalls = \array_map(function (array $call) {
57+
$call['input'] = json_decode($call['input'], true);
58+
return $call;
59+
}, $toolCalls);
60+
61+
yield from $executeToolsCallback(
62+
$this->createToolMessage(\end($toolCalls))
63+
);
64+
}
65+
66+
// Process regular content
67+
$content = $line['delta']['text']??'';
68+
69+
yield $content;
70+
}
71+
}
72+
73+
/**
74+
* Recreate the tool_call format of anthropic API from streaming.
75+
*
76+
* @param array<string, mixed> $line
77+
* @param array<int, array<string, mixed>> $toolCalls
78+
* @return array<int, array<string, mixed>>
79+
*/
80+
protected function composeToolCalls(array $line, array $toolCalls): array
81+
{
82+
if (!\array_key_exists($line['index'], $toolCalls)) {
83+
$toolCalls[$line['index']] = [
84+
'type' => 'tool_use',
85+
'id' => $line['content_block']['id'],
86+
'name' => $line['content_block']['name'],
87+
'input' => '',
88+
];
89+
} else {
90+
if ($input = $line['delta']['partial_json']??null) {
91+
$toolCalls[$line['index']]['input'] .= $input;
92+
}
93+
}
94+
95+
return $toolCalls;
96+
}
97+
98+
protected function parseNextDataLine(StreamInterface $stream): ?array
99+
{
100+
$line = $this->readLine($stream);
101+
102+
if (! \str_starts_with($line, 'data:')) {
103+
return null;
104+
}
105+
106+
$line = \trim(\substr($line, \strlen('data: ')));
107+
108+
try {
109+
return \json_decode($line, true, flags: JSON_THROW_ON_ERROR);
110+
} catch (\Throwable $exception) {
111+
throw new ProviderException('Anthropic streaming error - '.$exception->getMessage());
112+
}
113+
}
114+
115+
protected function readLine(StreamInterface $stream): string
116+
{
117+
$buffer = '';
118+
119+
while (! $stream->eof()) {
120+
$byte = $stream->read(1);
121+
122+
if ($byte === '') {
123+
return $buffer;
124+
}
125+
126+
$buffer .= $byte;
127+
128+
if ($byte === "\n") {
129+
break;
130+
}
131+
}
132+
133+
return $buffer;
134+
}
135+
}

0 commit comments

Comments
 (0)