Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
"ext-grpc": "*",
"ext-mbstring": "*",
"ext-opentelemetry": "*",
"ext-rdkafka": "*",
"ext-pdo": "*",
"ext-pdo_sqlite": "*",
"ext-xdebug": "*",
Expand Down
15 changes: 8 additions & 7 deletions docs/src/instrumentation/traces.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,14 @@ A DSN starts with a transport and an exporter separated by a `+` character. The

Here is table list of the available transport and exporter for traces:

| Transport | Exporter | Description | Example | Default |
|-----------|-----------|--------------------------------------------------------------|-------------------------------------------|--------------|
| http(s) | otlp | OpenTelemetry exporter using HTTP protocol (over TLS) | http+otlp://localhost:4318/v1/traces | N/A |
| grpc(s) | otlp | OpenTelemetry exporter using gRPC protocol (over TLS) | grpc+otlp://localhost:4317 | N/A |
| http(s) | zipkin | Zipkin exporter using HTTP protocol (over TLS) | http+zipkin://localhost:9411/api/v2/spans | N/A |
| empty | in-memory | In-memory exporter for testing purpose | in-memory://default | N/A |
| stream | console | Console exporter for testing purpose using a stream resource | stream+console://default | php://stdout |
| Transport | Exporter | Description | Example | Default |
|-----------|-----------|---------------------------------------------------------------------------------------------------------------------------|--------------------------------------------------------------------------------|--------------|
| http(s) | otlp | OpenTelemetry exporter using HTTP protocol (over TLS) | http+otlp://localhost:4318/v1/traces | N/A |
| grpc(s) | otlp | OpenTelemetry exporter using gRPC protocol (over TLS) | grpc+otlp://localhost:4317 | N/A |
| kafka | otlp | OpenTelemetry exporter using the Kafka message broker. Add query parameters for configuring the message broker. | kafka+otlp://open_telemetry_local_alpha_traces?metadata.broker.list=kafka:9092 | N/A |
| http(s) | zipkin | Zipkin exporter using HTTP protocol (over TLS) | http+zipkin://localhost:9411/api/v2/spans | N/A |
| empty | in-memory | In-memory exporter for testing purpose | in-memory://default | N/A |
| stream | console | Console exporter for testing purpose using a stream resource | stream+console://default | php://stdout |

Note: The `stream+console` DSN is the only DSN than can refer to a stream resource using the `path` block. For example: `stream+console://default/file.log`.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -375,9 +375,9 @@ private function requestAttributes(Request $request): iterable
/**
* @param array<string> $headers
*
* @return array<string, mixed>
* @return \Generator<string, array<int, string>>
*/
private function headerAttributes(HeaderBag $headerBag, array $headers): iterable
private function headerAttributes(HeaderBag $headerBag, array $headers): \Generator
{
foreach ($headers as $header => $attribute) {
if ($headerBag->has($header)) {
Expand Down
5 changes: 5 additions & 0 deletions src/OpenTelemetry/Exporter/ConsoleExporterEndpoint.php
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,9 @@ public function getExporter(): string
{
return $this->dsn->getExporter();
}

public function getDsn(): ExporterDsn
{
return $this->dsn;
}
}
6 changes: 6 additions & 0 deletions src/OpenTelemetry/Exporter/ExporterDsn.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

use Zenstruck\Dsn;
use Zenstruck\Uri;
use Zenstruck\Uri\Part\Query;

final class ExporterDsn
{
Expand Down Expand Up @@ -60,6 +61,11 @@ public function getPort(?int $default = null): ?int
return $this->uri->port() ?? $default;
}

public function getQuery(): Query
{
return $this->uri->query();
}

/**
* @return string[]
*/
Expand Down
2 changes: 2 additions & 0 deletions src/OpenTelemetry/Exporter/ExporterEndpointInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,6 @@ public function getExporter(): string;
public function getTransport(): ?string;

public static function fromDsn(ExporterDsn $dsn): self;

public function getDsn(): ExporterDsn;
}
9 changes: 9 additions & 0 deletions src/OpenTelemetry/Exporter/OtlpExporterEndpoint.php
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ public function withSignal(string $signal): self

public function __toString()
{
if (TransportEnum::Kafka === $this->transport) {
return \sprintf('kafka://%s?%s', $this->dsn->getHost(), $this->dsn->getQuery()->toString());
}

$uri = $this->uriFactory->createUri();
$uri = $uri
->withScheme($this->transport->getScheme())
Expand Down Expand Up @@ -78,4 +82,9 @@ public function getExporter(): string
{
return 'otlp';
}

public function getDsn(): ExporterDsn
{
return $this->dsn;
}
}
5 changes: 5 additions & 0 deletions src/OpenTelemetry/Log/LogExporterEndpoint.php
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,9 @@ public function getExporter(): string
{
return $this->exporter->value;
}

public function getDsn(): ExporterDsn
{
return $this->dsn;
}
}
5 changes: 5 additions & 0 deletions src/OpenTelemetry/Metric/MetricExporterEndpoint.php
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,9 @@ public function getExporter(): string
{
return $this->exporter->value;
}

