Skip to content

Commit c290238

Browse files
refactor: remove outgoing message queue from session and simplify transport send method to directly dispatch messages.
1 parent 6f386f2 commit c290238

File tree

6 files changed

+41
-81
lines changed

6 files changed

+41
-81
lines changed

src/Client/Protocol.php

Lines changed: 37 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ public function initialize(Configuration $config): Response|Error
123123
}
124124

125125
/**
126-
* Send a request to the server.
126+
* Send a request to the server and wait for response.
127127
*
128128
* If a response is immediately available (sync HTTP), returns it.
129129
* Otherwise, suspends the Fiber and waits for the transport to resume it.
@@ -144,16 +144,8 @@ public function request(Request $request, int $timeout, bool $withProgress = fal
144144
$request = $request->withMeta(['progressToken' => $progressToken]);
145145
}
146146

147-
$this->logger->debug('Sending request', [
148-
'id' => $requestId,
149-
'method' => $request::getMethod(),
150-
]);
151-
152-
$encoded = json_encode($request, \JSON_THROW_ON_ERROR);
153-
$this->session->queueOutgoing($encoded, ['type' => 'request']);
154147
$this->session->addPendingRequest($requestId, $timeout);
155-
156-
$this->flushOutgoing();
148+
$this->sendRequest($request);
157149

158150
$immediate = $this->session->consumeResponse($requestId);
159151
if (null !== $immediate) {
@@ -171,6 +163,20 @@ public function request(Request $request, int $timeout, bool $withProgress = fal
171163
]);
172164
}
173165

166+
/**
167+
* Send a request to the server.
168+
*/
169+
private function sendRequest(Request $request): void
170+
{
171+
$this->logger->debug('Sending request', [
172+
'id' => $request->getId(),
173+
'method' => $request::getMethod(),
174+
]);
175+
176+
$encoded = json_encode($request, \JSON_THROW_ON_ERROR);
177+
$this->transport?->send($encoded);
178+
}
179+
174180
/**
175181
* Send a notification to the server (fire and forget).
176182
*/
@@ -179,8 +185,20 @@ public function sendNotification(Notification $notification): void
179185
$this->logger->debug('Sending notification', ['method' => $notification::getMethod()]);
180186

181187
$encoded = json_encode($notification, \JSON_THROW_ON_ERROR);
182-
$this->session->queueOutgoing($encoded, ['type' => 'notification']);
183-
$this->flushOutgoing();
188+
$this->transport?->send($encoded);
189+
}
190+
191+
/**
192+
* Send a response back to the server (for server-initiated requests).
193+
*
194+
* @param Response<mixed>|Error $response
195+
*/
196+
private function sendResponse(Response|Error $response): void
197+
{
198+
$this->logger->debug('Sending response', ['id' => $response->getId()]);
199+
200+
$encoded = json_encode($response, \JSON_THROW_ON_ERROR);
201+
$this->transport?->send($encoded);
184202
}
185203

186204
/**
@@ -204,9 +222,9 @@ public function processMessage(string $input): void
204222
if ($message instanceof Response || $message instanceof Error) {
205223
$this->handleResponse($message);
206224
} elseif ($message instanceof Request) {
207-
$this->handleServerRequest($message);
225+
$this->handleRequest($message);
208226
} elseif ($message instanceof Notification) {
209-
$this->handleServerNotification($message);
227+
$this->handleNotification($message);
210228
}
211229
}
212230
}
@@ -224,17 +242,13 @@ private function handleResponse(Response|Error $response): void
224242

225243
$this->logger->debug('Handling response', ['id' => $requestId]);
226244

227-
if ($response instanceof Response) {
228-
$this->session->storeResponse($requestId, $response->jsonSerialize());
229-
} else {
230-
$this->session->storeResponse($requestId, $response->jsonSerialize());
231-
}
245+
$this->session->storeResponse($requestId, $response->jsonSerialize());
232246
}
233247

234248
/**
235249
* Handle a request from the server (e.g., sampling request).
236250
*/
237-
private function handleServerRequest(Request $request): void
251+
private function handleRequest(Request $request): void
238252
{
239253
$method = $request::getMethod();
240254

@@ -259,9 +273,7 @@ private function handleServerRequest(Request $request): void
259273
);
260274
}
261275

262-
$encoded = json_encode($response, \JSON_THROW_ON_ERROR);
263-
$this->session->queueOutgoing($encoded, ['type' => $response instanceof Response ? 'response' : 'error']);
264-
$this->flushOutgoing();
276+
$this->sendResponse($response);
265277

