Skip to content

Commit aa89bba

Browse files
authored
Server-sent events (#347)
* WIP * Update EventStream.php * Update CHANGELOG.md * Added tests
1 parent 67a9cdf commit aa89bba

File tree

8 files changed

+439
-0
lines changed

8 files changed

+439
-0
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
- `MySQL`
88
- `Postgres`
99
* Now possible to define custom input/output value objects for the query builder.
10+
* Added a `EventStream` response sender to simplify sending [sever-sent events](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events).
1011

1112
--------------------------------------------------------
1213

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
<?php
2+
3+
/**
4+
* @copyright Frederic G. Østby
5+
* @license http://www.makoframework.com/license
6+
*/
7+
8+
namespace mako\http\response\senders;
9+
10+
use Closure;
11+
use Generator;
12+
use JsonSerializable;
13+
use mako\http\Request;
14+
use mako\http\Response;
15+
use mako\http\response\senders\stream\event\Event;
16+
use Override;
17+
use Stringable;
18+
19+
use function connection_aborted;
20+
use function flush;
21+
use function is_object;
22+
use function json_encode;
23+
use function ob_end_clean;
24+
use function ob_get_level;
25+
26+
/**
27+
* Event stream response.
28+
*
29+
* @see https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events
30+
*/
31+
class EventStream implements ResponseSenderInterface
32+
{
33+
/**
34+
* Constructor.
35+
*/
36+
public function __construct(
37+
protected Closure $stream
38+
) {
39+
}
40+
41+
/**
42+
* Erases and disables output buffers.
43+
*/
44+
protected function eraseAndDisableOutputBuffers(): void
45+
{
46+
while (ob_get_level() > 0) {
47+
ob_end_clean();
48+
}
49+
}
50+
51+
/**
52+
* Stringifies the value.
53+
*/
54+
protected function stringifyValue(null|float|int|JsonSerializable|string|Stringable $value): string
55+
{
56+
if (is_object($value)) {
57+
if ($value instanceof JsonSerializable) {
58+
return json_encode($value, JSON_THROW_ON_ERROR);
59+
}
60+
}
61+
62+
return (string) $value;
63+
}
64+
65+
/**
66+
* Prepares the event for sending.
67+
*/
68+
protected function prepareEvent(Event $event): string
69+
{
70+
$output = '';
71+
72+
foreach ($event->fields as $field) {
73+
$output .= "{$field->type->value}: {$this->stringifyValue($field->value)}\n";
74+
}
75+
76+
$output .= "\n";
77+
78+
return $output;
79+
}
80+
81+
/**
82+
* Sends the event to the client.
83+
*/
84+
protected function sendEvent(string $event): void
85+
{
86+
echo $event;
87+
88+
flush();
89+
}
90+
91+
/**
92+
* Sends the stream to the client.
93+
*/
94+
protected function sendStream(): void
95+
{
96+
foreach ((fn (): Generator => ($this->stream)())() as $event) {
97+
if (connection_aborted()) {
98+
break;
99+
}
100+
101+
$this->sendEvent($this->prepareEvent($event));
102+
}
103+
}
104+
105+
/**
106+
* {@inheritDoc}
107+
*/
108+
#[Override]
109+
public function send(Request $request, Response $response): void
110+
{
111+
$response->setType('text/event-stream', 'UTF-8');
112+
113+
$response->headers->add('Connection', 'keep-alive');
114+
$response->headers->add('Cache-Control', 'no-cache');
115+
$response->headers->add('X-Accel-Buffering', 'no');
116+
117+
// Erase output buffers and disable output buffering
118+
119+
$this->eraseAndDisableOutputBuffers();
120+
121+
// Send headers
122+
123+
$response->sendHeaders();
124+
125+
// Send the stream
126+
127+
$this->sendStream();
128+
}
129+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
<?php
2+
3+
/**
4+
* @copyright Frederic G. Østby
5+
* @license http://www.makoframework.com/license
6+
*/
7+
8+
namespace mako\http\response\senders\stream\event;
9+
10+
/**
11+
* Event stream event.
12+
*/
13+
class Event
14+
{
15+
/**
16+
* Event fields.
17+
*
18+
* @var Field[]
19+
*/
20+
public protected(set) array $fields;
21+
22+
/**
23+
* Constructor.
24+
*/
25+
public function __construct(Field ...$fields)
26+
{
27+
$this->fields = $fields;
28+
}
29+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
<?php
2+
3+
/**
4+
* @copyright Frederic G. Østby
5+
* @license http://www.makoframework.com/license
6+
*/
7+
8+
namespace mako\http\response\senders\stream\event;
9+
10+
use JsonSerializable;
11+
use Stringable;
12+
13+
/**
14+
* Event stream field.
15+
*/
16+
class Field
17+
{
18+
public function __construct(
19+
public protected(set) Type $type,
20+
public protected(set) null|float|int|JsonSerializable|string|Stringable $value
21+
) {
22+
}
23+
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
<?php
2+
3+
/**
4+
* @copyright Frederic G. Østby
5+
* @license http://www.makoframework.com/license
6+
*/
7+
8+
namespace mako\http\response\senders\stream\event;
9+
10+
/**
11+
* Event stream field types.
12+
*/
13+
enum Type: string
14+
{
15+
case COMMENT = '';
16+
case DATA = 'data';
17+
case EVENT = 'event';
18+
case ID = 'id';
19+
case RETRY = 'retry';
20+
}
Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
<?php
2+
3+
/**
4+
* @copyright Frederic G. Østby
5+
* @license http://www.makoframework.com/license
6+
*/
7+
8+
namespace mako\tests\unit\http\response\senders;
9+
10+
use JsonSerializable;
11+
use mako\http\Request;
12+
use mako\http\Response;
13+
use mako\http\response\Headers;
14+
use mako\http\response\senders\EventStream;
15+
use mako\http\response\senders\stream\event\Event;
16+
use mako\http\response\senders\stream\event\Field;
17+
use mako\http\response\senders\stream\event\Type;
18+
use mako\tests\TestCase;
19+
use Mockery;
20+
use Mockery\MockInterface;
21+
use PHPUnit\Framework\Attributes\Group;
22+
use Stringable;
23+
24+
#[Group('unit')]
25+
class EventStreamTest extends TestCase
26+
{
27+
/**
28+
*
29+
*/
30+
protected function getRequest(): MockInterface&Request
31+
{
32+
return Mockery::mock(Request::class);
33+
}
34+
35+
/**
36+
*
37+
*/
38+
protected function getResponse(): MockInterface&Response
39+
{
40+
$headers = Mockery::mock(Headers::class);
41+
42+
$headers->shouldReceive('add')->once()->with('Connection', 'keep-alive');
43+
$headers->shouldReceive('add')->once()->with('Cache-Control', 'no-cache');
44+
$headers->shouldReceive('add')->once()->with('X-Accel-Buffering', 'no');
45+
46+
$response = Mockery::mock(Response::class);
47+
48+
$response->shouldReceive('setType')->once()->with('text/event-stream', 'UTF-8');
49+
$response->shouldReceive('sendHeaders')->once();
50+
51+
(function () use ($headers): void {
52+
$this->headers = $headers;
53+
})->bindTo($response, Response::class)();
54+
55+
return $response;
56+
}
57+
58+
/**
59+
*
60+
*/
61+
public function testBasicEventStream(): void
62+
{
63+
$eventStream = Mockery::mock(EventStream::class, [function () {
64+
yield new Event(
65+
new Field(Type::DATA, 'hello, world!')
66+
);
67+
}]);
68+
69+
$eventStream->makePartial()->shouldAllowMockingProtectedMethods();
70+
71+
$eventStream->shouldReceive('eraseAndDisableOutputBuffers')->once();
72+
73+
$eventStream->shouldReceive('sendEvent')->once()->with("data: hello, world!\n\n");
74+
75+
$eventStream->send($this->getRequest(), $this->getResponse());
76+
}
77+
78+
/**
79+
*
80+
*/
81+
public function testEventStreamWithMultipleFields(): void
82+
{
83+
$eventStream = Mockery::mock(EventStream::class, [function () {
84+
yield new Event(
85+
new Field(Type::EVENT, 'greeting'),
86+
new Field(Type::DATA, 'hello, world!')
87+
);
88+
}]);
89+
90+
$eventStream->makePartial()->shouldAllowMockingProtectedMethods();
91+
92+
$eventStream->shouldReceive('eraseAndDisableOutputBuffers')->once();
93+
94+
$eventStream->shouldReceive('sendEvent')->once()->with("event: greeting\ndata: hello, world!\n\n");
95+
96+
$eventStream->send($this->getRequest(), $this->getResponse());
97+
}
98+
99+
/**
100+
*
101+
*/
102+
public function testEventStreamWithMultipleEvents(): void
103+
{
104+
$eventStream = Mockery::mock(EventStream::class, [function () {
105+
yield new Event(
106+
new Field(Type::EVENT, 'greeting'),
107+
new Field(Type::DATA, 'first hello')
108+
);
109+
yield new Event(
110+
new Field(Type::EVENT, 'greeting'),
111+
new Field(Type::DATA, 'second hello')
112+
);
113+
}]);
114+
115+
$eventStream->makePartial()->shouldAllowMockingProtectedMethods();
116+
117+
$eventStream->shouldReceive('eraseAndDisableOutputBuffers')->once();
118+
119+
$eventStream->shouldReceive('sendEvent')->once()->with("event: greeting\ndata: first hello\n\n");
120+
$eventStream->shouldReceive('sendEvent')->once()->with("event: greeting\ndata: second hello\n\n");
121+
122+
$eventStream->send($this->getRequest(), $this->getResponse());
123+
}
124+
125+
/**
126+
*
127+
*/
128+
public function testEventStreamWithStringable(): void
129+
{
130+
$eventStream = Mockery::mock(EventStream::class, [function () {
131+
yield new Event(
132+
new Field(Type::DATA, new class implements Stringable {
133+
public function __toString(): string
134+
{
135+
return 'this is a string';
136+
}
137+
})
138+
);
139+
}]);
140+
141+
$eventStream->makePartial()->shouldAllowMockingProtectedMethods();
142+
143+
$eventStream->shouldReceive('eraseAndDisableOutputBuffers')->once();
144+
145+
$eventStream->shouldReceive('sendEvent')->once()->with("data: this is a string\n\n");
146+
147+
$eventStream->send($this->getRequest(), $this->getResponse());
148+
}
149+
150+
/**
151+
*
152+
*/
153+
public function testEventStreamWithJsonSerializable(): void
154+
{
155+
$eventStream = Mockery::mock(EventStream::class, [function () {
156+
yield new Event(
157+
new Field(Type::DATA, new class implements JsonSerializable {
158+
public function jsonSerialize(): mixed
159+
{
160+
return [1, 2, 3];
161+
}
162+
})
163+
);
164+
}]);
165+
166+
$eventStream->makePartial()->shouldAllowMockingProtectedMethods();
167+
168+
$eventStream->shouldReceive('eraseAndDisableOutputBuffers')->once();
169+
170+
$eventStream->shouldReceive('sendEvent')->once()->with("data: [1,2,3]\n\n");
171+
172+
$eventStream->send($this->getRequest(), $this->getResponse());
173+
}
174+
}

0 commit comments

Comments
 (0)