public function getDsn(): ExporterDsn
{
return $this->dsn;
}
}
5 changes: 5 additions & 0 deletions src/OpenTelemetry/Trace/TraceExporterEndpoint.php
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,9 @@ public function getTransport(): ?string
{
return $this->transport?->value;
}

public function getDsn(): ExporterDsn
{
return $this->dsn;
}
}
5 changes: 5 additions & 0 deletions src/OpenTelemetry/Trace/ZipkinExporterEndpoint.php
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,9 @@ public function getExporter(): string
{
return 'zipkin';
}

public function getDsn(): ExporterDsn
{
return $this->dsn;
}
}
85 changes: 85 additions & 0 deletions src/OpenTelemetry/Transport/KafkaTransport.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
<?php

declare(strict_types=1);

namespace FriendsOfOpenTelemetry\OpenTelemetryBundle\OpenTelemetry\Transport;

use OpenTelemetry\SDK\Common\Export\TransportInterface;
use OpenTelemetry\SDK\Common\Future\CancellationInterface;
use OpenTelemetry\SDK\Common\Future\CompletedFuture;
use OpenTelemetry\SDK\Common\Future\ErrorFuture;
use OpenTelemetry\SDK\Common\Future\FutureInterface;
use RdKafka\Conf;
use RdKafka\Producer;
use RdKafka\ProducerTopic;

/**
* @template-implements TransportInterface<"application/x-protobuf">
*/
final readonly class KafkaTransport implements TransportInterface
{
private const FLUSH_TIMEOUT = 10000;

private Producer $producer;
private ProducerTopic $topicHandle;

public function __construct(
Conf $configuration,
string $topic,
) {
if (!\class_exists(Conf::class)) {
throw new \RuntimeException('The PHP extension "rdkafka" is required to use the Kafka transport.');
}

$this->producer = new Producer($configuration);
$this->topicHandle = $this->producer->newTopic($topic);
}

public function contentType(): string
{
return 'application/x-protobuf';
}

/**
* @phpstan-return FutureInterface<null>
*/
public function send(string $payload, ?CancellationInterface $cancellation = null): FutureInterface
{
try {
$this->topicHandle->producev(\RD_KAFKA_PARTITION_UA, 0, $payload);
} catch (\Throwable $exception) {
return new ErrorFuture($exception);
}

return new CompletedFuture(null);
}

public function shutdown(?CancellationInterface $cancellation = null): bool
{
return $this->flushInternal();
}

public function forceFlush(?CancellationInterface $cancellation = null): bool
{
return $this->flushInternal();
}

private function flushInternal(): bool
{
// librdkafka recommends retrying the flush operation a couple of times when it returns a null result.
$timeout = self::FLUSH_TIMEOUT;
$start = \microtime(true);
do {
$res = $this->producer->flush($timeout);
if (\RD_KAFKA_RESP_ERR_NO_ERROR === $res) {
return true;
}

// reduce timeout
$elapsedMs = (int) \round((\microtime(true) - $start) * 1000);
$timeout = \max(0, self::FLUSH_TIMEOUT - $elapsedMs);
} while ($timeout > 0);

return false;
}
}
30 changes: 30 additions & 0 deletions src/OpenTelemetry/Transport/KafkaTransportFactory.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
<?php

declare(strict_types=1);

namespace FriendsOfOpenTelemetry\OpenTelemetryBundle\OpenTelemetry\Transport;

use FriendsOfOpenTelemetry\OpenTelemetryBundle\OpenTelemetry\Exporter\ExporterEndpointInterface;
use FriendsOfOpenTelemetry\OpenTelemetryBundle\OpenTelemetry\Exporter\ExporterOptionsInterface;
use OpenTelemetry\SDK\Common\Export\TransportInterface;
use RdKafka\Conf;