266278
return;
267279
}
@@ -272,15 +284,13 @@ private function handleServerRequest(Request $request): void
272284
$request->getId()
273285
);
274286

275-
$encoded = json_encode($error, \JSON_THROW_ON_ERROR);
276-
$this->session->queueOutgoing($encoded, ['type' => 'error']);
277-
$this->flushOutgoing();
287+
$this->sendResponse($error);
278288
}
279289

280290
/**
281291
* Handle a notification from the server.
282292
*/
283-
private function handleServerNotification(Notification $notification): void
293+
private function handleNotification(Notification $notification): void
284294
{
285295
$method = $notification::getMethod();
286296

@@ -301,21 +311,6 @@ private function handleServerNotification(Notification $notification): void
301311
}
302312
}
303313

304-
/**
305-
* Flush any queued outgoing messages.
306-
*/
307-
private function flushOutgoing(): void
308-
{
309-
if (null === $this->transport) {
310-
return;
311-
}
312-
313-
$messages = $this->session->consumeOutgoingMessages();
314-
foreach ($messages as $item) {
315-
$this->transport->send($item['message'], $item['context']);
316-
}
317-
}
318-
319314
public function getSession(): ClientSessionInterface
320315
{
321316
return $this->session;

src/Client/Session/ClientSession.php

Lines changed: 0 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,6 @@ class ClientSession implements ClientSessionInterface
3535
/** @var array<int, array<string, mixed>> */
3636
private array $responses = [];
3737

38-
/** @var array<int, array{message: string, context: array<string, mixed>}> */
39-
private array $outgoingQueue = [];
40-
4138
/** @var array<int, array{token: string, progress: float, total: ?float, message: ?string}> */
4239
private array $progressUpdates = [];
4340

@@ -97,22 +94,6 @@ public function consumeResponse(int $requestId): Response|Error|null
9794
return Response::fromArray($data);
9895
}
9996

100-
public function queueOutgoing(string $message, array $context): void
101-
{
102-
$this->outgoingQueue[] = [
103-
'message' => $message,
104-
'context' => $context,
105-
];
106-
}
107-
108-
public function consumeOutgoingMessages(): array
109-
{
110-
$messages = $this->outgoingQueue;
111-
$this->outgoingQueue = [];
112-
113-
return $messages;
114-
}
115-
11697
public function setInitialized(bool $initialized): void
11798
{
11899
$this->initialized = $initialized;

src/Client/Session/ClientSessionInterface.php

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -70,21 +70,6 @@ public function storeResponse(int $requestId, array $responseData): void;
7070
*/
7171
public function consumeResponse(int $requestId): Response|Error|null;
7272

73-
/**
74-
* Queue an outgoing message.
75-
*
76-
* @param string $message JSON-encoded message
77-
* @param array<string, mixed> $context Message context
78-
*/
79-
public function queueOutgoing(string $message, array $context): void;
80-
81-
/**
82-
* Get and clear all queued outgoing messages.
83-
*
84-
* @return array<int, array{message: string, context: array<string, mixed>}>
85-
*/
86-
public function consumeOutgoingMessages(): array;
87-
8873
/**
8974
* Set initialization state.
9075
*/

src/Client/Transport/HttpTransport.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ public function connectAndInitialize(): void
9393
$this->logger->info('HTTP client connected and initialized', ['endpoint' => $this->endpoint]);
9494
}
9595

96-
public function send(string $data, array $context): void
96+
public function send(string $data): void
9797
{
9898
$request = $this->requestFactory->createRequest('POST', $this->endpoint)
9999
->withHeader('Content-Type', 'application/json')

src/Client/Transport/StdioTransport.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ public function connectAndInitialize(): void
9090
$this->logger->info('Client connected and initialized');
9191
}
9292

93-
public function send(string $data, array $context): void
93+
public function send(string $data): void
9494
{
9595
if (null === $this->stdin || !\is_resource($this->stdin)) {
9696
throw new ConnectionException('Process stdin not available');

src/Client/Transport/TransportInterface.php

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,10 +44,9 @@ public function connectAndInitialize(): void;
4444
/**
4545
* Send a message to the server immediately.
4646
*
47-
* @param string $data JSON-encoded message
48-
* @param array<string, mixed> $context Message context (type, etc.)
47+
* @param string $data JSON-encoded message
4948
*/
50-
public function send(string $data, array $context): void;
49+
public function send(string $data): void;
5150

5251
/**
5352
* Run a request fiber to completion.

0 commit comments

Comments
 (0)