Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 12 additions & 69 deletions src/Porter.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,12 @@
use ScriptFUSION\Porter\Provider\Resource\ProviderResource;
use ScriptFUSION\Porter\Specification\ImportSpecification;
use ScriptFUSION\Porter\Transform\Transformer;
use ScriptFUSION\Retry\ExceptionHandler\ExponentialBackoffExceptionHandler;

/**
* Imports data according to an ImportSpecification.
*/
class Porter
{
const DEFAULT_FETCH_ATTEMPTS = 5;

/**
* @var Provider[]
*/
Expand All @@ -35,26 +32,6 @@ class Porter
*/
private $providerFactory;

/**
* @var CacheAdvice
*/
private $defaultCacheAdvice;

/**
* @var int
*/
private $maxFetchAttempts = self::DEFAULT_FETCH_ATTEMPTS;

/**
* @var callable
*/
private $fetchExceptionHandler;

public function __construct()
{
$this->defaultCacheAdvice = CacheAdvice::SHOULD_NOT_CACHE();
}

/**
* Imports data according to the design of the specified import specification.
*
Expand All @@ -67,7 +44,13 @@ public function __construct()
public function import(ImportSpecification $specification)
{
$specification = clone $specification;
$records = $this->fetch($specification->getResource(), $specification->getCacheAdvice());

$records = $this->fetch(
$specification->getResource(),
$specification->getCacheAdvice(),
$specification->getMaxFetchAttempts(),
$specification->getFetchExceptionHandler()
);

if (!$records instanceof ProviderRecords) {
$records = $this->createProviderRecords($records, $specification->getResource());
Expand Down Expand Up @@ -104,24 +87,24 @@ public function importOne(ImportSpecification $specification)
return $one;
}

private function fetch(ProviderResource $resource, CacheAdvice $cacheAdvice = null)
private function fetch(ProviderResource $resource, CacheAdvice $cacheAdvice, $fetchAttempts, $fetchExceptionHandler)
{
$provider = $this->getProvider($resource->getProviderClassName(), $resource->getProviderTag());

$this->applyCacheAdvice($provider, $cacheAdvice ?: $this->defaultCacheAdvice);
$this->applyCacheAdvice($provider, $cacheAdvice);

if (($records = \ScriptFUSION\Retry\retry(
$this->getMaxFetchAttempts(),
$fetchAttempts,
function () use ($provider, $resource) {
return $provider->fetch($resource);
},
function (\Exception $exception) {
function (\Exception $exception) use ($fetchExceptionHandler) {
// Throw exception if unrecoverable.
if (!$exception instanceof RecoverableConnectorException) {
throw $exception;
}

call_user_func($this->getFetchExceptionHandler(), $exception);
$fetchExceptionHandler($exception);
}
)) instanceof \Iterator) {
return $records;
Expand Down Expand Up @@ -276,44 +259,4 @@ private function getOrCreateProviderFactory()
{
return $this->providerFactory ?: $this->providerFactory = new ProviderFactory;
}

/**
* Gets the maximum number of fetch attempts per import.
*
* @return int
*/
public function getMaxFetchAttempts()
{
return $this->maxFetchAttempts;
}

/**
* Sets the maximum number of fetch attempts per import.
*
* @param int $attempts Maximum fetch attempts.
*
* @return $this
*/
public function setMaxFetchAttempts($attempts)
{
$this->maxFetchAttempts = max(1, $attempts|0);

return $this;
}

/**
* @return callable
*/
private function getFetchExceptionHandler()
{
return $this->fetchExceptionHandler ?: $this->fetchExceptionHandler = new ExponentialBackoffExceptionHandler;
}

/**
* @param callable $fetchExceptionHandler
*/
public function setFetchExceptionHandler(callable $fetchExceptionHandler)
{
$this->fetchExceptionHandler = $fetchExceptionHandler;
}
}
73 changes: 71 additions & 2 deletions src/Specification/ImportSpecification.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,15 @@
use ScriptFUSION\Porter\Cache\CacheAdvice;
use ScriptFUSION\Porter\Provider\Resource\ProviderResource;
use ScriptFUSION\Porter\Transform\Transformer;
use ScriptFUSION\Retry\ExceptionHandler\ExponentialBackoffExceptionHandler;

