Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for concurrent exports #790

Merged
merged 2 commits into from
Aug 10, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Add support for concurrent exports
  • Loading branch information
Nevay committed Aug 5, 2022
commit 4fc8c3875cf140d4773c6591bda0bc1a7b36ed1c
28 changes: 28 additions & 0 deletions src/SDK/Common/Future/CompletedFuture.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
<?php

declare(strict_types=1);

namespace OpenTelemetry\SDK\Common\Future;

/**
* @template T
* @template-implements FutureInterface<T>
*/
final class CompletedFuture implements FutureInterface
{
/** @var T */
private $value;

/**
* @param T $value
*/
public function __construct($value)
{
$this->value = $value;
}

public function await(?CancellationInterface $cancellation = null)
{
return $this->value;
}
}
16 changes: 16 additions & 0 deletions src/SDK/Common/Future/FutureInterface.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
<?php

declare(strict_types=1);

namespace OpenTelemetry\SDK\Common\Future;

/**
* @template T
*/
interface FutureInterface
{
/**
* @psalm-return T
*/
public function await(?CancellationInterface $cancellation = null);
}
8 changes: 4 additions & 4 deletions src/SDK/Trace/Behavior/SpanExporterDecoratorTrait.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
namespace OpenTelemetry\SDK\Trace\Behavior;

use OpenTelemetry\SDK\Common\Future\CancellationInterface;
use OpenTelemetry\SDK\Common\Future\FutureInterface;
use OpenTelemetry\SDK\Trace\SpanDataInterface;
use OpenTelemetry\SDK\Trace\SpanExporterInterface;

Expand All @@ -14,16 +15,15 @@ trait SpanExporterDecoratorTrait

/**
* @param iterable<SpanDataInterface> $spans
* @return int
* @psalm-return SpanExporterInterface::STATUS_*
* @return FutureInterface<int>
*/
public function export(iterable $spans, ?CancellationInterface $cancellation = null): int
public function export(iterable $spans, ?CancellationInterface $cancellation = null): FutureInterface
{
$response = $this->decorated->export(
$this->beforeExport($spans),
$cancellation,
);
$this->afterExport($spans, $response);
$this->afterExport($spans, $response->await());

return $response;
}
Expand Down
15 changes: 7 additions & 8 deletions src/SDK/Trace/Behavior/SpanExporterTrait.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
namespace OpenTelemetry\SDK\Trace\Behavior;

use OpenTelemetry\SDK\Common\Future\CancellationInterface;
use OpenTelemetry\SDK\Common\Future\CompletedFuture;
use OpenTelemetry\SDK\Common\Future\FutureInterface;
use OpenTelemetry\SDK\Trace\SpanDataInterface;
use OpenTelemetry\SDK\Trace\SpanExporterInterface;

Expand All @@ -29,19 +31,16 @@ public function forceFlush(?CancellationInterface $cancellation = null): bool
abstract public static function fromConnectionString(string $endpointUrl, string $name, string $args);

/**
* @param iterable<SpanDataInterface> $spans Batch of spans to export
*
* @see https://github.com/open-telemetry/opentelemetry-specification/blob/v1.7.0/specification/trace/sdk.md#exportbatch
*
* @psalm-return SpanExporterInterface::STATUS_*
* @param iterable<SpanDataInterface> $spans
* @return FutureInterface<int>
*/
public function export(iterable $spans, ?CancellationInterface $cancellation = null): int
public function export(iterable $spans, ?CancellationInterface $cancellation = null): FutureInterface
{
if (!$this->running) {
return SpanExporterInterface::STATUS_FAILED_NOT_RETRYABLE;
return new CompletedFuture(SpanExporterInterface::STATUS_FAILED_NOT_RETRYABLE);
}

return $this->doExport($spans); /** @phpstan-ignore-line */
return new CompletedFuture($this->doExport($spans)); /** @phpstan-ignore-line */
}

/**
Expand Down
5 changes: 3 additions & 2 deletions src/SDK/Trace/SpanExporterInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
namespace OpenTelemetry\SDK\Trace;

use OpenTelemetry\SDK\Common\Future\CancellationInterface;
use OpenTelemetry\SDK\Common\Future\FutureInterface;

/**
* @see https://github.com/open-telemetry/opentelemetry-specification/blob/v1.7.0/specification/trace/sdk.md#span-exporter
Expand All @@ -25,9 +26,9 @@ public static function fromConnectionString(string $endpointUrl, string $name, s
*
* @see https://github.com/open-telemetry/opentelemetry-specification/blob/v1.7.0/specification/trace/sdk.md#exportbatch
*
* @psalm-return SpanExporterInterface::STATUS_*
* @psalm-return FutureInterface<int>
*/
public function export(iterable $spans, ?CancellationInterface $cancellation = null): int;
public function export(iterable $spans, ?CancellationInterface $cancellation = null): FutureInterface;