final readonly class KafkaTransportFactory implements TransportFactoryInterface
{
public function supports(#[\SensitiveParameter] ExporterEndpointInterface $endpoint, ExporterOptionsInterface $options): bool
{
return TransportEnum::Kafka === TransportEnum::tryFrom($endpoint->getTransport());
}

public function createTransport(#[\SensitiveParameter] ExporterEndpointInterface $endpoint, ExporterOptionsInterface $options): TransportInterface
{
$dsn = $endpoint->getDsn();
$queryParameters = $dsn->getQuery()->all();
$conf = new Conf();
foreach ($queryParameters as $k => $v) {
$conf->set(\str_replace('_', '.', $k), (string) $v);
}

return new KafkaTransport($conf, $dsn->getHost());
}
}
2 changes: 2 additions & 0 deletions src/OpenTelemetry/Transport/TransportEnum.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,14 @@ enum TransportEnum: string
case Http = 'http';
case Https = 'https';
case Stream = 'stream';
case Kafka = 'kafka';

public function getScheme(): ?string
{
return match ($this) {
self::Http, self::Grpc => 'http',
self::Https, self::Grpcs => 'https',
self::Kafka => 'kafka',
default => null,
};
}
Expand Down
4 changes: 4 additions & 0 deletions src/Resources/config/services_transports.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

use FriendsOfOpenTelemetry\OpenTelemetryBundle\OpenTelemetry\Transport\AbstractTransportFactory;
use FriendsOfOpenTelemetry\OpenTelemetryBundle\OpenTelemetry\Transport\GrpcTransportFactory;
use FriendsOfOpenTelemetry\OpenTelemetryBundle\OpenTelemetry\Transport\KafkaTransportFactory;
use FriendsOfOpenTelemetry\OpenTelemetryBundle\OpenTelemetry\Transport\OtlpHttpTransportFactory;
use FriendsOfOpenTelemetry\OpenTelemetryBundle\OpenTelemetry\Transport\PsrHttpTransportFactory;
use FriendsOfOpenTelemetry\OpenTelemetryBundle\OpenTelemetry\Transport\StreamTransportFactory;
Expand Down Expand Up @@ -39,6 +40,9 @@
->parent('open_telemetry.transport_factory.abstract')
->tag('open_telemetry.transport_factory')

->set('open_telemetry.transport_factory.kafka', KafkaTransportFactory::class)
->tag('open_telemetry.transport_factory')

->set('open_telemetry.transport_factory', TransportFactory::class)
->args([
tagged_iterator('open_telemetry.transport_factory'),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
<?php

declare(strict_types=1);

namespace FriendsOfOpenTelemetry\OpenTelemetryBundle\Tests\Unit\OpenTelemetry\Transport;

use FriendsOfOpenTelemetry\OpenTelemetryBundle\OpenTelemetry\Exporter\EmptyExporterOptions;
use FriendsOfOpenTelemetry\OpenTelemetryBundle\OpenTelemetry\Exporter\ExporterDsn;
use FriendsOfOpenTelemetry\OpenTelemetryBundle\OpenTelemetry\Exporter\ExporterEndpointInterface;
use FriendsOfOpenTelemetry\OpenTelemetryBundle\OpenTelemetry\Exporter\ExporterOptionsInterface;
use FriendsOfOpenTelemetry\OpenTelemetryBundle\OpenTelemetry\Exporter\OtlpExporterOptions;
use FriendsOfOpenTelemetry\OpenTelemetryBundle\OpenTelemetry\Log\LogExporterEndpoint;
use FriendsOfOpenTelemetry\OpenTelemetryBundle\OpenTelemetry\Metric\MetricExporterEndpoint;
use FriendsOfOpenTelemetry\OpenTelemetryBundle\OpenTelemetry\Metric\MetricExporterOptions;
use FriendsOfOpenTelemetry\OpenTelemetryBundle\OpenTelemetry\Trace\TraceExporterEndpoint;
use FriendsOfOpenTelemetry\OpenTelemetryBundle\OpenTelemetry\Transport\KafkaTransportFactory;
use PHPUnit\Framework\Attributes\CoversClass;
use PHPUnit\Framework\Attributes\DataProvider;
use PHPUnit\Framework\TestCase;

#[CoversClass(KafkaTransportFactory::class)]
final class KafkaTransportFactoryTest extends TestCase
{
#[DataProvider('exporterProvider')]
public function testCreateTransportFromExporter(
ExporterEndpointInterface $endpoint,
ExporterOptionsInterface $options,
bool $shouldSupport,
): void {
$factory = new KafkaTransportFactory();

self::assertSame($shouldSupport, $factory->supports($endpoint, $options));

if ($shouldSupport) {
$transport = $factory->createTransport($endpoint, $options);
self::assertSame('application/x-protobuf', $transport->contentType());
}
}

/**
* @return \Generator<array{
* 0: ExporterEndpointInterface,
* 1: ExporterOptionsInterface,
* 2: bool
* }>
*/
public static function exporterProvider(): \Generator
{
// Kafka for traces
yield [
TraceExporterEndpoint::fromDsn(ExporterDsn::fromString('kafka+otlp://otel-traces?metadata_broker_list=localhost:9092')),
new OtlpExporterOptions(),
true,
];

// Kafka for metrics
yield [
MetricExporterEndpoint::fromDsn(ExporterDsn::fromString('kafka+otlp://otel-metrics?metadata_broker_list=localhost:9092')),
new MetricExporterOptions(),
true,
];

// Kafka for logs
yield [
LogExporterEndpoint::fromDsn(ExporterDsn::fromString('kafka+otlp://otel-logs?metadata_broker_list=localhost:9092')),
new OtlpExporterOptions(),
true,
];

// Not Kafka transports should not be supported
yield [
TraceExporterEndpoint::fromDsn(ExporterDsn::fromString('grpc+otlp://localhost')),
new OtlpExporterOptions(),
false,
];

yield [
TraceExporterEndpoint::fromDsn(ExporterDsn::fromString('http+otlp://localhost')),
new OtlpExporterOptions(),
false,
];

yield [
TraceExporterEndpoint::fromDsn(ExporterDsn::fromString('stream+console://default')),
new EmptyExporterOptions(),
false,
];
}
}
Loading