/**
* Specifies which resource to import and how the data should be transformed.
*/
class ImportSpecification
{
const DEFAULT_FETCH_ATTEMPTS = 5;

/**
* @var ProviderResource
*/
Expand All @@ -26,14 +29,31 @@ class ImportSpecification
private $context;

/**
* @var CacheAdvice
* @var CacheAdvice|null
*/
private $cacheAdvice;

/**
* @var CacheAdvice
*/
private $defaultCacheAdvice;

/**
* @var int
*/
private $maxFetchAttempts = self::DEFAULT_FETCH_ATTEMPTS;

/**
* @var callable
*/
private $fetchExceptionHandler;

public function __construct(ProviderResource $resource)
{
$this->resource = $resource;

$this->clearTransformers();
$this->defaultCacheAdvice = CacheAdvice::SHOULD_NOT_CACHE();
}

public function __clone()
Expand All @@ -49,6 +69,7 @@ function (Transformer $transformer) {
));

is_object($this->context) && $this->context = clone $this->context;
is_object($this->fetchExceptionHandler) && $this->fetchExceptionHandler = clone $this->fetchExceptionHandler;
}

/**
Expand Down Expand Up @@ -141,7 +162,7 @@ final public function setContext($context)
*/
final public function getCacheAdvice()
{
return $this->cacheAdvice;
return $this->cacheAdvice ?: $this->defaultCacheAdvice;
}

/**
Expand All @@ -155,4 +176,52 @@ final public function setCacheAdvice(CacheAdvice $cacheAdvice)

return $this;
}

/**
* Gets the maximum number of fetch attempts per import.
*
* @return int Maximum fetch attempts.
*/
final public function getMaxFetchAttempts()
{
return $this->maxFetchAttempts;
}

/**
* Sets the maximum number of fetch attempts per import.
*
* @param int $attempts Maximum fetch attempts.
*
* @return $this
*/
final public function setMaxFetchAttempts($attempts)
{
$this->maxFetchAttempts = max(1, $attempts|0);

return $this;
}

/**
* Gets the exception handler invoked each time a fetch attempt fails.
*
* @return callable Exception handler.
*/
final public function getFetchExceptionHandler()
{
return $this->fetchExceptionHandler ?: $this->fetchExceptionHandler = new ExponentialBackoffExceptionHandler;
}

/**
* Sets the exception handler invoked each time a fetch attempt fails.
*
* @param callable $fetchExceptionHandler Exception handler.
*
* @return $this
*/
final public function setFetchExceptionHandler(callable $fetchExceptionHandler)
{
$this->fetchExceptionHandler = $fetchExceptionHandler;

return $this;
}
}
14 changes: 7 additions & 7 deletions test/Integration/Porter/PorterTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -257,23 +257,23 @@ public function testOneTry()
$this->provider->shouldReceive('fetch')->once()->andThrow(RecoverableConnectorException::class);

$this->setExpectedException(FailingTooHardException::class, '1');
$this->porter->setMaxFetchAttempts(1)->import($this->specification);
$this->porter->import($this->specification->setMaxFetchAttempts(1));
}

public function testDerivedRecoverableException()
{
$this->provider->shouldReceive('fetch')->once()->andThrow(\Mockery::mock(RecoverableConnectorException::class));

$this->setExpectedException(FailingTooHardException::class);
$this->porter->setMaxFetchAttempts(1)->import($this->specification);
$this->porter->import($this->specification->setMaxFetchAttempts(1));
}

public function testDefaultTries()
{
$this->provider->shouldReceive('fetch')->times(Porter::DEFAULT_FETCH_ATTEMPTS)
$this->provider->shouldReceive('fetch')->times(ImportSpecification::DEFAULT_FETCH_ATTEMPTS)
->andThrow(RecoverableConnectorException::class);

$this->setExpectedException(FailingTooHardException::class, (string)Porter::DEFAULT_FETCH_ATTEMPTS);
$this->setExpectedException(FailingTooHardException::class, (string)ImportSpecification::DEFAULT_FETCH_ATTEMPTS);
$this->porter->import($this->specification);
}

Expand All @@ -287,13 +287,13 @@ public function testUnrecoverableException()

public function testCustomFetchExceptionHandler()
{
$this->porter->setFetchExceptionHandler(
$this->specification->setFetchExceptionHandler(
\Mockery::mock(ExponentialBackoffExceptionHandler::class)
->shouldReceive('__invoke')
->times(Porter::DEFAULT_FETCH_ATTEMPTS - 1)
->times(ImportSpecification::DEFAULT_FETCH_ATTEMPTS - 1)
->getMock()
);
$this->provider->shouldReceive('fetch')->times(Porter::DEFAULT_FETCH_ATTEMPTS)
$this->provider->shouldReceive('fetch')->times(ImportSpecification::DEFAULT_FETCH_ATTEMPTS)
->andThrow(RecoverableConnectorException::class);

$this->setExpectedException(FailingTooHardException::class);
Expand Down