/** @see https://github.com/open-telemetry/opentelemetry-specification/blob/v1.7.0/specification/trace/sdk.md#shutdown-2 */
public function shutdown(?CancellationInterface $cancellation = null): bool;
Expand Down
2 changes: 1 addition & 1 deletion src/SDK/Trace/SpanProcessor/BatchSpanProcessor.php
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public function forceFlush(?CancellationInterface $cancellation = null): bool
return true;
}

$this->exporter->export($this->queue);
$this->exporter->export($this->queue)->await();
$this->queue = [];
$this->stopwatch->reset();
$this->exporter->forceFlush();
Expand Down
2 changes: 1 addition & 1 deletion src/SDK/Trace/SpanProcessor/SimpleSpanProcessor.php
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public function onEnd(ReadableSpanInterface $span): void
}

if (null !== $this->exporter) {
$this->exporter->export([$span->toSpanData()]);
$this->exporter->export([$span->toSpanData()])->await();
}
}

Expand Down
4 changes: 2 additions & 2 deletions tests/Unit/Contrib/AbstractHttpExporterTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public function test_exporter_response_status($responseStatus, $expected): void
$expected,
$this->createExporter()->export([
$this->createMock(SpanData::class),
])
])->await(),
);
}

Expand Down Expand Up @@ -107,7 +107,7 @@ public function test_client_exception_decides_return_code($exception, $expected)
$expected,
$this->createExporter()->export([
$this->createMock(SpanData::class),
])
])->await(),
);
}

Expand Down
12 changes: 6 additions & 6 deletions tests/Unit/Contrib/AgentExporterTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,18 @@

declare(strict_types=1);

namespace OpenTelemetry\Tests\Contrib\Unit;
namespace OpenTelemetry\Tests\Unit\Contrib;

use OpenTelemetry\Contrib\Jaeger\AgentExporter;
use OpenTelemetry\SDK\Trace\SpanExporterInterface;
use OpenTelemetry\Tests\Unit\SDK\Util\SpanData;
use PHPUnit\Framework\TestCase;

/**
* @covers OpenTelemetry\Contrib\Jaeger\AgentExporter
* @covers OpenTelemetry\Contrib\Jaeger\JaegerTransport
* @covers OpenTelemetry\Contrib\Jaeger\ThriftUdpTransport
* @covers OpenTelemetry\Contrib\Jaeger\ParsedEndpointUrl
* @covers \OpenTelemetry\Contrib\Jaeger\AgentExporter
* @covers \OpenTelemetry\Contrib\Jaeger\JaegerTransport
* @covers \OpenTelemetry\Contrib\Jaeger\ThriftUdpTransport
* @covers \OpenTelemetry\Contrib\Jaeger\ParsedEndpointUrl
*/
class AgentExporterTest extends TestCase
{
Expand All @@ -24,7 +24,7 @@ public function test_happy_path()
'someServiceName',
);

$status = $exporter->export([new SpanData()]);
$status = $exporter->export([new SpanData()])->await();

$this->assertSame(SpanExporterInterface::STATUS_SUCCESS, $status);

Expand Down
14 changes: 7 additions & 7 deletions tests/Unit/Contrib/JaegerHttpCollectorExporterTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@
use PHPUnit\Framework\TestCase;

/**
* @covers OpenTelemetry\Contrib\Jaeger\HttpCollectorExporter
* @covers OpenTelemetry\Contrib\Jaeger\HttpSender
* @covers OpenTelemetry\Contrib\Jaeger\ThriftHttpTransport
* @covers OpenTelemetry\Contrib\Jaeger\ParsedEndpointUrl
* @covers OpenTelemetry\Contrib\Jaeger\BatchAdapter\BatchAdapter
* @covers OpenTelemetry\Contrib\Jaeger\BatchAdapter\BatchAdapterFactory
* @covers \OpenTelemetry\Contrib\Jaeger\HttpCollectorExporter
* @covers \OpenTelemetry\Contrib\Jaeger\HttpSender
* @covers \OpenTelemetry\Contrib\Jaeger\ThriftHttpTransport
* @covers \OpenTelemetry\Contrib\Jaeger\ParsedEndpointUrl
* @covers \OpenTelemetry\Contrib\Jaeger\BatchAdapter\BatchAdapter
* @covers \OpenTelemetry\Contrib\Jaeger\BatchAdapter\BatchAdapterFactory
*
*/
class JaegerHttpCollectorExporterTest extends TestCase
Expand All @@ -35,7 +35,7 @@ public function test_happy_path()
$this->getStreamFactoryInterfaceMock()
);

$status = $exporter->export([new SpanData()]);
$status = $exporter->export([new SpanData()])->await();

$this->assertSame(SpanExporterInterface::STATUS_SUCCESS, $status);
}
Expand Down
8 changes: 4 additions & 4 deletions tests/Unit/Contrib/OTLPGrpcExporterTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
use org\bovigo\vfs\vfsStream;

/**
* @covers OpenTelemetry\Contrib\OtlpGrpc\Exporter
* @covers \OpenTelemetry\Contrib\OtlpGrpc\Exporter
*/
class OTLPGrpcExporterTest extends AbstractExporterTest
{
Expand Down Expand Up @@ -55,7 +55,7 @@ public function test_exporter_happy_path(): void
])
);

$exporterStatusCode = $exporter->export([new SpanData()]);
$exporterStatusCode = $exporter->export([new SpanData()])->await();

$this->assertSame(SpanExporterInterface::STATUS_SUCCESS, $exporterStatusCode);
}
Expand All @@ -80,14 +80,14 @@ public function test_exporter_unexpected_grpc_response_status(): void
])
);

$exporterStatusCode = $exporter->export([new SpanData()]);
$exporterStatusCode = $exporter->export([new SpanData()])->await();

$this->assertSame(SpanExporterInterface::STATUS_FAILED_NOT_RETRYABLE, $exporterStatusCode);
}

public function test_exporter_grpc_responds_as_unavailable(): void
{
$this->assertEquals(SpanExporterInterface::STATUS_FAILED_RETRYABLE, (new Exporter())->export([new SpanData()]));
$this->assertEquals(SpanExporterInterface::STATUS_FAILED_RETRYABLE, (new Exporter())->export([new SpanData()])->await());
}

public function test_set_headers_with_environment_variables(): void
Expand Down
10 changes: 5 additions & 5 deletions tests/Unit/Contrib/OTLPHttpExporterTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
use Psr\Http\Client\NetworkExceptionInterface;

/**
* @covers OpenTelemetry\Contrib\OtlpHttp\Exporter
* @covers \OpenTelemetry\Contrib\OtlpHttp\Exporter
*/
class OTLPHttpExporterTest extends AbstractExporterTest
{
Expand Down Expand Up @@ -59,7 +59,7 @@ public function test_exporter_response_status($responseStatus, $expected): void

$this->assertEquals(
$expected,
$exporter->export([new SpanData()])
$exporter->export([new SpanData()])->await(),
);
}

Expand Down Expand Up @@ -90,7 +90,7 @@ public function test_client_exceptions_should_decide_return_code($exception, $ex

$this->assertEquals(
$expected,
$exporter->export([new SpanData()])
$exporter->export([new SpanData()])->await(),
);
}

Expand Down Expand Up @@ -129,7 +129,7 @@ public function test_exporter_with_config_via_env_vars(?string $endpoint, string
$client = new Client(['handler' => $stack]);
$exporter = new Exporter($client, new HttpFactory(), new HttpFactory());

$exporter->export([new SpanData()]);
$exporter->export([new SpanData()])->await();

$request = $container[0]['request'];

Expand Down Expand Up @@ -164,7 +164,7 @@ public function test_should_be_ok_to_exporter_empty_spans_collection(): void
$this->getClientInterfaceMock(),
$this->getRequestFactoryInterfaceMock(),
$this->getStreamFactoryInterfaceMock()
))->export([])
))->export([])->await(),
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,6 @@ public function test_fails_if_not_test_running(): void
$span = $this->createMock(SpanData::class);
$exporter->shutdown();

$this->assertSame(SpanExporterInterface::STATUS_FAILED_NOT_RETRYABLE, $exporter->export([$span]));
$this->assertSame(SpanExporterInterface::STATUS_FAILED_NOT_RETRYABLE, $exporter->export([$span])->await());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
use OpenTelemetry\SDK\Trace\SpanExporterInterface;

/**
* @covers OpenTelemetry\SDK\Trace\SpanExporter\ConsoleSpanExporter
* @covers \OpenTelemetry\SDK\Trace\SpanExporter\ConsoleSpanExporter
*/
class ConsoleSpanExporterTest extends AbstractExporterTest
{
Expand Down Expand Up @@ -64,7 +64,7 @@ public function test_export_success(): void
SpanExporterInterface::STATUS_SUCCESS,
(new ConsoleSpanExporter($converter))->export([
$this->createMock(SpanDataInterface::class),
])
])->await(),
);

ob_end_clean();
Expand All @@ -84,7 +84,7 @@ public function test_export_failed(): void
SpanExporterInterface::STATUS_FAILED_NOT_RETRYABLE,
(new ConsoleSpanExporter($converter))->export([
$this->createMock(SpanDataInterface::class),
])
])->await(),
);

ob_end_clean();
Expand All @@ -108,7 +108,7 @@ public function test_export_output(): void

(new ConsoleSpanExporter($converter))->export([
$this->createMock(SpanDataInterface::class),
]);
])->await();
}

public function test_from_connection_string(): void
Expand Down
Loading